Structured-concurrency libraries

Are there any mature structured-concurrency libraries for OCaml, similar to dill or trio?

If you want concurrency with OCaml, you’d better stick to monadic concurrency, aka lwt or async. Both are mature and battle-tested.

I’m aware of lwt and async. I mean something like this.

Is it some specific semantics or the direct coding style that you are after? Correct me if I’m wrong, but I would think that most of the time you get about the same semantics with Lwt.

That’s not a great example. The examples here go into more detail on structured concurrency.

You can write structured-concurrency programs with a library like Lwt, just like you can write structured programs in languages with “goto line n,” but it takes discipline. I’m looking for a concurrency library which gives structured-concurrency primitives, just like languages like OCaml give structured control-flow primitives like functions, while loops, if/then, etc.

I’m curious - which one of the examples from Dill do you think needs discipline to implement in Lwt?

The examples are necessarily small. It doesn’t take any discipline to implement a for-loop demonstration in GW-BASIC, either, but that’s not the point of structured programming.

Small examples usually hint at why something has better qualities, e.g. by restricting you from writing wrong code. Don’t you think any of those examples hint at the advantages of structured programming? Or are the examples badly chosen?

I had a look at the libdill examples. The second-to-last example can be translated to Lwt as follows, where I added some extra output and reduced the timeouts:

(* Parent coroutine closes multiple child coroutines - with a grace period *)

open Lwt.Infix

let worker i =
  try%lwt
    Lwt_unix.sleep (Random.float 10.0) >>= fun () ->
    Lwt_io.printlf "Promise %d done." i
  with Lwt.Canceled ->
    Lwt_io.printlf "Promise %d cancelled." i

let () = Lwt_main.run begin
  Lwt_unix.with_timeout 5.0 (fun () -> Lwt.join (List.init 3 worker))
end

with dune file:

(executables
  (names usecase5)
  (libraries lwt lwt.unix)
  (preprocess (pps lwt_ppx)))

The earlier examples are just limited versions of this. As for the last one, I am not sure how the worker corresponds to the intention described in the text (the sleep payload comes before the broadcast wait, and I don’t see the purpose of the loop), but the broadcast semantics of the pipes can be done with Lwt_condition.

Considering the definition “Structured concurrency means that lifetimes of concurrent functions are cleanly nested.”, it is not surprising that the examples map cleanly to a monadic concurrency library: the bind operator both satisfies the associativity law, allowing it to be used in nested functions, and has sequencing semantics, delimiting the lifetimes according to the call graph.
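
To make the lifetime argument concrete, here is a minimal sketch (not from the thread) contrasting a parent whose children are joined through bind with one that detaches a child via Lwt.async. The names log, record, child, nested_parent and leaky_parent are made up for illustration, and the delays are arbitrary.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

(* Shared event log, newest entry first. *)
let log : string list ref = ref []
let record msg = log := msg :: !log

(* A child that finishes after [delay] seconds and records that it did. *)
let child name delay =
  Lwt_unix.sleep delay >|= fun () -> record (name ^ " done")

(* Nested lifetimes: the parent's promise resolves only after both
   children have resolved, because it is bound to their join. *)
let nested_parent () =
  Lwt.join [child "a" 0.05; child "b" 0.01] >|= fun () ->
  record "parent done"

(* Escaping lifetime: the child is detached with Lwt.async, so the parent
   resolves immediately while the child is still pending. *)
let leaky_parent () =
  Lwt.async (fun () -> child "c" 0.05);
  record "parent done";
  Lwt.return_unit
```

Running nested_parent under Lwt_main.run should record both children before the parent, while leaky_parent resolves with its child still pending. Both functions have the same type, so keeping lifetimes nested in Lwt is a matter of convention rather than something the library enforces.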

I interpret the worker in the last example as:

  • The msleep is the work to be done for one iteration.
  • The work can exit prematurely by being canceled by the parent (rc < 0 && errno == ECANCELED), in which case it just returns.
  • Or the work for the current iteration completes, and the worker then checks whether a shutdown has been broadcast to decide whether to continue or stop the infinite loop.

The broadcast can happen at any point in the infinite cycle of work, but will only be acted upon once an iteration’s work is done (e.g. finishing up handling a client, unless this takes too much time).

Edit: I see your point; ‘finishing up the work’ doesn’t depend on the shutdown in the example, it just ‘is done, naturally’ when finishing an iteration, which is not what the text stated.

There’s a more complete example in this talk, briefly discussed in this section of an essay about a structured-concurrency library for python.

For example, consider the Happy Eyeballs algorithm (RFC 8305), which is a simple concurrent algorithm for speeding up the establishment of TCP connections. Conceptually, the algorithm isn’t complicated – you race several connection attempts against each other, with a staggered start to avoid overloading the network. But if you look at Twisted’s best implementation, it’s almost 600 lines of Python, and still has at least one logic bug. The equivalent in Trio is more than 15x shorter. More importantly, using Trio I was able to write it in minutes instead of months, and I got the logic correct on my first try. I never could have done this in any other framework, even ones where I have much more experience. For more details, you can watch my talk at Pyninsula last month. Is this typical? Time will tell. But it’s certainly promising.

open Lwt.Infix

let connect ip =
  Lwt_io.printlf "connecting to ip: %s" ip >>= fun () -> 
  Lwt_unix.sleep @@ 0.5 +. Random.float 9.5 >|= fun () ->
  Some ip

let rec happy_eye = function
  | [] ->
    let timeout = 3.0 in
    Lwt_unix.sleep timeout >|= fun () -> None
  | ip :: rest -> 
    Lwt.pick [
      connect ip;
      Lwt_unix.sleep 0.2 >>= fun () -> happy_eye rest
    ]

let _ =
  Random.self_init ();
  let ips = [ "1"; "2"; "3"; "4" ] in
  Lwt_main.run begin
    happy_eye ips >>= function
    | None -> Lwt_io.printl "timeout"
    | Some ip -> Lwt_io.printlf "response from ip: %s" ip
  end

The connect function could also fail before the timeout, in which case the next attempt should be scheduled immediately. That is, assuming connect returns None in such a case,

let rec happy_eye = function
  | [] -> Lwt.return_none
  | ip :: rest ->
    let attempt = connect ip in
    let attempt_or_timeout =
      Lwt.choose [attempt; Lwt_unix.sleep 0.2 >|= fun () -> None] in
    Lwt.pick [
      attempt;
      (attempt_or_timeout >>= function
       | None -> happy_eye rest
       | Some _ -> Lwt.return_none);
    ]

For those not familiar with Lwt: Lwt.pick returns the first promise to resolve and cancels the others, while Lwt.choose returns the first promise to resolve and leaves the others running.
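
A small sketch of that difference, assuming the lwt and lwt.unix libraries. The helpers task and race are hypothetical names introduced here, and the delays are arbitrary; each task notes in a shared list when it observes Lwt.Canceled.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

(* A promise that resolves to [name] after [delay] seconds. If it is
   cancelled first, it adds [name] to [cancelled] and stays rejected. *)
let task cancelled name delay =
  Lwt.catch
    (fun () -> Lwt_unix.sleep delay >|= fun () -> name)
    (function
      | Lwt.Canceled ->
        cancelled := name :: !cancelled;
        Lwt.fail Lwt.Canceled
      | exn -> Lwt.fail exn)

(* Race a fast and a slow task with the given combinator. Returns the
   winner together with the names of the tasks that were cancelled. *)
let race combinator =
  let cancelled = ref [] in
  let fast = task cancelled "fast" 0.01 in
  let slow = task cancelled "slow" 0.1 in
  let winner = Lwt_main.run (combinator [fast; slow]) in
  (winner, !cancelled)
```

With these definitions, race Lwt.pick should report that "slow" was cancelled, whereas race Lwt.choose should report no cancellations because the slow task is simply left pending.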

Correction: Replace Lwt.pick with pick_some defined below.

I think the Lwt.pick part is a bit subtle in your version. It works, but is it well defined within Lwt semantics, that it’s the thread that “yields the least” that is picked?
Specifically: if you replace the first Lwt.pick thread with

(attempt >>= fun v -> Lwt_unix.yield () >|= fun () -> v)

… then the result will be None when attempt returns Some v. It should just return Some v when attempt_or_timeout returns Some v.

Also, since you removed the timeout from the [] case, the last ip only has 0.2 seconds to resolve; otherwise None is returned.

You are right. I was too hasty rephrasing your original [] case, and also connect would have to sleep rather than returning None. OTOH, to fix my version, we could introduce a new combinator:

val pick_some : 'a option Lwt.t list -> 'a option Lwt.t
(** [pick_some ps] returns a promise that is pending until either one of [ps]
    resolves to [Some x], in which case [Some x] is returned, or all of [ps]
    resolve to [None], in which case [None] is returned. *)

let rec pick_some ps =
  if ps = [] then Lwt.return_none else
  let not_none p = Lwt.state p <> Lwt.Return None in
  Lwt.choose ps >>= function
  | None -> pick_some (List.filter not_none ps)
  | Some _ as r -> List.iter Lwt.cancel ps; Lwt.return r
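
For reference, a sketch of how the corrected version might look once Lwt.pick is replaced with pick_some. The pick_some definition is repeated from above so the block stands alone. The connect function here is a hypothetical stand-in that takes an (ip, delay, ok) triple instead of opening a real connection, so the behaviour can be exercised deterministically.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

(* Repeated from the post above. *)
let rec pick_some ps =
  if ps = [] then Lwt.return_none else
  let not_none p = Lwt.state p <> Lwt.Return None in
  Lwt.choose ps >>= function
  | None -> pick_some (List.filter not_none ps)
  | Some _ as r -> List.iter Lwt.cancel ps; Lwt.return r

(* Hypothetical simulated connection: takes [delay] seconds, then yields
   [Some ip] if [ok] is true and [None] (a failed attempt) otherwise. *)
let connect (ip, delay, ok) =
  Lwt_unix.sleep delay >|= fun () -> if ok then Some ip else None

let rec happy_eye = function
  | [] -> Lwt.return_none
  | target :: rest ->
    let attempt = connect target in
    (* Resolves to [None] on failure or after the 0.2 s stagger delay. *)
    let attempt_or_timeout =
      Lwt.choose [attempt; Lwt_unix.sleep 0.2 >|= fun () -> None] in
    pick_some [
      attempt;
      (attempt_or_timeout >>= function
       | None -> happy_eye rest      (* start the next attempt *)
       | Some _ -> Lwt.return_none); (* [attempt] itself reports success *)
    ]
```

Because pick_some waits until every promise has resolved to None before giving up, the empty-list case can return None immediately without cutting short an attempt that is still in flight. A failed attempt also triggers the next one right away instead of waiting out the stagger delay.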

I made the variants explicit. It turns out that more logic is needed to handle the failure case (thank you polymorphic variants :kissing: ).

open Lwt.Infix

let connect ip =
  Lwt_io.printlf "connecting to ip: %s" ip >>= fun () -> 
  Lwt_unix.sleep @@ 0.5 +. Random.float 5.5 >>= fun () ->
  if Random.float 1. > 0.3 then
    Lwt_io.printlf "failed connecting to ip: %s" ip >>= fun () -> 
    Lwt.return @@ `Failure ip
  else
    Lwt.return @@ `Response ip 

let never () = Lwt_mvar.(create_empty () |> take)

let rec happy_eye = function
  | [] -> Lwt_unix.sleep 3.0 >|= fun () -> `Timeout
  | ip :: rest -> 
    let attempt = connect ip 
    and impatient = Lwt_unix.sleep 0.2 >|= fun () -> `Impatient in
    Lwt.pick [
      (attempt >>= function
        | `Failure _ -> never ()
        | `Response _ as response -> Lwt.return response
      );
      (Lwt.choose [attempt; impatient] >>= function
        | `Impatient | `Failure _ -> happy_eye rest
        | `Response _ -> never ()
      );
    ]

let _ =
  Random.self_init ();
  let ips = [ "1"; "2"; "3"; "4" ] in
  Lwt_main.run begin
    happy_eye ips >>= function
    | `Timeout -> Lwt_io.printl "timeout"
    | `Response ip -> Lwt_io.printlf "response from ip: %s" ip
  end

Edit: I see now that this logic is actually equivalent to your pick_some @paurkedal, but I like the explicitness.

Edit: On the other hand, the advantage of pick_some is that one can’t make an error by inserting the never thread in the wrong place.
