Tiny educational concurrent I/O and promises library

Yo!

I like Lwt. It’s a fantastic library, but how does it work? I spent a few days studying its source code.

Finally, inspired by the implementation of Lwt and the CS3110 chapter, 8.7. Promises. I wrote a maximally stupid tiny-async-lib library.

Maybe you may be interested in this naive implementation.

Examples of use

let () = 
  Engine.run main begin
    let* () = Io.(write_all stdout) "Hi! What's your name? " in
    let* name = Io.(read_line stdin) in
    Io.(write_all stdout) ("Hello, " ^ name ^ "!\n")
  end
let read_and_print_file filename = 
  Io.(read_file filename >>= write_all stdout)

let _ =
  Engine.run begin
    let filenames = [ (* ... *) ] in  

    filenames
    |> List.map read_and_print_file
    |> Promise.join
  end

Implementation details

The first key abstraction of the whole library is, of course, Promise. Promise is an abstraction for synchronizing program execution in concurrent evaluations. In simple terms, it’s an abstraction over callbacks. Promises allows us to build (monadic) sequence evaluations inside of non-sequence evaluations.

(* promise.ml *)

type 'a t = { mutable state: 'a state }  

and 'a state = 
  | Fulfilled of 'a 
  | Rejected of exn
  | Pending of 'a callback list 

and 'a callback = 'a state -> unit 

Promises are represented as a mutable record with three possible states: fulfilled (contains a value), rejected (contains an exception), and pending (contains callbacks).

Callbacks are functions that are called when a promise is resolved.
So when we bind, if the promise is in pending state, we add a callback that calls the following monadic sequence when the promise is resolved.

(* io.ml *)

let sleep delay =
  let p, r = Promise.make () in

  Engine.(on_timer instance) delay (fun handler ->
      Engine.stop_handler handler;
      Promise.fulfill r ());

  p

The second key abstraction is an asynchronous I/O engine that polls I/O events and dispatches them to handlers. Original Lwt has few engines (like based libev, select, poll), but I hardcoded a select-based engine inspired by Lwt_engine.select_based.

The typical async engine in internals has an event loop. At each iteration of the event loop, the engine polls for new events and calls handlers to handle them.

(* engine.ml *)

let iter engine =
  (* ... *)

  let readable_fds, writable_fds, _ =
    Unix.select readable_fds writable_fds [] timeout
  in

  engine.sleepers <- restart_sleepers now engine.sleepers;

  invoke_io_handlers engine.wait_readable readable_fds;
  invoke_io_handlers engine.wait_writable writable_fds

How to execute I/O promise? It’s not a big deal.

(* engine.ml *)

let rec run promise =
  match Promise.state promise with
  | Fulfilled value -> value
  | Rejected exc -> raise exc
  | Pending _ ->
      iter instance;
      run promise

We just need to loop the event loop until the promis is resolved.


It’s just a toy! I’m not an expert in such things, so the library is very naive and tries to mimic Lwt. But I think it’s a good demonstration.

Repository
https://github.com/dx3mod/tiny-async-lib

Thank you for your attention!

6 Likes

This is a really nice library for anyone who wants to build a mental model of Lwt! Thanks for writing it and for sharing it <3

1 Like

I did also a little tutorial which explains how to implement a scheduler with effects. It’s available here: A simple scheduler - Miou, a simple scheduler for OCaml 5

I would like to mention this great paper: Lwt: a Cooperative Thread Library which is really accessible to fully understand the core of lwt.

6 Likes

Thanks to all of you for your comments and links!

I did a big refactoring and wrote more details in the README for those interested in the topic.