[ANN] moonpool 0.9, moonpool-lwt 0.9

Hello camels,

I’m happy to announce the release of moonpool 0.9 and moonpool-lwt 0.9. Moonpool is a concurrency and parallelism library that provides lightweight fibers and a concept of Runner.t that they can be dispatched on. Multiple runners can co-exist inside a program.

This release is a fairly large one. First, Moonpool now requires OCaml >= 5.0 (no more 4.xx compat), which removes the need for a preprocessor and makes await generally available on every Runner.t. Some sub-libraries are now deprecated (moonpool-io, moonpool.sync in favor of picos, etc.).

The biggest improvement is moonpool-lwt. It now wraps Lwt_main.run and turns it into a Moonpool.Runner.t, meaning that Lwt, Lwt_io, Lwt_unix, and all the libraries built on top can now directly be used from Moonpool. Lwt promises can be turned into moonpool futures and conversely; fibers can be spawned in the Lwt_engine loop from any thread (to perform IO and call lwt libraries) and be awaited from other threads, too.

Documentation: index (moonpool.index), Moonpool_lwt (moonpool-lwt.Moonpool_lwt)

Example echo server

module M_lwt = Moonpool_lwt

(** [let@ x = f in body] is [f (fun x -> body)]: plain function application
    spelled as a binding operator, so a callback can be written inline. *)
let ( let@ ) f k = f k
(** Render a socket address as text: the path for a Unix-domain socket, or
    the address and port joined by [':'] for an internet socket. *)
let str_of_sockaddr (sa : Unix.sockaddr) : string =
  match sa with
  | Unix.ADDR_INET (host, port) ->
    String.concat ":" [ Unix.string_of_inet_addr host; string_of_int port ]
  | Unix.ADDR_UNIX path -> path


(** [main ~port ()] starts an echo server listening on [port] (all
    interfaces) and then blocks forever. Each client is served by its own
    fiber, which copies whatever it reads back to the client until the read
    returns [0] bytes. *)
let main ~port () : unit =
  (* This promise is never resolved: awaiting it at the end parks [main]. *)
  let forever, _resolver = Lwt.wait () in

  let handle_client client_addr (ic, oc) : _ Lwt.t =
    (* the handler body runs as a new fiber spawned via [spawn_lwt] *)
    M_lwt.spawn_lwt @@ fun () ->
    let peer = str_of_sockaddr client_addr in
    Printf.printf "got new client on %s\n%!" peer;

    let buf = Bytes.create 1024 in
    (* read a chunk, write it back, flush, repeat; a 0-byte read ends it *)
    let rec echo () =
      match
        Lwt_io.read_into ic buf 0 (Bytes.length buf) |> M_lwt.await_lwt
      with
      | 0 -> ()
      | n ->
        Lwt_io.write_from_exactly oc buf 0 n |> M_lwt.await_lwt;
        Lwt_io.flush oc |> M_lwt.await_lwt;
        echo ()
    in
    echo ();
    Printf.printf "done with client on %s\n%!" peer
  in

  Printf.printf "listening on port=%d\n%!" port;
  let listen_addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
  let _server =
    M_lwt.await_lwt
      (Lwt_io.establish_server_with_client_address listen_addr handle_client)
  in

  (* never returns *)
  M_lwt.await_lwt forever

(* Entry point: read the optional [-p] port (default 1234) from the command
   line, then run [main] under [M_lwt.lwt_main]. Anonymous arguments are
   ignored. *)
let () =
  let port = ref 1234 in
  let opts = Arg.align [ "-p", Arg.Set_int port, " port" ] in
  Arg.parse opts ignore "echo server";
  M_lwt.lwt_main (fun _ -> main ~port:!port ())

Run it as echo_server -p 1234 and use nc localhost 1234 to connect. It will echo lines sent to it.

We can reuse Lwt_io.establish_server_with_client_address just fine, and use direct style to implement the client handler inside a single Moonpool fiber (via Moonpool_lwt.spawn_lwt that runs its argument in the lwt event loop).

Small server with a thread pool for compute

A variation on the previous one, with a thread pool on which CPU-bound tasks can be run:

module M_lwt = Moonpool_lwt

(** [let@ x = f in body] is [f (fun x -> body)]: plain function application
    spelled as a binding operator, so a callback can be written inline. *)
let ( let@ ) f k = f k
(** Render a socket address as text: the path for a Unix-domain socket, or
    the address and port joined by [':'] for an internet socket. *)
let str_of_sockaddr (sa : Unix.sockaddr) : string =
  match sa with
  | Unix.ADDR_INET (host, port) ->
    String.concat ":" [ Unix.string_of_inet_addr host; string_of_int port ]
  | Unix.ADDR_UNIX path -> path

(* don't do this at home: deliberately naive doubly-recursive Fibonacci,
   kept slow on purpose to serve as a CPU-bound task.
   Every [n <= 2] (zero and negatives included) yields [1]. *)
