TL;DR: yes, significant semantic differences
So let’s look at each in order. Then, depending on some more specifics of your use-case, you can decide which to use.
Lwt_mvar: reader-driven
An Lwt_mvar.t is a data-structure which is in one of two states: empty (i.e., holding zero (0) values) or full (holding exactly one value). You can start in either state using the different constructors, but in your case I’d guess Lwt_mvar.create_empty, which gives you an empty mvar m holding no value.
The behaviour of writing to and reading from this data-structure both depend on the state it’s in.
Writing to an empty mvar (a) changes the state of the mvar to full and (b) returns an already resolved promise (no blocking, no waiting, no yielding, no cooperation, no scheduler; although a context switch can still happen, see epilogue).
Writing to a full mvar (a) returns a pending promise, (b) the promise resolves after the mvar changes state to empty at which point it re-changes state to full.
Note that pushing a value to a full mvar blocks until the mvar is read from. If your application is reader-driven, this is a good way to limit the throughput of the writer loop, but for an application reacting to network packets it’s probably not the right choice. It depends on how frequently packets arrive, how fast the consumer eats those messages, and how many (TCP?) packets can fit in your network interface buffers.
Note that in the case of multiple writers, there is a queue of pending writes: each value is written one after the other with a read in between each.
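To make the write rules concrete, here is a minimal sketch that inspects promise states with Lwt.state instead of running a scheduler. It assumes the lwt library is available; the names m, w0, w1 and w2 are only for illustration.

```ocaml
(* Sketch only: assumes the lwt library; all names are illustrative. *)
let m : int Lwt_mvar.t = Lwt_mvar.create_empty ()

(* Empty mvar: the write completes immediately and fills the mvar. *)
let w0 = Lwt_mvar.put m 0
let () = assert (Lwt.state w0 = Lwt.Return ())

(* Full mvar: further writes are pending and queue up in order. *)
let w1 = Lwt_mvar.put m 1
let w2 = Lwt_mvar.put m 2
let () = assert (Lwt.state w1 = Lwt.Sleep)
let () = assert (Lwt.state w2 = Lwt.Sleep)

(* Each read lets exactly one queued write through. *)
let () = assert (Lwt.state (Lwt_mvar.take m) = Lwt.Return 0)
let () = assert (Lwt.state w1 = Lwt.Return ())
let () = assert (Lwt.state w2 = Lwt.Sleep)
let () = assert (Lwt.state (Lwt_mvar.take m) = Lwt.Return 1)
let () = assert (Lwt.state w2 = Lwt.Return ())
```

The pending writes are what gives you back-pressure: a writer loop that binds on put cannot get ahead of the reader by more than one value.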
Reading from an empty mvar (a) returns a pending promise, (b) the promise resolves after the mvar changes state to full at which point it re-changes state to empty. (This is a lie, see epilogue.)
Reading from a full mvar (a) returns an already resolved promise (no yielding), (b) changes the state of the mvar to empty.
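The read side can be sketched the same way. Again this assumes the lwt library, and the names m and r are mine.

```ocaml
(* Sketch only: assumes the lwt library; all names are illustrative. *)
let m : string Lwt_mvar.t = Lwt_mvar.create_empty ()

(* Empty mvar: the read is pending. *)
let r = Lwt_mvar.take m
let () = assert (Lwt.state r = Lwt.Sleep)

(* The next write resolves the pending read, and the write itself does not wait. *)
let () = assert (Lwt.state (Lwt_mvar.put m "a") = Lwt.Return ())
let () = assert (Lwt.state r = Lwt.Return "a")

(* Full mvar: the read is already resolved and leaves the mvar empty. *)
let () = assert (Lwt.state (Lwt_mvar.put m "b") = Lwt.Return ())
let () = assert (Lwt.state (Lwt_mvar.take m) = Lwt.Return "b")
let () = assert (Lwt.state (Lwt_mvar.take m) = Lwt.Sleep)
```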
Lwt_condition: I hope someone is listening
An Lwt_condition.t is a data-structure which is state-free (on the data-side at least). A condition doesn’t hold a value, it’s not a mailbox like an mvar, instead it’s an announce system that a writer can send a message through in the hope that someone is listening on the other end. More concretely:
Reading from a condition (using Lwt_condition.wait) returns a promise that is pending. A resolver for this promise is stored in the condition’s reader queue. (So yes, it is stateful, but not for data, just(!) for control flow.)
Writing to a condition can take two forms:
Lwt_condition.signal causes the first resolver in the condition’s reader queue to be called. This has the effect of resolving the oldest of the promises reading on the condition. At this point the resolver is used up so it is removed from the queue. If the queue is empty (i.e., if there are no suspended promises, if there have been no calls to wait) then the value passed to signal is discarded.
Lwt_condition.broadcast causes all the resolvers in the condition’s reader queue to be called. This has the effect of resolving all the promises reading on the condition. At this point all the resolvers are used up so they are removed from the queue. If the queue is empty then the value passed to broadcast is discarded.
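Here is a small sketch of those rules. It assumes the lwt and lwt.unix libraries (the latter only for Lwt_main.run, which I use to read the final values); the names c, a, b and d are illustrative.

```ocaml
(* Sketch only: assumes lwt and lwt.unix; all names are illustrative. *)
let c : int Lwt_condition.t = Lwt_condition.create ()

(* Nobody is waiting yet, so this value is discarded. *)
let () = Lwt_condition.signal c 1

let a = Lwt_condition.wait c
let b = Lwt_condition.wait c

(* The earlier signal is not replayed to late readers. *)
let () = assert (Lwt.state a = Lwt.Sleep)

(* signal wakes only the oldest waiter (a); b keeps waiting. *)
let () = Lwt_condition.signal c 2
let () = assert (Lwt.state b = Lwt.Sleep)

(* broadcast wakes every waiter still in the queue (b and d). *)
let d = Lwt_condition.wait c
let () = Lwt_condition.broadcast c 3

let () =
  assert (Lwt_main.run a = 2);
  assert (Lwt_main.run b = 3);
  assert (Lwt_main.run d = 3)
```

The value 1 never reaches anyone, which is exactly the kind of loss you have to be comfortable with when picking a condition.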
If it is important that every packet is handled sequentially then you will need to take extra care to prevent packets from being discarded: either delaying writes (but then you are kind of using it as an mvar) or keeping a buffer of signaled/broadcast values (but then you are using it as a stream).
A condition is appropriate for synchronisation that is ok to miss (e.g., once in a while it’s fine if you miss a UI update for, say, a progress bar and you jump directly from 17% to 19% completion). A condition is also a great choice if you can process signaled/broadcast data concurrently (if the packet order doesn’t matter). In that case it’s easy to ensure you always have a reader waiting on the condition, because you can register a new one as the first step of the processing:
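(* assumes [open Lwt.Syntax] for [let*] *)
let rec r c f =
  let* v = Lwt_condition.wait c in
  (* register the next reader before processing v *)
  let rr = r c f in
  (* Lwt.dont_wait expects a function returning the promise *)
  Lwt.dont_wait (fun () -> f v) (fun exc -> …);
  rr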
Lwt_stream: a dangerous swiss-army knife
The Lwt_stream module is difficult to describe in one sentence because there are multiple ways to use an Lwt_stream. The different ways are all ok, but mixing them is very error prone. It’s a swiss-army knife and you don’t want to open several of the sharp tools at the same time.
A stream is a data-structure you can push and take values from. Pushing is never blocking because there is an internal buffer (it is stateful). This buffer is unbounded so be careful with the rate of writes and reads.
Streams as a buffer
If you use create, the push function returned by create, and get, next, or the like, the stream behaves like a buffer. Calling get when the stream’s internal buffer is empty returns a pending promise. If you call the push function, it first checks if there are any pending readers to wake the oldest one, otherwise it places the value in the internal buffer.
You can even distribute the push function to multiple writing loops and call the get function from multiple reading loops: each value pushed to the stream is gotten (getted) exactly once by one reader.
One value in, one value out, simple as.
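A minimal sketch of the buffer usage, assuming the lwt and lwt.unix libraries (Lwt_main.run is only there to read the last value). Note that the push function returned by Lwt_stream.create takes an option, with None closing the stream. The names first, second and third are mine.

```ocaml
(* Sketch only: assumes lwt and lwt.unix; all names are illustrative. *)
let (s : string Lwt_stream.t), push = Lwt_stream.create ()

(* No reader is waiting, so both values land in the internal buffer. *)
let () = push (Some "a"); push (Some "b")

(* Buffered values come out in order, without waiting. *)
let first = Lwt_stream.get s
let second = Lwt_stream.get s
let () = assert (Lwt.state first = Lwt.Return (Some "a"))
let () = assert (Lwt.state second = Lwt.Return (Some "b"))

(* The buffer is now empty, so the next read is pending until a push. *)
let third = Lwt_stream.get s
let () = assert (Lwt.state third = Lwt.Sleep)
let () = push (Some "c")
let () = assert (Lwt_main.run third = Some "c")
```

I deliberately keep a single pending reader at a time here, which is the simplest way to stay in the "one value in, one value out" regime.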
Streams for multiplexing
If you use map, filter, fold, iter or any of the other traversors, in any of their variations (plain, _s, _p), then the stream still includes a buffer (to avoid losing packets), but each of the values pushed to the stream is passed to all the traversors that have been called. Each value is also passed on to one (1) of the get/junk if any are registered.
(* assumes [open Lwt.Syntax] for [let*]; push takes an option, None closes the stream *)
let s, push = Lwt_stream.create ()
let s1 = Lwt_stream.(iter print_endline (map String.capitalize_ascii s))
let s2 = Lwt_stream.(iter print_endline (map String.uncapitalize_ascii s))
let () = push (Some "This") (* causes two prints to happen *)
let reading = let* v = Lwt_stream.next s in print_endline v; Lwt.return ()
let () = push (Some "that") (* causes three prints to happen and v to be resolved *)
Mixing is dangerous
As pointed out above, you can get values passed onto different readers concurrently if you use your stream with a mix of functions. I’d recommend to: create, use a few transformers (like some maps and filters), and then attach one reader at the very end of that processing pipeline. Avoid using get/next/etc. on the intermediate streams.
Similarly, avoid attaching a traversor on the reading end of your processing pipeline. (Typical use-case: attaching an iter with a logging/printf-debugging function.) This will drain the buffer and your reading loop might start missing messages. Basically:
(* avoid *)
Lwt_stream.iter (fun x -> …) s
(* prefer *)
let s_dup_for_logging = Lwt_stream.map Fun.id s
Lwt_stream.iter (fun x -> …) s_dup_for_logging
let s_dup_for_consuming = Lwt_stream.map Fun.id s
(* and pass s_dup_for_consuming to your consumer *)
Conclusion
Depending on your application, one of these might be appropriate.
Lwt_mvar is good for reader-driven applications in which writers can absorb some back-pressure.
Lwt_condition is good for concurrent processing or for non-incremental status updates (“progress is now 19%” is ok, “progress is +1” is not).
Lwt_stream is good for writer-driven applications in which readers may take some time and values might need to be buffered. (But beware the buffers are unbounded.)
You may also want to check Lwt_seq or the more complete Seqes library.
Epilogue: Context switch without yielding
Lwt has some surprising semantics. One surprising thing that can happen is that you can switch “threads” without yielding or entering a “cooperation point”. I’m using scare quotes because there are no threads in Lwt: only promises and callbacks.
Consider the following piece of code:
open Lwt.Syntax;; (* [let*] *)
let return = Lwt.return;;
let m = Lwt_mvar.create_empty ();;
let r =
let* () = Lwt_mvar.take m in
print_endline "r";
return ()
let () =
assert (Lwt.state (Lwt_mvar.put m ()) = Lwt.Return ());
print_endline "w"
It prints “r” before it prints “w”, even though “w” is not even in a “thread”: it is just a plain unit sequence of instructions.
The same happens with conditions and streams. For more details see this longer explanation.