Lwt and synchronisation between threads

Dear all,

I’m again struggle with Lwt and the synchronisation available therein. I have two pieces of code, one called by an API user (let’s name it “iwantbytes”) – another called whenever something on the network happened (let’s call it “packetarrived”).

Now, I distinguish good and back packets, and try to demultiplex good packets to the respective API users (wantbytes). So far I use a (OCaml Stdlib) Map to store a mapping from “id” → Lwt_condition.t, and the “packetarrived” code eventually calls Lwt_condition.broadcast (while the “iwantbytes” creates a Lwt_condition, inserts it into the map, and calls wait on it). Since there’s quite some more processing, and error conditions, I only broadcast (), and the data is communicated via other means (shared mutable state – sorry about that).

Now I wonder, should I instead use “Lwt_condition.signal” (there’s never more than one client interested in a specific packet)?

Or should I use a Lwt_mvar instead? What is the advantage of Lwt_mvar?

Then I figured there’s Lwt_stream, maybe that is the way to go?

TL;DR: Are there semantic differences between the three modules (Lwt_condition/Lwt_mvar/Lwt_stream), anything in respect to resource usage. and any intuition / best practices on which thing to use?

Thanks a lot reading this post, and even more answering it :smiley:

3 Likes

TL;DR: yes, significant semantic differences

So let’s look at each in order. Then, depending on some more specifics of your use-case, you can decide which to use.

Lwt_mvar: reader-driven

An Lwt_mvar.t is a data-structure which is in one of two state: empty (i.e., holding one (1) value) or full (holding exactly one value). You can start with either states using the different constructors, but in your case I’d guess: Lwt_mvar.create_empty and you get an empty mvar m which holds no value.

The behaviour of writing to and reading from this data-structure both depend on the state it’s in.

Writing to an empty mvar (a) changes the state of the mvar to full and (b) return an already resolved promise (no blocking, no waiting, no yielding, no cooperation, no scheduler; although context switch can still happen, see epilogue).

Writing to a full mvar (a) returns a pending promise, (b) the promise resolves after the mvar changes state to empty at which point it re-changes state to full.

Note that the operation of pushing a value to an mvar is blocking until the mvar is read from. If your application is reader-driven, this is a good way to limit the throughput of the writer loop, but in the case of an application reacting to network packets it’s probably not the right choice. It depends on how frequently packets arrive, how fast the consumer eats those messages, and how much TCP(?) many packets can fit in your network interface buffers.

Note that in the case of multiple writers, there is a queue of pending writes: each value is written one after the other with a read in between each.

Reading to an empty mvar (a) returns a pending promise, (b) the promise resolves after the mvar changes state to full at which point it re-changes state to empty. (This is a lie, see epilogue.)

Reading to a full mvar (a) returns an already resolved promise (no yielding), (b) changes the state of the mvar to empty.

Lwt_condition: I hope someone is listening

An Lwt_condition.t is a data-structure which is state-free (on the data-side at least). A condition doesn’t hold a value, it’s not a mailbox like an mvar, instead it’s an announce system that a writer can send a message through in the hope that someone is listening on the other end. More concretely

Reading from a condition (using Lwt_condition.wait) returns promise that is pending. A resolver for this promise is stored in the condition’s reader queue. (So yes, it is statefull, but not for data, just(!) for control flow.)

Writing to a condition can take to form:

Lwt_condition.signal causes the first resolver in the condition’s reader queue is called. This has the effect of resolving the oldest of the promises reading on the condition. At this point the resolver is used up so it is removed from the queue. If the queue is empty (i.e., if there are no suspended promises, if there have been no calls to wait) then the value passed to signal is discarded.

Lwt_condition.broadcast causes all the resolvers in the condition’s reader queue to be called. This has the effect of resolving all the promises reading on the condition. At this point all the resolvers are used up so they are removed from the queue. If the queue is empty then the value passed to broadcast is discarded.

If it is important that every packet is handled sequentially then you will need to take extra care to prevent packets from being discarded: either delaying writes (but then you are kind of using it as an mvar) or having a buffer of signaled/boradcasted value (but then you are using it as a stream).

A condition is appropriate for synchronisation that is ok to miss (e.g., once in a while it’s fine if you miss a UI update for, say, a progress bar and you jump directly from 17% to 19% completion), A condition is also a great choice if you can process signaled/broadcasted data concurrently (if the packet order doesn’t matter): in that case it’s easy to ensure you always have a reader waiting on the condition because you can register a new one as the first step of the processing:

let rec r c f =
  let* v = Lwt_condition.wait c in
  let rr = r c f in
  Lwt.dont_wait (f v) (fun exc -> …);
  rr

Lwt_stream: a dangerous swiss-army knife

The Lwt_stream module is difficult to describe in one sentence because there are multiple ways to use an Lwt_stream. The different ways are all ok, but mixing them is very error prone. It’s a swiss-army knife and you don’t want to open several of the sharp tools at the same time.

A stream is a data-structure you can push and take values from. Pushing is never blocking because there is an internal buffer (it is stateful). This buffer is unbounded so be careful with the rate of writes and reads.

Streams as a buffer

If you use create, the push function returned by create, and get, next, or the like, the stream behaves like a buffer. Calling get when the stream’s internal buffer is empty returns a pending promise. If you call the push function, it first checks if there are any pending readers to wake the oldest one, otherwise it places the value in the internal buffer.

