I attempted to build an in-memory pubsub with domains and domainslib channels. It’s usually a pretty straightforward implementation in Go with channels. Sending, waiting, and receiving on domainslib channels look much like how I’d do it in Go.
But how do I keep listening for messages on an unbounded channel from a separate domain? I expected that joining a child domain that never ends would let the child keep receiving messages until a signal is sent to shut it down, after which the root domain would run to completion. In this case, though, no messages are printed, yet the root domain stays blocked.
The receiving function runs inside a new domain:
let rec receive ch =
  print_endline "receive msg";
  let v = C.recv ch in (* this should be blocking until a message is available *)
  Format.printf "received val=%s\n" v;
  receive ch
in
...
let _d1 = Domain.spawn (fun _ -> receive chan1) in
Domain.join _d1;
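One way to give the receiver an explicit shutdown signal is to send it in-band on the same channel. The following is only a minimal sketch under stated assumptions, not the original program: the `Chan` module here is a small stand-in built on the OCaml 5 standard library (`Mutex`, `Condition`, `Queue`) so the example has no dependencies, and the `msg` type, its `Stop` constructor, and `run` are hypothetical names. With domainslib, the same loop should carry over by using `Domainslib.Chan` (`make_unbounded`, `send`, `recv`) in place of the stand-in.

```ocaml
(* Stand-in for Domainslib.Chan: a blocking, unbounded channel built on
   the OCaml 5 standard library only. *)
module Chan = struct
  type 'a t = { q : 'a Queue.t; m : Mutex.t; nonempty : Condition.t }

  let make_unbounded () =
    { q = Queue.create (); m = Mutex.create (); nonempty = Condition.create () }

  let send t v =
    Mutex.lock t.m;
    Queue.push v t.q;
    Condition.signal t.nonempty;
    Mutex.unlock t.m

  (* Blocks until a value is available. *)
  let recv t =
    Mutex.lock t.m;
    while Queue.is_empty t.q do
      Condition.wait t.nonempty t.m
    done;
    let v = Queue.pop t.q in
    Mutex.unlock t.m;
    v
end

(* Hypothetical message type: [Stop] is the in-band shutdown signal. *)
type msg = Msg of string | Stop

(* Receive until [Stop] arrives; return the payloads in arrival order. *)
let rec receive ch acc =
  match Chan.recv ch with
  | Msg v ->
      (* [%!] flushes, so the line shows up while the domain is running. *)
      Printf.printf "received val=%s\n%!" v;
      receive ch (v :: acc)
  | Stop -> List.rev acc

let run () =
  let ch = Chan.make_unbounded () in
  let d = Domain.spawn (fun () -> receive ch []) in
  List.iter (fun s -> Chan.send ch (Msg s)) [ "a"; "b"; "c" ];
  Chan.send ch Stop;
  (* Joining returns once the child has seen [Stop]. *)
  Domain.join d
```

Because `Stop` travels through the same queue as ordinary messages, the child processes everything sent before it and then returns, which lets `Domain.join` complete.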
A reference implementation in Go: we create a map of channels. The publisher sends messages to these channels. As they’re received, a subscribed handler is called on each channel.
The flush function is called whenever the pretty-printer is flushed (via conversion %!, or pretty-printing indications @? or @., or using low level functions print_flush or print_newline).
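To see what that means for a format string ending in a plain "\n", here is a small sketch that counts calls to a formatter's flush function. The helper name `flushes_after` is hypothetical; `Format.make_formatter` and `Format.fprintf` are the standard library functions.

```ocaml
(* Run [print] against a fresh formatter and report how many times the
   formatter's flush function was called. *)
let flushes_after (print : Format.formatter -> unit) : int =
  let count = ref 0 in
  let buf = Buffer.create 64 in
  let ppf =
    Format.make_formatter
      (Buffer.add_substring buf)   (* output function *)
      (fun () -> incr count)       (* flush function *)
  in
  print ppf;
  !count

(* A plain "\n" is ordinary text and does not trigger the flush function. *)
let plain = flushes_after (fun ppf -> Format.fprintf ppf "received val=%s\n" "x")

(* [%!], [@.] and [@?] each flush the pretty-printer. *)
let bang = flushes_after (fun ppf -> Format.fprintf ppf "received val=%s\n%!" "x")
let at_dot = flushes_after (fun ppf -> Format.fprintf ppf "received val=%s@." "x")
let at_q = flushes_after (fun ppf -> Format.fprintf ppf "received val=%s@?" "x")
```

Applied to the receive loop, this suggests writing `Format.printf "received val=%s@." v` (or adding `%!`) so each message is flushed as it arrives instead of waiting for a domain that never exits.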
Today’s conclusion: OCaml channels = Go channels. That’s an oversimplification, but as a common developer I’m quite happy with the current primitives so far.
Next up is to consider where Eio fits, and perhaps to build provider adapters.
You might be interested in Lwt_eio’s chat.ml example, which I guess is a kind of pub/sub: when you connect to the chat room you effectively subscribe to messages, and when you write something, you’re publishing it to the room.
That example is a bit more complicated because it’s showing a Lwt server and an Eio server working together, but you can just ignore the Lwt bits.
The basic idea is that a sequence of messages is represented as a string along with a promise for the rest of the sequence:
type cons = Cons of (string * cons) Promise.t
Note that with this approach there is no back-pressure, so if one consumer is slow then the messages will remain in memory until it’s ready (but other consumers keep going). Here we want to keep the whole room history anyway so it doesn’t matter, but I guess you might want to make the producer wait until all clients have accepted a message in some cases.
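To make the shape of that data structure concrete, here is a rough sketch of publishing and reading with it. It is not the code from chat.ml: the `Promise` module below is a simplified, single-threaded stand-in for Eio’s promises (only `create`, `resolve`, and `peek` are modelled, with no blocking `await`), and `room`, `create_room`, `publish`, and `drain` are hypothetical names.

```ocaml
(* Simplified stand-in for Eio.Promise: a write-once cell. *)
module Promise = struct
  type 'a t = 'a option ref   (* read side *)
  type 'a u = 'a option ref   (* resolver side *)

  let create () = let r = ref None in (r, r)

  let resolve u v =
    match !u with
    | None -> u := Some v
    | Some _ -> invalid_arg "Promise.resolve: already resolved"

  let peek t = !t
end

type cons = Cons of (string * cons) Promise.t

(* The room keeps the start of the history and the resolver for the
   not-yet-known rest of the sequence. *)
type room = { head : cons; mutable tail : (string * cons) Promise.u }

let create_room () =
  let p, u = Promise.create () in
  { head = Cons p; tail = u }

(* Publishing resolves the current tail with the message plus a fresh
   promise for whatever comes next. *)
let publish room msg =
  let p, u = Promise.create () in
  let old = room.tail in
  room.tail <- u;
  Promise.resolve old (msg, Cons p)

(* Read everything already published from a cursor onwards; return the
   messages and the cursor to resume from. *)
let rec drain (Cons p) acc =
  match Promise.peek p with
  | Some (msg, rest) -> drain rest (msg :: acc)
  | None -> (List.rev acc, Cons p)
```

Each subscriber just holds its own `cons` cursor, so a slow one keeps older cells alive without holding up the others, which matches the lack of back-pressure described above. In a real Eio program the subscriber would wait on the promise rather than poll it with `peek`.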