Are there any mature structured-concurrency libraries for OCaml, similar to dill or trio?
If you want concurrency with OCaml, you'd better stick to monadic concurrency, aka Lwt or Async. Both are mature and battle-tested.
I'm aware of Lwt and Async. I mean something like this.
Is it some specific semantics or the direct coding style that you are after? Correct me if I'm wrong, but I would think that most of the time you get about the same semantics with Lwt.
That's not a great example. The examples here go into more detail on structured concurrency.
You can write structured-concurrency programs with a library like Lwt, just like you can write structured programs in languages with "goto line n," but it takes discipline. I'm looking for a concurrency library which gives structured-concurrency primitives, just like languages like OCaml give structured control-flow primitives like functions, while loops, if/then, etc.
I'm curious - which one of the examples from Dill do you think needs discipline to implement in Lwt?
The examples are necessarily small. It doesn't take any discipline to implement a for-loop demonstration in GW-BASIC, either, but that's not the point of structured programming.
Small examples usually hint at why something has some better qualities, e.g. by restricting you from writing wrong code. You don't think that any of those examples hint at the advantages of structured programming? Seems like badly chosen examples?
I had a look at the libdill examples. The second-to-last example can be translated to Lwt as follows, where I added some extra output and reduced the timeouts:
(* Parent coroutine closes multiple child coroutines - with a grace period *)
open Lwt.Infix

let worker i =
  try%lwt
    Lwt_unix.sleep (Random.float 10.0) >>= fun () ->
    Lwt_io.printlf "Promise %d done." i
  with Lwt.Canceled ->
    Lwt_io.printlf "Promise %d cancelled." i

let () = Lwt_main.run begin
  Lwt_unix.with_timeout 5.0 (fun () -> Lwt.join (List.init 3 worker))
end
with dune file:
(executables
 (names usecase5)
 (libraries lwt lwt.unix)
 (preprocess (pps lwt_ppx)))
The earlier examples are just limited versions of this. As for the last one, I am not sure how the worker corresponds to the intention described in the text (the sleep payload comes before the broadcast wait, and I don't see the purpose of the loop), but the broadcast semantics of the pipes can be done with Lwt_condition.
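As a rough illustration of the Lwt_condition idea, here is a minimal sketch in which a parent wakes several waiting workers with a single broadcast. The worker body, the 0.1 second delay, and the shutdown message are assumptions made up for this example, not a translation of the libdill code; it also assumes an Lwt version that provides Lwt.all.

```ocaml
(* Sketch: broadcasting a shutdown request to several workers with
   Lwt_condition. Worker bodies and timings are illustrative assumptions. *)
open Lwt.Infix

let worker shutdown i =
  (* Each worker blocks until the shutdown broadcast arrives. *)
  Lwt_condition.wait shutdown >>= fun reason ->
  Lwt_io.printlf "Worker %d stopping: %s" i reason >|= fun () ->
  i

let main () =
  let shutdown = Lwt_condition.create () in
  (* The workers register as waiters as soon as they are created. *)
  let workers = List.init 3 (worker shutdown) in
  (* Give the workers a moment, then wake all of them at once. *)
  Lwt_unix.sleep 0.1 >>= fun () ->
  Lwt_condition.broadcast shutdown "parent asked for shutdown";
  Lwt.all workers

(* The ids of the workers that stopped, in creation order. *)
let stopped = Lwt_main.run (main ())
```

Unlike Lwt.cancel, a broadcast only reaches workers that are currently waiting on the condition, so a worker that should finish its iteration first would call Lwt_condition.wait (or check a flag) between iterations.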
Considering the definition "Structured concurrency means that lifetimes of concurrent functions are cleanly nested.", it is not surprising that the examples map cleanly to a monadic concurrency library: the bind operator both fulfils associativity relations, allowing it to be used in nested functions, and has sequencing semantics, delimiting the lifetimes according to the call graph.
I interpret the worker in the last example as:
- The msleep is the work to be done for one iteration.
- The work can exit prematurely by being canceled by the parent (rc < 0 && errno == ECANCELED), in which case it just returns.
- Or the work can complete for the current iteration, after which there is a check whether a shutdown has been broadcast, to decide whether to continue or stop working in the infinite loop.
The broadcast can happen at any point in the infinite cycle of work, but will only be acted upon when an iteration's work is done (e.g. to finish handling a client, unless this takes too much time).
Edit: I see your point; "finishing up the work" doesn't depend on the shutdown in the example, it just "is done, naturally" when finishing an iteration, which is not what the text stated.
There's a more complete example in this talk, briefly discussed in this section of an essay about a structured-concurrency library for Python.
For example, consider the Happy Eyeballs algorithm (RFC 8305), which is a simple concurrent algorithm for speeding up the establishment of TCP connections. Conceptually, the algorithm isn't complicated - you race several connection attempts against each other, with a staggered start to avoid overloading the network. But if you look at Twisted's best implementation, it's almost 600 lines of Python, and still has at least one logic bug. The equivalent in Trio is more than 15x shorter. More importantly, using Trio I was able to write it in minutes instead of months, and I got the logic correct on my first try. I never could have done this in any other framework, even ones where I have much more experience. For more details, you can watch my talk at Pyninsula last month. Is this typical? Time will tell. But it's certainly promising.
open Lwt.Infix

let connect ip =
  Lwt_io.printlf "connecting to ip: %s" ip >>= fun () ->
  Lwt_unix.sleep @@ 0.5 +. Random.float 9.5 >|= fun () ->
  Some ip

let rec happy_eye = function
  | [] ->
    let timeout = 3.0 in
    Lwt_unix.sleep timeout >|= fun () -> None
  | ip :: rest ->
    Lwt.pick [
      connect ip;
      Lwt_unix.sleep 0.2 >>= fun () -> happy_eye rest
    ]

let _ =
  Random.self_init ();
  let ips = [ "1"; "2"; "3"; "4" ] in
  Lwt_main.run begin
    happy_eye ips >>= function
    | None -> Lwt_io.printl "timeout"
    | Some ip -> Lwt_io.printlf "response from ip: %s" ip
  end
The connect function could also fail before the timeout, in which case the next attempt should be scheduled immediately. That is, assuming connect returns None in such a case,
let rec happy_eye = function
  | [] -> Lwt.return_none
  | ip :: rest ->
    let attempt = connect ip in
    let attempt_or_timeout =
      Lwt.choose [attempt; Lwt_unix.sleep 0.2 >|= fun () -> None] in
    Lwt.pick [
      attempt;
      (attempt_or_timeout >>= function
        | None -> happy_eye rest
        | Some _ -> Lwt.return_none);
    ]
To those not familiar with Lwt: Lwt.pick resolves with the first promise to resolve and cancels the others, while Lwt.choose resolves with the first promise to resolve and leaves the others running.
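The difference in how the losing promise is treated can be observed directly with the core Lwt library. The following is a minimal sketch: loser_after and the loser type are hypothetical helpers introduced only for this illustration, and the promises are created with Lwt.task so that they are cancelable.

```ocaml
(* Sketch: observe what happens to the losing promise under Lwt.choose
   and Lwt.pick. [loser] and [loser_after] are hypothetical helpers,
   not part of Lwt. *)
type loser = Still_pending | Canceled | Other

let loser_after combinator =
  (* Lwt.task creates a cancelable pending promise plus its resolver. *)
  let fast, wake_fast = Lwt.task () in
  let slow, _wake_slow = Lwt.task () in
  let _winner : string Lwt.t = combinator [fast; slow] in
  (* Resolve only the fast promise, then inspect the slow one. *)
  Lwt.wakeup wake_fast "fast";
  match Lwt.state slow with
  | Lwt.Sleep -> Still_pending
  | Lwt.Fail Lwt.Canceled -> Canceled
  | Lwt.Return _ | Lwt.Fail _ -> Other

let after_choose = loser_after Lwt.choose
let after_pick = loser_after Lwt.pick
```

Here after_choose should be Still_pending and after_pick should be Canceled, which is why a worker that catches Lwt.Canceled gets a chance to clean up only under Lwt.pick.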
Correction: Replace Lwt.pick with pick_some defined below.
I think the Lwt.pick part is a bit subtle in your version. It works, but is it well defined within Lwt semantics that it's the thread that "yields the least" that is picked?
Specifically: if you replace the first Lwt.pick thread with
(attempt >>= fun v -> Lwt_unix.yield () >|= fun () -> v)
... then the result will be None when attempt returns Some v. It should just return Some v when attempt_or_timeout returns Some v.
Also, as you removed the timeout from the [] case, the last ip will only have 0.2 seconds to resolve, or else None is returned.
You are right. I was too hasty rephrasing your original [] case, and also connect would have to sleep rather than returning None. OTOH, to fix my version, we could introduce a new combinator:
val pick_some : 'a option Lwt.t list -> 'a option Lwt.t
(** [pick_some ps] returns a promise that is pending until either one of [ps]
    resolves to [Some x], in which case [Some x] is returned, or all of [ps]
    resolve to [None], in which case [None] is returned. *)

let rec pick_some ps =
  if ps = [] then Lwt.return_none else
  let not_none p = Lwt.state p <> Lwt.Return None in
  Lwt.choose ps >>= function
  | None -> pick_some (List.filter not_none ps)
  | Some _ as r -> List.iter Lwt.cancel ps; Lwt.return r
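To make the intended behaviour concrete, here is a small sketch that drives pick_some by hand. The definition is repeated from the post so that the block compiles on its own; the three-promise scenario and the variable names are assumptions chosen for illustration, using Lwt.task in place of real connection attempts.

```ocaml
open Lwt.Infix

(* pick_some as defined in the post, repeated so the sketch is
   self-contained. *)
let rec pick_some ps =
  if ps = [] then Lwt.return_none else
  let not_none p = Lwt.state p <> Lwt.Return None in
  Lwt.choose ps >>= function
  | None -> pick_some (List.filter not_none ps)
  | Some _ as r -> List.iter Lwt.cancel ps; Lwt.return r

(* Hypothetical scenario: the first attempt fails with None, the second
   succeeds, and a third is still pending when the winner arrives. *)
let result, third_state =
  let first, wake_first = Lwt.task () in
  let second, wake_second = Lwt.task () in
  let third, _wake_third = Lwt.task () in
  let p = pick_some [first; second; third] in
  Lwt.wakeup wake_first None;
  Lwt.wakeup wake_second (Some "second");
  (Lwt.state p, Lwt.state third)
```

In this scenario the None from the first promise is skipped rather than returned, the result resolves to Some "second", and the still-pending third promise ends up canceled.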
I made the variants explicit. It turns out that more logic is needed for handling the failure case (thank you, polymorphic variants).
open Lwt.Infix

let connect ip =
  Lwt_io.printlf "connecting to ip: %s" ip >>= fun () ->
  Lwt_unix.sleep @@ 0.5 +. Random.float 5.5 >>= fun () ->
  if Random.float 1. > 0.3 then
    Lwt_io.printlf "failed connecting to ip: %s" ip >>= fun () ->
    Lwt.return @@ `Failure ip
  else
    Lwt.return @@ `Response ip

let never () = Lwt_mvar.(create_empty () |> take)

let rec happy_eye = function
  | [] -> Lwt_unix.sleep 3.0 >|= fun () -> `Timeout
  | ip :: rest ->
    let attempt = connect ip
    and impatient = Lwt_unix.sleep 0.2 >|= fun () -> `Impatient in
    Lwt.pick [
      (attempt >>= function
        | `Failure _ -> never ()
        | `Response _ as response -> Lwt.return response
      );
      (Lwt.choose [attempt; impatient] >>= function
        | `Impatient | `Failure _ -> happy_eye rest
        | `Response _ -> never ()
      );
    ]

let _ =
  Random.self_init ();
  let ips = [ "1"; "2"; "3"; "4" ] in
  Lwt_main.run begin
    happy_eye ips >>= function
    | `Timeout -> Lwt_io.printl "timeout"
    | `Response ip -> Lwt_io.printlf "response from ip: %s" ip
  end
Edit: I see now that this logic is actually equivalent to your pick_some, @paurkedal, but I like the explicitness.
Edit: OTOH, the advantage of pick_some is that one can't make an error by inserting the never thread in the wrong place.