let rec fib n =
  if n > 2 then
    let a = fib (n - 1) in
    let b = fib (n - 2) in
    a + b
  else 1

(** [main ~port ~tpool ()] starts a line-oriented server on [port] (all
    interfaces) and then blocks forever. For every line a client sends, the
    line is parsed as an integer [n], [fib n] is computed on [tpool], and the
    result is written back followed by a newline. A line that is not a valid
    integer gets an error reply instead, and the connection stays open.
    The handler stops when the client reaches end of input. *)
let main ~port ~tpool () : unit =
  (* never resolved *)
  let lwt_fut, _lwt_prom = Lwt.wait () in

  let handle_client client_addr (ic, oc) : _ Lwt.t =
    (* spawn a new fiber in the lwt thread *)
    let@ () = M_lwt.spawn_lwt in
    Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr);

    let continue = ref true in
    while !continue do
      match Lwt_io.read_line ic |> M_lwt.await_lwt with
      | exception End_of_file -> continue := false
      | line ->
        let reply =
          (* Client input is untrusted: [int_of_string] would raise [Failure]
             on a malformed line, and that exception would escape the
             handler. Per the Lwt_io documentation, handler exceptions are
             passed to [Lwt.async_exception_hook], which by default
             terminates the program. *)
          match int_of_string_opt (String.trim line) with
          | None -> Printf.sprintf "error: expected an integer, got %S\n" line
          | Some input ->
            (* run fib(input) in the thread pool and suspend until
               it's done *)
            let fib_input =
              Moonpool.Fut.spawn ~on:tpool (fun () -> fib input)
              |> Moonpool.Fut.await
            in
            Printf.sprintf "%d\n" fib_input
        in

        Lwt_io.write oc reply |> M_lwt.await_lwt;
        Lwt_io.flush oc |> M_lwt.await_lwt
    done;

    Printf.printf "done with client on %s\n%!" (str_of_sockaddr client_addr);
  in

  Printf.printf "listening on port=%d\n%!" port;
  let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
  let _server =
    Lwt_io.establish_server_with_client_address addr handle_client
    |> M_lwt.await_lwt
  in

  M_lwt.await_lwt lwt_fut (* never returns *)

(* Entry point: read the optional [-j] (thread pool size, default 8) and
   [-p] (port, default 1234) flags, create a work-stealing pool with that
   many threads, and run [main] under [M_lwt.lwt_main] with the pool as
   [tpool]. Anonymous arguments are ignored. *)
let () =
  let port = ref 1234 in
  let j = ref 8 in
  let opts =
    [
      "-j", Arg.Set_int j, " thread pool size";
      "-p", Arg.Set_int port, " port";
    ]
    |> Arg.align
  in
  (* usage banner: this program is the fib compute server, not the echo one *)
  Arg.parse opts ignore "fib server";

  let@ tpool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
  M_lwt.lwt_main @@ fun _ -> main ~port:!port ~tpool ()

Note how the computation is done by starting a task in the tpool argument (a moonpool Runner.t provided to the main, by default a work stealing pool of 8 threads that can be set via -j <number of threads>) and then await-ed from the lwt handler. While the computation is running, the lwt client handler is suspended and doesn’t prevent other clients from making progress.

To test this one, use nc localhost 1234 and write (small) integers to get fib(n) computed. To see it work in parallel, open top or htop and run:

# launch 200 background nc clients, each sending the line "35" to port 1234
for i in $(seq 1 200); do nc localhost 1234 <<< '35' & done
19 Likes

What would be the standard way of using Moonpool (system threads) with tiny_httpd? I found https://github.com/c-cube/tiny-httpd-moonpool-bench/blob/main/examples/t1.ml but I can only guess that it uses obsolete APIs because it doesn’t match any documentation. It opens Moonpool but doesn’t seem to use it, using Pool which I can’t find in https://c-cube.github.io/moonpool/moonpool/Moonpool/index.html but is in tiny_httpd itself, however it calls Pool.create with the wrong arguments and Pool.size which doesn’t exist:

let pool = Pool.create ~min:!j ~per_domain:!min_per_dom () in
let server =
  H.create ~max_connections:2048 ~port:!port ~new_thread:(Pool.run pool) ()
in
(* ... *)
Printf.printf "listening on http://127.0.0.1:%d (%d threads)\n%!" !port
  (Pool.size pool);

I need a simple thread pool (no fibers/lwt/etc.) but spread over a handful of domains, so I naturally thought of Moonpool especially since you’re the author of both, but an example would be great. :slight_smile:

(Side note: is using Jemalloc still required for long-running OCaml processes? I found very little chatter about it online. Ahrefs’ Liquidsoap is the only known dependent.)

We are using jemalloc extensively for https://www.xenserver.com/.

1 Like

I updated the code to latest versions of tiny_httpd and moonpool :slight_smile: . It wasn’t a big change.

And yes I also use jemalloc for servers.

2 Likes