Module Fuseau_lwt

Interoperability between Fuseau and Lwt.

This combines Fuseau's fibers with the Lwt event loop (i.e. Lwt_engine), which handles timers and file descriptor readiness. In essence, one can use fibers and structured concurrency from Fuseau alongside Lwt libraries and Lwt IO operations.

Re-export of fuseau

include module type of struct include Fuseau end

Foundations

module Fiber_handle = Fuseau.Fiber_handle

The unique name of a fiber

module Event = Fuseau.Event

Atomic events.

type 'ret branch = 'ret Event.branch =
  | When : 'a Event.t * ('a -> 'ret) -> 'ret branch
val select : 'ret branch list -> 'ret

Synchronization

module Chan = Fuseau.Chan

Basic channels

Utils

module Exn_bt = Fuseau.Exn_bt

Exception with backtrace

module Time = Fuseau.Time

Time measurement

module Fiber = Fuseau.Fiber

Fibers.

module FLS = Fuseau.FLS

Fiber-local storage.

IO event loop

exception Inactive

Exception raised when trying to perform operations on the scheduler after it's been disposed of

module Scheduler = Fuseau.Scheduler

Scheduler that runs fibers.

module Event_loop = Fuseau.Event_loop

Resource management

module Resource_pool = Fuseau.Resource_pool

Resource pool.

module Buf_pool = Fuseau.Buf_pool

A pool of buffers to reuse.

module Cancel_handle = Fuseau.Cancel_handle

IO streams

Sleep

val sleep_s : float -> unit

Put the current fiber to sleep for that many seconds.

Re-exports

exception Timeout

Exception used for cancellation caused by timeout

val await : 'a Fiber.t -> 'a

Wait for the fiber to terminate, and return the result. If the fiber failed, this re-raises the exception.

This must be called from inside another fiber, which will be suspended if needed.

val try_await : 'a Fiber.t -> 'a Exn_bt.result

Like await but catches exceptions.
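To illustrate the difference between await and try_await, here is a minimal sketch in which a child fiber fails and the parent inspects the outcome instead of re-raising. The helper name describe_failure is hypothetical, and the sketch assumes that Exn_bt.result is an ordinary result type with Ok and Error constructors. The child is spawned with ~propagate_cancel_to_parent:false so that its failure is not also propagated to the parent.

```ocaml
(* Hypothetical helper: reports whether a failing child was observed. *)
let describe_failure () : string =
  Fuseau_lwt.main (fun () ->
      let child =
        (* Do not let the child's failure cancel the parent fiber. *)
        Fuseau_lwt.spawn ~propagate_cancel_to_parent:false (fun () ->
            failwith "boom")
      in
      (* Assumption: Exn_bt.result exposes the usual Ok/Error constructors. *)
      match Fuseau_lwt.try_await child with
      | Ok () -> "ok"
      | Error _exn_bt -> "failed")
```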

val cancel_after_s : float -> unit

Cancel the current fiber after delay seconds, unless the fiber terminates first. The cancellation will use the Timeout exception.
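As a hedged sketch of how cancel_after_s might be used, the following child fiber arms a short timeout and then starts a much longer sleep. The helper name timed_out is hypothetical, and the sketch assumes that Exn_bt.result has the usual Ok and Error constructors. The exact point at which the cancellation is observed depends on the scheduler.

```ocaml
(* Hypothetical helper: true if the child ended with an error (here, a timeout). *)
let timed_out () : bool =
  Fuseau_lwt.main (fun () ->
      let child =
        Fuseau_lwt.spawn ~propagate_cancel_to_parent:false (fun () ->
            (* Arm a timeout for this child fiber only. *)
            Fuseau_lwt.cancel_after_s 0.05;
            (* Much longer than the timeout above. *)
            Fuseau_lwt.sleep_s 5.0)
      in
      match Fuseau_lwt.try_await child with
      | Ok () -> false
      | Error _ -> true)
```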

val ev_timeout : float -> 'a Event.t

ev_timeout duration is an event that resolves after duration seconds with an Error Timeout error.

val ev_deadline : float -> 'a Event.t

ev_deadline t is an event that resolves at monotonic time t with an Error Timeout error.

val with_cancel_callback : (Exn_bt.t -> unit) -> (unit -> 'a) -> 'a

let@ () = with_cancel_callback cb in <e> evaluates e in a scope in which, if the current fiber is cancelled, cb is called with the Exn_bt.t describing the cancellation. If e returns without the fiber being cancelled, this callback is removed.
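The let@ notation used here is not defined on this page. The sketch below assumes the common definition let ( let@ ) f x = f x and shows a scope that completes normally, so the callback never fires. The names run_guarded and cleaned_up are hypothetical.

```ocaml
(* Assumed definition of the binding operator used in the description. *)
let ( let@ ) f x = f x

(* Hypothetical helper: returns the scope's result and whether the
   cancellation callback was invoked. *)
let run_guarded () : string * bool =
  Fuseau_lwt.main (fun () ->
      let cleaned_up = ref false in
      let result =
        let@ () =
          Fuseau_lwt.with_cancel_callback (fun _exn_bt -> cleaned_up := true)
        in
        (* No cancellation happens here, so the callback is simply removed. *)
        Fuseau_lwt.sleep_s 0.01;
        "done"
      in
      (result, !cleaned_up))
```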

val spawn : ?name:string -> ?propagate_cancel_to_parent:bool -> (unit -> 'a) -> 'a Fiber.t

Must be run from inside the scheduler's thread. Spawn a new computation. This fiber has an implicit parent, which is normally the currently running fiber (the one calling spawn). If the parent fails or is cancelled, the resulting fiber will also be cancelled (parent to child).

  • parameter propagate_cancel_to_parent

    if true (the default), if this fiber fails then the parent fiber will also fail (child to parent).

  • raises Inactive

    if the scheduler is inactive.
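A minimal sketch of spawn in use, assuming it is called from the root fiber started by main: several children are spawned, each with a name, and the parent awaits them in turn. The helper name sum_of_children is hypothetical.

```ocaml
(* Hypothetical helper: sums the results of [n] child fibers. *)
let sum_of_children (n : int) : int =
  Fuseau_lwt.main (fun () ->
      (* Each child has the calling fiber as its implicit parent. *)
      let children =
        List.init n (fun i ->
            Fuseau_lwt.spawn ~name:(Printf.sprintf "child-%d" i) (fun () ->
                Fuseau_lwt.sleep_s 0.01;
                i * i))
      in
      (* Await every child and accumulate its result. *)
      List.fold_left (fun acc fib -> acc + Fuseau_lwt.await fib) 0 children)
```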

val spawn_from_anywhere : ?name:string -> Scheduler.t -> (unit -> 'a) -> 'a Fiber.t

Spawn a task from anywhere, possibly from another thread. The task will run in a subsequent call to run_iteration in the scheduler's thread. Thread-safe, more costly than spawn. Runs under the root switch.

  • raises Inactive

    if the scheduler is inactive.

val spawn_as_child_of : ?name:string -> ?propagate_cancel_to_parent:bool -> Scheduler.t -> _ Fiber.t -> (unit -> 'a) -> 'a Fiber.t

Spawn a fiber in the given parent fiber's scope. See spawn for more details on the arguments

val schedule_micro_task : (unit -> unit) -> unit

Must be run from inside a Scheduler.t's thread. Schedules a microtask that will run in this tick. Be careful not to create infinite sequences of micro tasks that starve the IO loop!

These microtasks do not handle effects and should try their best to not raise exceptions. Only use them for very short amounts of work.

Not thread-safe.

  • raises Inactive

    if the scheduler is inactive.
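As a small sketch, a microtask can be used to flip a flag without performing any effect. The helper name microtask_ran is hypothetical. The fiber sleeps briefly before reading the flag, so the sketch does not rely on exactly when within a tick the microtask runs.

```ocaml
(* Hypothetical helper: schedules a trivial microtask and checks that it ran. *)
let microtask_ran () : bool =
  Fuseau_lwt.main (fun () ->
      let ran = ref false in
      (* The microtask performs no effects and cannot raise. *)
      Fuseau_lwt.schedule_micro_task (fun () -> ran := true);
      (* Give the scheduler time to run pending microtasks. *)
      Fuseau_lwt.sleep_s 0.01;
      !ran)
```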

val yield : unit -> unit

yield () returns control to the scheduler and checks for cancellation. This must be called from a fiber.
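The following sketch shows two cooperative workers that call yield on every iteration, so that neither monopolizes the scheduler and each reaches a cancellation check regularly. The helper name count_with_yield is hypothetical.

```ocaml
(* Hypothetical helper: two fibers each increment a shared counter [n] times. *)
let count_with_yield (n : int) : int =
  Fuseau_lwt.main (fun () ->
      let total = ref 0 in
      let worker () =
        for _ = 1 to n do
          incr total;
          (* Hand control back to the scheduler between steps. *)
          Fuseau_lwt.yield ()
        done
      in
      let a = Fuseau_lwt.spawn worker in
      let b = Fuseau_lwt.spawn worker in
      Fuseau_lwt.await a;
      Fuseau_lwt.await b;
      !total)
```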

val get_scheduler : unit -> Scheduler.t

This returns the scheduler on which the caller runs. It must be called from inside a fiber, or from inside the thread running main.

  • raises Failure

    if not run from inside a fiber or the thread running main.

Main loop.

This is the loop that runs fibers that are ready, and the IO event loop, in an interleaved way.

Interop with Lwt

Here we have functions that are used to cross the boundary between the Fuseau world and the Lwt world.

val await_lwt : 'a Lwt.t -> 'a

Like Fuseau.await but on a Lwt promise.
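A minimal sketch of calling into Lwt from a fiber, assuming Lwt_unix is available: the fiber builds an ordinary Lwt promise and suspends on it with await_lwt. The helper name lwt_result_length is hypothetical.

```ocaml
(* Hypothetical helper: waits on an Lwt promise from inside a fiber. *)
let lwt_result_length () : int =
  Fuseau_lwt.main (fun () ->
      (* A plain Lwt computation: sleep, then produce a string. *)
      let promise =
        Lwt.bind (Lwt_unix.sleep 0.01) (fun () -> Lwt.return "hello")
      in
      (* Suspends the current fiber until the promise resolves. *)
      String.length (Fuseau_lwt.await_lwt promise))
```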

val spawn_as_lwt : ?parent:_ Fiber.t -> ?name:string -> ?propagate_cancel_to_parent:bool -> (unit -> 'a) -> 'a Lwt.t

spawn_as_lwt f runs f() in the Fuseau+Lwt thread, and returns an Lwt promise that resolves when the fiber does.
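Going the other way, a fiber can be exposed to Lwt code as a promise. The sketch below spawns a fiber with spawn_as_lwt, post-processes the promise with Lwt.map, and waits for the final value with await_lwt. The helper name fiber_as_promise is hypothetical.

```ocaml
(* Hypothetical helper: wraps a fiber as an Lwt promise and maps over it. *)
let fiber_as_promise () : int =
  Fuseau_lwt.main (fun () ->
      let promise =
        Fuseau_lwt.spawn_as_lwt ~name:"as-lwt" (fun () ->
            Fuseau_lwt.sleep_s 0.01;
            20)
      in
      (* Regular Lwt combinators apply to the returned promise. *)
      let incremented = Lwt.map (fun x -> x + 1) promise in
      Fuseau_lwt.await_lwt incremented)
```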

val spawn_as_lwt_from_anywhere : ?name:string -> Scheduler.t -> (unit -> 'a) -> 'a Lwt.t

spawn_as_lwt_from_anywhere scheduler f runs f() as a toplevel fiber in scheduler, and returns an Lwt promise for its result. It can be called from another thread.
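A short sketch of the explicit-scheduler variant. For simplicity it obtains the scheduler with get_scheduler and calls spawn_as_lwt_from_anywhere from the scheduler's own thread. Code running on another thread would instead need the Scheduler.t value handed to it. The helper name via_scheduler is hypothetical.

```ocaml
(* Hypothetical helper: spawns a toplevel fiber through an explicit scheduler. *)
let via_scheduler () : int =
  Fuseau_lwt.main (fun () ->
      (* Valid here because we are inside the thread running main. *)
      let sched = Fuseau_lwt.get_scheduler () in
      let promise =
        Fuseau_lwt.spawn_as_lwt_from_anywhere ~name:"top" sched (fun () ->
            7 * 6)
      in
      Fuseau_lwt.await_lwt promise)
```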

module IO_lwt : sig ... end

IO through Lwt's engine

val ev_read : Unix.file_descr -> bytes -> int -> int -> int Event.t

ev_read fd buf i len is an event that, when ready, will read at most len bytes from fd into buf[i.. i+len], and return how many bytes were read. It uses Lwt_engine to wait for fd's readiness.

val ev_write : Unix.file_descr -> bytes -> int -> int -> int Event.t

ev_write fd buf i len is an event that, when ready, writes at most len bytes from buf into fd. It uses Lwt_engine to wait for fd's readiness.
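As a hedged sketch, these events can be consumed with select and a single When branch. The example writes a short message into a Unix pipe and reads it back. The helper name roundtrip is hypothetical, and setting the descriptors to non-blocking mode is a precaution assumed here rather than a documented requirement.

```ocaml
(* Hypothetical helper: sends [msg] through a pipe and reads it back.
   Assumes [msg] is short enough to be written in one call. *)
let roundtrip (msg : string) : string =
  Fuseau_lwt.main (fun () ->
      let rd, wr = Unix.pipe () in
      (* Assumption: non-blocking descriptors suit readiness-based IO. *)
      Unix.set_nonblock rd;
      Unix.set_nonblock wr;
      let out = Bytes.of_string msg in
      (* Wait until [wr] is writable, then write at most the whole buffer. *)
      let _written : int =
        Fuseau_lwt.select
          [ Fuseau_lwt.When
              (Fuseau_lwt.ev_write wr out 0 (Bytes.length out), fun n -> n) ]
      in
      let buf = Bytes.create 64 in
      (* Wait until [rd] is readable, then read at most 64 bytes. *)
      let n =
        Fuseau_lwt.select
          [ Fuseau_lwt.When
              (Fuseau_lwt.ev_read rd buf 0 (Bytes.length buf), fun n -> n) ]
      in
      Unix.close rd;
      Unix.close wr;
      Bytes.sub_string buf 0 n)
```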

module Iostream : sig ... end

Iostream specialized for Lwt

module Net : sig ... end

Networking using Lwt_io

Main loop

val main : (unit -> 'a) -> 'a

Run main loop, using the current Lwt_engine.t.
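As a rough sketch of an entry point, the following program enters the main loop, sleeps briefly in the root fiber, and returns a value to the caller of main. The helper name run_once is hypothetical. Only main and sleep_s come from the signatures on this page.

```ocaml
(* Hypothetical helper: runs one computation under the Fuseau+Lwt main loop. *)
let run_once () : string =
  Fuseau_lwt.main (fun () ->
      (* Inside the root fiber, suspending calls such as sleep_s are allowed. *)
      Fuseau_lwt.sleep_s 0.01;
      "finished")

let () = print_endline (run_once ())
```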