Hello camels,
I’m happy to announce the release of moonpool 0.9 and moonpool-lwt 0.9. Moonpool is a concurrency and parallelism library that provides lightweight fibers and a concept of Runner.t that they can be dispatched on. Multiple runners can co-exist inside a program.
This release is a fairly large one. First, Moonpool now requires OCaml >= 5.0 (no more 4.xx compat), which removes the need for a preprocessor and makes await generally available on every Runner.t. Some sub-libraries are now deprecated (moonpool-io, moonpool.sync in favor of picos, etc.).
The biggest improvement is moonpool-lwt. It now wraps Lwt_main.run and turns it into a Moonpool.Runner.t, meaning that Lwt, Lwt_io, Lwt_unix, and all the libraries built on top can now directly be used from Moonpool. Lwt promises can be turned into moonpool futures and conversely; fibers can be spawned in the Lwt_engine loop from any thread (to perform IO and call lwt libraries) and be awaited from other threads, too.
Documentation: index (moonpool.index), Moonpool_lwt (moonpool-lwt.Moonpool_lwt)
Example echo server
module M_lwt = Moonpool_lwt
(** [let@ x = f in body] is [f (fun x -> body)]: plain function application
    written as a binding operator, so a trailing callback needs no nesting. *)
let ( let@ ) f k = f k
(** Render a socket address for log messages: the path of a Unix-domain
    socket, or the address and port separated by a colon for an internet
    socket. *)
let str_of_sockaddr (sockaddr : Unix.sockaddr) : string =
  match sockaddr with
  | Unix.ADDR_INET (host, port) ->
    Unix.string_of_inet_addr host ^ ":" ^ string_of_int port
  | Unix.ADDR_UNIX path -> path
(** [main ~port ()] starts a TCP echo server listening on [port] (all
    interfaces) and then blocks forever. Each client is served by its own
    fiber spawned in the lwt event loop, written in direct style. *)
let main ~port () : unit =
  let handle_client client_addr (ic, oc) : _ Lwt.t =
    (* spawn a new fiber in the lwt thread *)
    let@ () = M_lwt.spawn_lwt in
    let peer = str_of_sockaddr client_addr in
    Printf.printf "got new client on %s\n%!" peer;
    let chunk = Bytes.create 1024 in
    (* Echo each chunk back to the client; stop once a read yields 0 bytes. *)
    let rec echo_loop () =
      match
        Lwt_io.read_into ic chunk 0 (Bytes.length chunk) |> M_lwt.await_lwt
      with
      | 0 -> ()
      | len ->
        Lwt_io.write_from_exactly oc chunk 0 len |> M_lwt.await_lwt;
        Lwt_io.flush oc |> M_lwt.await_lwt;
        echo_loop ()
    in
    echo_loop ();
    Printf.printf "done with client on %s\n%!" peer
  in
  Printf.printf "listening on port=%d\n%!" port;
  let listen_addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
  let _server =
    M_lwt.await_lwt
      (Lwt_io.establish_server_with_client_address listen_addr handle_client)
  in
  (* A promise whose resolver is dropped is never resolved, so awaiting it
     keeps the server running: this call never returns. *)
  let forever, _resolver = Lwt.wait () in
  M_lwt.await_lwt forever
(* Entry point: read the listening port from the command line ([-p],
   default 1234) and run [main] under the lwt main loop. *)
let () =
  let port = ref 1234 in
  let spec = Arg.align [ "-p", Arg.Set_int port, " port" ] in
  (* positional arguments are accepted and ignored *)
  Arg.parse spec ignore "echo server";
  M_lwt.lwt_main (fun _ -> main ~port:!port ())
Run it as echo_server -p 1234 and use nc localhost 1234 to connect. It will echo lines sent to it.
We can reuse Lwt_io.establish_server_with_client_address just fine, and use direct style to implement the client handler inside a single Moonpool fiber (via Moonpool_lwt.spawn_lwt that runs its argument in the lwt event loop).
Small server with a thread pool for compute
A variation on the previous example, with a thread pool on which CPU-bound tasks can be run:
module M_lwt = Moonpool_lwt
(** Binding-operator form of function application:
    [let@ x = f in body] means [f (fun x -> body)]. *)
