I’m aware of lwt and async. I mean something like this.
Is it some specific semantics or the direct coding style that you are after? Correct me if I’m wrong, but I would think that most of the time you get about the same semantics with Lwt.
That’s not a great example. The examples here go into more detail on structured concurrency.
You can write structured-concurrency programs with a library like Lwt, just like you can write structured programs in languages with “goto line n,” but it takes discipline. I’m looking for a concurrency library which gives structured-concurrency primitives, just like languages like OCaml give structured control-flow primitives like functions, while loops, if/then, etc.
I’m curious - which one of the examples from Dill do you think needs discipline to implement in Lwt?
The examples are necessarily small. It doesn’t take any discipline to implement a for-loop demonstration in GW-BASIC, either, but that’s not the point of structured programming.
Small examples usually hint at why something has better qualities, e.g. that it restricts you from writing wrong code. You don’t think any of those examples hint at the advantages of structured concurrency? That would seem like badly chosen examples.
I had a look at the libdill examples. The second-to-last example can be translated to Lwt as follows, where I added some extra output and reduced the timeouts:
```ocaml
(* Parent coroutine closes multiple child coroutines - with a grace period *)
open Lwt.Infix

let worker i =
  try%lwt
    Lwt_unix.sleep (Random.float 10.0) >>= fun () ->
    Lwt_io.printlf "Promise %d done." i
  with Lwt.Canceled -> Lwt_io.printlf "Promise %d cancelled." i

let () =
  Lwt_main.run begin
    Lwt_unix.with_timeout 5.0 (fun () ->
      Lwt.join (List.init 3 worker))
  end
```
with dune file:
```
(executables
 (names usecase5)
 (libraries lwt lwt.unix)
 (preprocess (pps lwt_ppx)))
```
The former examples are just limited versions of this. As for the last one, I am not sure how the worker corresponds to the intention described in the text (the sleep payload comes before the broadcast wait, and I don’t see the purpose of the loop), but the broadcast semantics of the pipes can be expressed in Lwt as well.
Considering the definition “Structured concurrency means that lifetimes of concurrent functions are cleanly nested.”, it is not surprising that the examples map cleanly onto a monadic concurrency library: the bind operator both satisfies the associativity law, allowing it to be used in nested functions, and has sequencing semantics, delimiting lifetimes according to the call graph.
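To illustrate that point, here is a minimal sketch of how bind nests lifetimes in Lwt; the `child` helper and the names are hypothetical, chosen only for the demonstration:

```ocaml
open Lwt.Infix

(* Hypothetical child task: its whole lifetime is delimited by the bind
   that awaits it. *)
let child name =
  Lwt_io.printlf "running %s" name >|= fun () -> name

(* "b" is not even created until "a" has resolved, so the two lifetimes
   nest like ordinary sequential calls. *)
let order =
  Lwt_main.run begin
    child "a" >>= fun a ->
    child "b" >>= fun b ->
    Lwt.return [a; b]
  end

let () = List.iter print_endline order
```

This is of course only the sequential case; the interesting part of structured concurrency is bounding the lifetimes of promises that run in parallel, as in the `Lwt.join`/`with_timeout` example above.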
I interpret the worker in the last example as:
- `msleep` is the work to be done for some iteration.
- The work can either exit prematurely by being canceled by the parent (`rc < 0 && errno == ECANCELED`), in which case it just returns,
- or the work is done for the current iteration, and there is then a check whether a shutdown has been broadcast, to decide whether to continue or stop working in the infinite loop.

The broadcast can happen at any point in the infinite cycle of work, but will only be acted upon when some iteration’s work is done (e.g. after finishing up handling a client, unless this takes too much time).

Edit: I see your point; ‘finishing up the work’ doesn’t depend on the shutdown in the example, it just ‘is done, naturally’ when an iteration finishes, which is not what the text stated.
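That interpretation (minus the cancellation path) can be sketched in Lwt; this is only a rough stand-in, where the hypothetical `shutdown` promise plays the role of libdill’s broadcast pipe and a sleep plays the role of the per-iteration payload:

```ocaml
open Lwt.Infix

(* A promise every worker can watch; resolving it broadcasts shutdown. *)
let shutdown, request_shutdown = Lwt.wait ()

(* Each iteration does its payload, then checks the broadcast, so a
   shutdown request is only acted upon once the current iteration's
   work is finished. *)
let rec worker iterations =
  Lwt_unix.sleep 0.05 >>= fun () ->
  match Lwt.state shutdown with
  | Lwt.Return () -> Lwt.return (iterations + 1)
  | _ -> worker (iterations + 1)

let counts =
  Lwt_main.run begin
    let ws = List.init 2 (fun _ -> worker 0) in
    Lwt_unix.sleep 0.18 >>= fun () ->
    Lwt.wakeup request_shutdown ();
    Lwt.all ws
  end

let () =
  List.iter
    (fun n -> Printf.printf "worker stopped after %d iterations\n" n)
    counts
```

Both workers keep iterating until the broadcast, and each stops at its next between-iterations check rather than mid-payload.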
> For example, consider the Happy Eyeballs algorithm (RFC 8305), which is a simple concurrent algorithm for speeding up the establishment of TCP connections. Conceptually, the algorithm isn’t complicated – you race several connection attempts against each other, with a staggered start to avoid overloading the network. But if you look at Twisted’s best implementation, it’s almost 600 lines of Python, and still has at least one logic bug. The equivalent in Trio is more than 15x shorter. More importantly, using Trio I was able to write it in minutes instead of months, and I got the logic correct on my first try. I never could have done this in any other framework, even ones where I have much more experience. For more details, you can watch my talk at Pyninsula last month. Is this typical? Time will tell. But it’s certainly promising.
```ocaml
open Lwt.Infix

let connect ip =
  Lwt_io.printlf "connecting to ip: %s" ip >>= fun () ->
  Lwt_unix.sleep @@ 0.5 +. Random.float 9.5 >|= fun () ->
  Some ip

let rec happy_eye = function
  | [] ->
    let timeout = 3.0 in
    Lwt_unix.sleep timeout >|= fun () -> None
  | ip :: rest ->
    Lwt.pick [
      connect ip;
      Lwt_unix.sleep 0.2 >>= fun () -> happy_eye rest;
    ]

let _ =
  Random.self_init ();
  let ips = [ "1"; "2"; "3"; "4" ] in
  Lwt_main.run begin
    happy_eye ips >>= function
    | None -> Lwt_io.printl "timeout"
    | Some ip -> Lwt_io.printlf "response from ip: %s" ip
  end
```
The `connect` function could also fail before the timeout, in which case the next attempt should be scheduled immediately. That is, assuming `connect` resolves to `None` in such a case:
```ocaml
let rec happy_eye = function
  | [] -> Lwt.return_none
  | ip :: rest ->
    let attempt = connect ip in
    let attempt_or_timeout =
      Lwt.choose [attempt; Lwt_unix.sleep 0.2 >|= fun () -> None]
    in
    Lwt.pick [
      attempt;
      (attempt_or_timeout >>= function
        | None -> happy_eye rest
        | Some _ -> Lwt.return_none);
    ]
```
To those not familiar with Lwt: `Lwt.pick` returns the first promise to resolve and cancels the others, while `Lwt.choose` returns the first promise to resolve and leaves the others running.
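The difference is easy to observe directly; a small sketch, where `p_loser` and `c_loser` are hypothetical slow promises:

```ocaml
open Lwt.Infix

(* After pick, the slower promise is cancelled; after choose, it is
   left pending. *)
let states =
  Lwt_main.run begin
    let p_loser = Lwt_unix.sleep 1.0 >|= fun () -> "slow" in
    let c_loser = Lwt_unix.sleep 1.0 >|= fun () -> "slow" in
    Lwt.pick [(Lwt_unix.sleep 0.01 >|= fun () -> "fast"); p_loser] >>= fun _ ->
    Lwt.choose [(Lwt_unix.sleep 0.01 >|= fun () -> "fast"); c_loser] >>= fun _ ->
    Lwt.return (Lwt.state p_loser, Lwt.state c_loser)
  end

let () =
  Printf.printf "after pick, loser cancelled: %b\n"
    (fst states = Lwt.Fail Lwt.Canceled);
  Printf.printf "after choose, loser still pending: %b\n"
    (snd states = Lwt.Sleep)
```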
See `pick_some`, defined below.
I think the `Lwt.pick` part is a bit subtle in your version. It works, but is it well defined within Lwt semantics that it’s the thread that “yields the least” that is picked? Specifically: if you replace the first `Lwt.pick` thread with `(attempt >>= fun v -> Lwt_unix.yield () >|= fun () -> v)`, then the result will no longer be `Some v`. It should just return `Some v` whenever `attempt` succeeds.

Also, as you removed the timeout from the `[]` case, the last ip will only have 0.2 seconds to resolve, else `None` is returned.
You are right. I was too hasty rephrasing your original `[]` case, and also `connect` would have to sleep rather than returning `None`. OTOH, to fix my version, we could introduce a new combinator:
```ocaml
val pick_some : 'a option Lwt.t list -> 'a option Lwt.t
(** [pick_some ps] returns a promise that is pending until either one of [ps]
    resolves to [Some x], in which case [Some x] is returned, or all of [ps]
    resolve to [None], in which case [None] is returned. *)

let rec pick_some ps =
  if ps = [] then Lwt.return_none
  else
    let not_none p = Lwt.state p <> Lwt.Return None in
    Lwt.choose ps >>= function
    | None -> pick_some (List.filter not_none ps)
    | Some _ as r -> List.iter Lwt.cancel ps; Lwt.return r
```
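For illustration, here is a sketch of `pick_some` exercised on plain timed promises rather than real connection attempts; the `attempt` helper and the delays are made up for the demonstration:

```ocaml
open Lwt.Infix

(* pick_some as defined above. *)
let rec pick_some ps =
  if ps = [] then Lwt.return_none
  else
    let not_none p = Lwt.state p <> Lwt.Return None in
    Lwt.choose ps >>= function
    | None -> pick_some (List.filter not_none ps)
    | Some _ as r -> List.iter Lwt.cancel ps; Lwt.return r

(* Hypothetical stand-in for a connection attempt. *)
let attempt delay result = Lwt_unix.sleep delay >|= fun () -> result

let winner =
  Lwt_main.run @@ pick_some
    [ attempt 0.02 None;         (* fails first; filtered out *)
      attempt 0.05 (Some "2");   (* first success: picked *)
      attempt 0.50 (Some "3") ]  (* cancelled once "2" wins *)

let () =
  match winner with
  | Some ip -> Printf.printf "winner: %s\n" ip
  | None -> print_endline "all failed"
```

The early `None` is filtered out and the race continues among the remaining promises, so `winner` is `Some "2"` while the slowest attempt gets cancelled.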
I made the variants explicit. It turns out that more logic is needed to handle the failure case (thank you, polymorphic variants).
```ocaml
open Lwt.Infix

let connect ip =
  Lwt_io.printlf "connecting to ip: %s" ip >>= fun () ->
  Lwt_unix.sleep @@ 0.5 +. Random.float 5.5 >>= fun () ->
  if Random.float 1. > 0.3 then
    Lwt_io.printlf "failed connecting to ip: %s" ip >>= fun () ->
    Lwt.return @@ `Failure ip
  else
    Lwt.return @@ `Response ip

let never () = Lwt_mvar.(create_empty () |> take)

let rec happy_eye = function
  | [] -> Lwt_unix.sleep 3.0 >|= fun () -> `Timeout
  | ip :: rest ->
    let attempt = connect ip
    and impatient = Lwt_unix.sleep 0.2 >|= fun () -> `Impatient in
    Lwt.pick [
      (attempt >>= function
        | `Failure _ -> never ()
        | `Response _ as response -> Lwt.return response);
      (Lwt.choose [attempt; impatient] >>= function
        | `Impatient | `Failure _ -> happy_eye rest
        | `Response _ -> never ());
    ]

let _ =
  Random.self_init ();
  let ips = [ "1"; "2"; "3"; "4" ] in
  Lwt_main.run begin
    happy_eye ips >>= function
    | `Timeout -> Lwt_io.printl "timeout"
    | `Response ip -> Lwt_io.printlf "response from ip: %s" ip
  end
```
Edit: I see now that this logic is actually equivalent to your `pick_some`, @paurkedal, but I like the explicitness.
Edit: OTOH, the advantage of `pick_some` is that one can’t make an error by inserting the `never` thread in the wrong place.