Moonpool.Fifo_pool
A simple thread pool in FIFO order.
FIFO: first-in, first-out. Basically tasks are put into a queue, and worker threads pull them out of the queue at the other end.
Since this uses a single blocking queue to manage tasks, it's very simple and reliable. The number of worker threads is fixed, but they are spread over several domains to enable parallelism.
This can be useful for latency-sensitive applications (e.g. as a pool of workers for network servers). Work-stealing pools might have higher throughput but they're very unfair to some tasks; by contrast, here, older tasks have priority over younger tasks.
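The ordering guarantee can be observed with a minimal sketch, assuming the moonpool library is installed and linked. It uses a single worker so the execution order is deterministic. The names `Pool`, `seen`, and `order` are illustrative, and `with_` is assumed to shut the pool down once its callback returns.

```ocaml
(* Sketch only: requires the moonpool library. *)
module Pool = Moonpool.Fifo_pool

(* With one worker thread, tasks should run in submission order. *)
let order : int list =
  let seen = ref [] in
  Pool.with_ ~num_threads:1 () (fun pool ->
      for i = 1 to 5 do
        Pool.run_async pool (fun () -> seen := i :: !seen)
      done;
      (* Barrier: this task is queued last, so when it returns the
         five tasks queued before it have already run. *)
      Pool.run_wait_block pool (fun () -> ()));
  List.rev !seen
```

With more than one worker, tasks are still dequeued oldest first, but they may finish in a different order.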
include module type of Runner
A runner.
If a runner is no longer needed, shutdown can be used to signal all worker threads in it to stop (after they finish their work), and wait for them to stop.
The threads are distributed across a fixed domain pool (whose size is determined by Domain.recommended_domain_count on OCaml 5, and is simply the single runtime on OCaml 4).
val size : t -> int
Number of threads/workers.
val num_tasks : t -> int
Current number of tasks. This is at best a snapshot, useful for metrics and debugging.
val shutdown : t -> unit
Shut down the runner and wait for it to terminate. Idempotent.
val shutdown_without_waiting : t -> unit
Shut down the pool without waiting for it to terminate. Idempotent.
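A possible way to pair create with shutdown by hand is sketched below, assuming the moonpool library is available. `Fun.protect` comes from the OCaml standard library. The names `Pool` and `answer` are illustrative.

```ocaml
(* Sketch only: requires the moonpool library. *)
module Pool = Moonpool.Fifo_pool

let answer : int =
  let pool = Pool.create ~num_threads:2 () in
  Fun.protect
    ~finally:(fun () ->
      Pool.shutdown pool;
      (* shutdown is documented as idempotent, so a second call is harmless *)
      Pool.shutdown pool)
    (fun () -> Pool.run_wait_block pool (fun () -> 1 + 1))
```

Putting the shutdown in a `finally` handler means the workers are stopped even if the body raises.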
run_async pool f schedules f for later execution on the runner in one of the threads. f() will run on one of the runner's worker threads/domains.
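As a rough illustration, assuming the moonpool library is available, the sketch below submits several fire-and-forget tasks and collects their effects through atomics. The polling loop is a deliberately crude way to wait, chosen only to keep the example short. `Thread.yield` comes from the standard threads library. The names `Pool`, `acc`, `pending`, and `sum_of_squares` are illustrative, and `with_` is assumed to shut the pool down once its callback returns.

```ocaml
(* Sketch only: requires the moonpool and threads libraries. *)
module Pool = Moonpool.Fifo_pool

let sum_of_squares : int =
  let n = 10 in
  let acc = Atomic.make 0 in
  let pending = Atomic.make n in
  Pool.with_ ~num_threads:4 () (fun pool ->
      for i = 1 to n do
        Pool.run_async pool (fun () ->
            (* Tasks may run on different domains, hence the atomics. *)
            ignore (Atomic.fetch_and_add acc (i * i) : int);
            Atomic.decr pending)
      done;
      (* Crude wait: poll until every task has reported completion. *)
      while Atomic.get pending > 0 do
        Thread.yield ()
      done);
  Atomic.get acc
```

Since run_async returns nothing, the caller needs its own mechanism to learn that a task has finished. run_wait_block covers the common case of waiting for a single result.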
val run_wait_block : t -> (unit -> 'a) -> 'a
run_wait_block pool f schedules f for later execution on the pool, like run_async. It then blocks the current thread until f() is done executing, and returns its result. If f() raises an exception, then run_wait_block pool f will raise it as well.
NOTE: be careful with deadlocks (see notes in Fut.wait_block about the required discipline to avoid deadlocks).
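Both behaviours described above (returning the result and re-raising the exception) can be sketched as follows, assuming the moonpool library is available. The calls are made from outside the pool, which sidesteps the deadlock concern. The names `Pool`, `result`, and `failure` are illustrative, and `with_` is assumed to shut the pool down once its callback returns.

```ocaml
(* Sketch only: requires the moonpool library. *)
module Pool = Moonpool.Fifo_pool

let result, failure =
  Pool.with_ ~num_threads:2 () (fun pool ->
      (* The value computed on a worker is returned to the caller. *)
      let ok = Pool.run_wait_block pool (fun () -> 6 * 7) in
      (* An exception raised on a worker resurfaces in the caller. *)
      let err =
        try Pool.run_wait_block pool (fun () -> failwith "boom")
        with Failure msg -> msg
      in
      (ok, err))
```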
module For_runner_implementors : sig ... end
This module is specifically intended for users who implement their own runners. Regular users of Moonpool should not need to look at it.
val get_current_runner : unit -> t option
Access the current runner. This returns Some r if the call happens on a thread that belongs to a runner.
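A small sketch of the distinction, assuming the moonpool library is available. It further assumes that a thread which does not belong to any runner (here, the main thread) gets None; the text above only states the Some case. The names `Pool`, `outside`, and `inside` are illustrative.

```ocaml
(* Sketch only: requires the moonpool library. *)
module Pool = Moonpool.Fifo_pool

let outside, inside =
  Pool.with_ ~num_threads:1 () (fun pool ->
      (* Called from the main thread, which is not a pool worker. *)
      let outside = Option.is_some (Pool.get_current_runner ()) in
      (* Called from inside a task, i.e. on one of the pool's workers. *)
      let inside =
        Pool.run_wait_block pool (fun () ->
            Option.is_some (Pool.get_current_runner ()))
      in
      (outside, inside))
```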
type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exn:(exn -> Stdlib.Printexc.raw_backtrace -> unit) ->
?around_task:((t -> 'b) * (t -> 'b -> unit)) ->
?num_threads:int ->
'a
val create : (unit -> t, _) create_args
create () makes a new thread pool.
val with_ : (unit -> (t -> 'a) -> 'a, _) create_args
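The optional arguments of create_args are passed to create and with_ before the final unit argument. The sketch below, assuming the moonpool library is available, counts worker start-ups through on_init_thread and reads the pool size. This page gives no description for with_; the sketch assumes it creates a pool, passes it to the callback, and shuts the pool down (waiting for the workers) when the callback returns. The names `Pool`, `started`, and `pool_size` are illustrative.

```ocaml
(* Sketch only: requires the moonpool library. *)
module Pool = Moonpool.Fifo_pool

let started, pool_size =
  let started = Atomic.make 0 in
  let size =
    Pool.with_ ~num_threads:3
      (* Hook run by each worker thread when it starts. *)
      ~on_init_thread:(fun ~dom_id:_ ~t_id:_ () -> Atomic.incr started)
      ()
      (fun pool -> Pool.size pool)
  in
  (Atomic.get started, size)
```

Compared with calling create and shutdown by hand, this scoped form makes it harder to forget the shutdown.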