You can even distribute the push function to multiple writing loops and call the get function from multiple reading loops: each value pushed to the stream is gotten (getted) exactly once by one reader.

One value in, one value out, simple as.

Streams for multiplexing

If you use map, filter, fold, iter or any of the other traversors, in any of their variation (plain, _s, _p), then the stream still includes a buffer (to avoid losing packets), but each of the value pushed to the stream is passed to all the traversors that have been called. Each value is also passed on to one (1) of the get/junk if any are registered.

let s, push = Lwt_stream.create ()
let s1 = Lwt_stream.(iter print_endline (map capitalize_ascii s))
let s2 = Lwt_stream.(iter print_endline (map uncapitalize_ascii s))
let () = push "This" (* causes two prints to happen *)
let reading = let* v = Lwt_stream.next s in print_endline v; Lwt.return ()
let () = push "that" (* causes three prints to happen and v to be resolved *)

Mixing is dangerous

As pointed out above, you can get values passed onto different readers concurrently if you use your stream with a mix of functions. I’d recommend to: create, use a few transformers (like some maps and filters), and then attach one reader at the very end of that processing pipeline. Avoid using using get/next/etc. on the intermediate streams.

Similarly, avoid attaching a traversor on the reading end of your processing pipeline. (Typical use-case: attaching an iter with a logging/printf-debugging function.) This will drain the buffer and your reading loop might start missing messages. Basically

(* avoid *)
Lwt_stream.iter (fun x -> …) s

(* prefer *)
let s_dup_for_logging = Lwt_stream.map Fun.id s
Lwt_stream.iter (fun x -> …) s_dup_for_logging
let s_dup_for_consuming = Lwt_stream.map Fun.id s
(* and pass s_dup_for_consuming to your consumer *)

Conclusion

Depending on your application, one of these might be appropriate.

Lwt_mvar is good for reader-driven applications in which writers can absorb some back-pressure.
Lwt_contition is good for concurrent processing or for non-incremental status update (“progress is now 19%” is ok, “progress is +1” is not).
Lwt_stream is good for writer-driven applications in which readers may take some time and values might need to be buffered. (But beware the buffers are unbounded.)

You may also want to check Lwt_seq or the more complete Seqes library.

Epilogue: Context switch without yielding

Lwt has some surprising semantics. One surprising thing that can happen is that you can switch “threads” without yielding or entering a “cooperation point”. I’m using scare quotes because there are no threads in Lwt: only promises and callbacks.

Consider the following piece of code:

open Lwt.Syntax;; (* [let*] *)
let return = Lwt.return;;

let m = Lwt_mvar.create_empty ();;

let r =
  let* () = Lwt_mvar.take m in
  print_endline "r";
  return ()

let () =
  assert (Lwt.state (Lwt_mvar.put m ()) = Lwt.Return ());
  print_endline "w"

It prints “r” before it prints “w” even though “w” is not even in a “thread” just a plain unit sequence of instructions.

The same happens with conditions and streams. For more details see this longer explanation.

10 Likes

Thanks a lot, @raphael-proust.

I only have a question, what does “reader-driven” mean? For me, indeed the API provides a “iwantbytes” (which reads, called by an application) and is blocking. But the “packetarrived” is as well part of the API, called by the underlying layer, and is fine to not block (thus mvar sounds slightly brittle - since then any application that doesn’t follow the API could make the “packetarrived” function block).

“Reader-driven” is a bit vague. In this case I meant to say that the side of the application which consumes the data in the mvar/condition/stream is the one that sets the rhythm at which data traverses the structure, it’s the one that the other part waits on.

For example you might have an application which moves data like filebufferscreen where the buffer is an mvar/condition/stream (or a few of them with some processing in between). This application blocks on user input once the screen is full, and only when it gets a PageDown/DownArrow/J/whatever does the data start moving again.

I think “pull-based” is also used sometimes for that. I’m not too sure.

“Writer-driven” is the other way around. It’s when the part of the application which sets the rhythm of the movement of data is controlled by the side that pushes data into the mvar/condition/stream.

For example a part of the UI which has a progress bar: some progress happens which causes some new value to be pushed which causes a refresh on the screen.

I’m not sure it makes a lot of sense apart from when discussing such data-structures… Sometimes it does match the context of your application: a program that receives packets from the network and has to react to them is driven by the arrival of data. In this case the loop with the blocking inside is upstream of the data-structures (it’s on the push-side). Conversely a program that accesses data on demand based on UI interactions is driven by events. In this case the loop with the blocking inside is downstream of the data-structure (it’s on the pull-side).

It doesn’t always make a lot of sense because applications can be a little-bit of both… so maybe not a useful consideration because it’s too local?


Based on the description, it seems that what you are most likely to use are streams. Or maybe something based on stream but with bounds in the number and/or size of values that it holds.

If you don’t mix readers and traversors then you can easily make sure you accept all the packets that arrive and that you consume them all. (Provided the read and write rates are reasonable.)

1 Like

I would note too that Lwt_mvar is good for synchronizing concurrent threads, without needing any buffering. An interesting relation is also that Lwt_stream.create_bounded 1 is equivalent.

Also, a dangerous thing about unbounded streams in the context of a push-based flow is that you need to be sure that the consumer is faster on average than the producer (or implement some throttling mechanism), else you have a memory leak.