let ( let@ ) fn continuation = fn continuation
(** Human-readable form of a socket address, used in log lines: the socket
    path for Unix-domain sockets, the address followed by a colon and the
    port for internet sockets. *)
let str_of_sockaddr : Unix.sockaddr -> string = function
  | Unix.ADDR_INET (ip, port) ->
    String.concat ":" [ Unix.string_of_inet_addr ip; string_of_int port ]
  | Unix.ADDR_UNIX socket_path -> socket_path
(* don't do this at home *)
(** Deliberately naive doubly-recursive Fibonacci, used here only as a
    CPU-bound workload. Returns [1] for every [n <= 2]. *)
let rec fib n =
  if n > 2 then
    fib (n - 1) + fib (n - 2)
  else
    1
(** [main ~port ~tpool ()] starts a line-oriented TCP server on [port] (all
    interfaces) and then blocks forever. For each line received, the server
    parses an integer [n], computes [fib n] on the [tpool] runner, and writes
    the result back followed by a newline. A line that is not an integer gets
    an error reply instead of terminating the client's handler. *)
let main ~port ~tpool () : unit =
  (* never resolved *)
  let lwt_fut, _lwt_prom = Lwt.wait () in
  let handle_client client_addr (ic, oc) : _ Lwt.t =
    (* spawn a new fiber in the lwt thread *)
    let@ () = M_lwt.spawn_lwt in
    Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr);
    (* write one message to the client and flush it immediately *)
    let reply msg =
      Lwt_io.write oc msg |> M_lwt.await_lwt;
      Lwt_io.flush oc |> M_lwt.await_lwt
    in
    let continue = ref true in
    while !continue do
      match Lwt_io.read_line ic |> M_lwt.await_lwt with
      | exception End_of_file -> continue := false
      | line ->
        (* [int_of_string] would raise [Failure] on malformed input and
           abort the whole handler; report the problem and keep serving. *)
        (match int_of_string_opt (String.trim line) with
         | None -> reply "error: expected an integer\n"
         | Some input ->
           (* run fib(input) in the thread pool and suspend until
              it's done *)
           let result =
             Moonpool.Fut.spawn ~on:tpool (fun () -> fib input)
             |> Moonpool.Fut.await
           in
           reply (Printf.sprintf "%d\n" result))
    done;
    Printf.printf "done with client on %s\n%!" (str_of_sockaddr client_addr)
  in
  Printf.printf "listening on port=%d\n%!" port;
  let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
  let _server =
    Lwt_io.establish_server_with_client_address addr handle_client
    |> M_lwt.await_lwt
  in
  M_lwt.await_lwt lwt_fut (* never returns *)
(* Entry point: parse [-j] (thread pool size, default 8) and [-p] (port,
   default 1234), create the pool with [Moonpool.Ws_pool.with_], and run
   [main] under the lwt main loop with that pool. *)
let () =
  let port = ref 1234 in
  let j = ref 8 in
  let opts =
    [
      "-j", Arg.Set_int j, " thread pool size";
      "-p", Arg.Set_int port, " port";
    ]
    |> Arg.align
  in
  (* The usage text names this program; it is the fib compute server, not
     the echo server from the previous example. *)
  Arg.parse opts ignore "fib server";
  let@ tpool = Moonpool.Ws_pool.with_ ~num_threads:!j () in
  M_lwt.lwt_main @@ fun _ -> main ~port:!port ~tpool ()
Note how the computation is done by starting a task on the tpool argument (a moonpool Runner.t passed to main; by default a work-stealing pool of 8 threads, whose size can be set via -j <number of threads>) and then awaiting it from the lwt handler. While the computation is running, the lwt client handler is suspended and doesn’t prevent other clients from making progress.
To test this one, use nc localhost 1234 and write (small) integers to get fib(n) computed. To see it work in parallel, open top or htop and run:
for i in `seq 1 200`; do nc localhost 1234 <<< '35' & done