Multicore: Building an in-memory pubsub with domainslib (and eio)

I attempted to build an in-memory pubsub with domains and domainslib's channels. It’s usually a pretty straightforward implementation in Go with channels. Sending, waiting, and receiving on domainslib channels look much like how I’d do it in Go.

But how do I keep listening for messages on an unbounded channel from a separate domain? I expected that joining on a child domain that never ends would let the child keep receiving messages until a signal is sent to shut it down, after which the root domain would run to completion. Instead, no messages are printed, yet the root domain stays blocked.

The receiving function runs inside a new domain:

  let rec receive ch =
    print_endline "receive msg";
    let v = C.recv ch in (* this should be blocking until a message is available *)
    Format.printf "received val=%s\n" v;
    receive ch
  in
 
   ...

  let _d1 = Domain.spawn (fun _ -> receive chan1) in
  Domain.join _d1;
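
For the shutdown signal mentioned above, here is a minimal sketch of one possible approach, not the code from my repo. It assumes messages are wrapped in an option so that None can act as the stop signal; the names receive and run are only for illustration. It uses Domainslib.Chan.make_unbounded, send and recv, and needs OCaml 5 with the domainslib package installed.

```ocaml
(* Requires OCaml 5 and the domainslib opam package. *)
module C = Domainslib.Chan

(* Receive until a [None] arrives; return the messages seen, in order.
   Wrapping messages in an option is an assumption made for this sketch. *)
let rec receive ch acc =
  match C.recv ch with (* blocks until a message is available *)
  | Some v ->
      (* [@.] prints a newline and flushes, so the line shows up at once *)
      Format.printf "received val=%s@." v;
      receive ch (v :: acc)
  | None -> List.rev acc (* shutdown signal: leave the loop *)

let run () =
  let ch = C.make_unbounded () in
  let d = Domain.spawn (fun () -> receive ch []) in
  List.iter (fun m -> C.send ch (Some m)) [ "hello"; "world" ];
  C.send ch None; (* ask the child domain to stop *)
  Domain.join d (* returns once the child has seen [None] *)
```

With the None message in place, Domain.join returns instead of blocking forever, and the root domain can run to completion.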

A reference implementation in Go: we create a map of channels. The publisher sends messages to these channels. As they’re received, a subscribed handler is called on each channel.

It works fine for me. You need to flush the printf, though, and tell runtest not to buffer the output. Try the following:

diff --git a/lib/dune b/lib/dune
index 2ad98a0..6423f7c 100644
--- a/lib/dune
+++ b/lib/dune
@@ -1,4 +1,4 @@
 (library
  (name pubsub)
  (public_name pubsub)
- (libraries base domainslib eio_main))
+ (libraries domainslib eio_main))
diff --git a/test/test_pubsub.ml b/test/test_pubsub.ml
index 103cc4d..85296e9 100644
--- a/test/test_pubsub.ml
+++ b/test/test_pubsub.ml
@@ -9,7 +9,7 @@ let () =
   let rec receive ch =
     print_endline "receive msg";
     let v = C.recv ch in
-    Format.printf "received val=%s\n" v;
+    Format.printf "received val=%s\n%!" v;
     receive ch
   in
 
@@ -24,9 +24,9 @@ let () =
   let _d1 = Domain.spawn (fun _ -> receive chan1) in
 
   (* The trigger button *)
-  (* Domain.join _d1; *)
+  Domain.join _d1;
 
-  (* 
+  (*
   Really interesting behavior:
 
   First run `dune runtest` with the Domain.join. The process rightfully never shuts down, but no messages

Then run dune runtest --no-buffer. I get the expected output.

I am using 5.00.0+trunk with domainslib, eio and eio_main from the default opam-repository.

that solves it! thank you so much, @mseri. the combo of %! and --no-buffer was what I needed to print the messages properly.

maybe i should finish the cs3110 course. they explicitly have a page reminding learners to flush as well: section 2.6, "Printing", of OCaml Programming: Correct + Efficient + Beautiful.

Very minor FWIW, but I believe the \n%! sequence in @mseri’s fix can be replaced with the @. indication.

Format.printf "received val=%s@." v; also worked. That’s cool to know. But what exactly is the difference?

TIL:
https://ocaml.org/api/Format.html

The flush function is called whenever the pretty-printer is flushed (via conversion %!, or pretty-printing indications @? or @., or using low level functions print_flush or print_newline).
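
To make the comparison concrete, here is a small sketch using only the standard library. It renders each variant with Format.asprintf so the resulting text can be compared directly; the function names are made up for illustration.

```ocaml
(* Format.asprintf renders into a string, so the variants can be compared. *)

(* Literal newline, then an explicit flush with %! *)
let with_newline_flush v = Format.asprintf "received val=%s\n%!" v

(* @. asks the pretty-printer for a newline and a flush in one go *)
let with_at_dot v = Format.asprintf "received val=%s@." v

(* @? flushes without printing a newline *)
let with_at_question v = Format.asprintf "received val=%s@?" v

let () =
  (* Same text either way *)
  assert (with_at_dot "m" = with_newline_flush "m");
  assert (with_at_question "m" = "received val=m");
  (* On stdout, this line is flushed immediately *)
  Format.printf "received val=%s@." "m"
```

As far as the output text goes, the two forms are the same. The difference is that @. tells the pretty-printer a new line has started, while a literal \n is just another character to it, which can matter once boxes and margins are involved.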

Today’s conclusion: ocaml channels = go channels. That’s an oversimplification, but for the common developer like myself, i’m quite happy with the current primitives so far.

next up is to consider where eio fits, and/or perhaps build provider adapters.

You might be interested in Lwt_eio’s chat.ml example, which I guess is a kind of pub/sub: when you connect to the chat room you effectively subscribe to messages, and when you write something, you’re publishing it to the room.

That example is a bit more complicated because it’s showing a Lwt server and an Eio server working together, but you can just ignore the Lwt bits.

The basic idea is that a sequence of messages is represented as a string along with a promise for the rest of the sequence:

type cons = Cons of (string * cons) Promise.t

Note that with this approach there is no back-pressure, so if one consumer is slow then the messages will remain in memory until it’s ready (but other consumers keep going). Here we want to keep the whole room history anyway so it doesn’t matter, but I guess you might want to make the producer wait until all clients have accepted a message in some cases.
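
Here is a rough, dependency-free sketch of that idea, not the actual chat.ml code. Toy_promise is a hypothetical stand-in for Eio's Promise, built on the OCaml 5 standard library's Mutex and Condition, and the subscribers run in domains rather than Eio fibers. The names publish, take and demo are also made up for illustration.

```ocaml
(* Toy_promise is a hypothetical stand-in for Eio's Promise, written with
   the OCaml 5 standard library so the sketch runs without dependencies. *)
module Toy_promise = struct
  type 'a t = { m : Mutex.t; c : Condition.t; mutable v : 'a option }

  let create () = { m = Mutex.create (); c = Condition.create (); v = None }

  (* Fill the promise and wake every waiter. *)
  let resolve p x =
    Mutex.lock p.m;
    p.v <- Some x;
    Condition.broadcast p.c;
    Mutex.unlock p.m

  (* Block until the promise is filled, then return its value. *)
  let await p =
    Mutex.lock p.m;
    let rec wait () =
      match p.v with
      | Some x -> Mutex.unlock p.m; x
      | None -> Condition.wait p.c p.m; wait ()
    in
    wait ()
end

(* Same shape as the type above, with the toy promise swapped in. *)
type cons = Cons of (string * cons) Toy_promise.t

(* Publish [msg] by filling the current tail; return the new, unfilled tail. *)
let publish tail msg =
  let next = Toy_promise.create () in
  Toy_promise.resolve tail (msg, Cons next);
  next

(* A subscriber walks the sequence at its own pace, reading [n] messages. *)
let rec take n (Cons p) =
  if n = 0 then []
  else
    let msg, rest = Toy_promise.await p in
    msg :: take (n - 1) rest

let demo () =
  let head = Toy_promise.create () in
  let room = Cons head in
  let sub1 = Domain.spawn (fun () -> take 2 room) in
  let sub2 = Domain.spawn (fun () -> take 2 room) in
  let tail = publish head "hello" in
  let _tail = publish tail "world" in
  (Domain.join sub1, Domain.join sub2)
```

Every subscriber holding the head sees the full history, and a slow subscriber never blocks the publisher, which is the lack of back-pressure described above.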

Just be careful: the @ indications only work in the Format module. So if you use Printf.printf you need to use %!.
For example:

# Printf.printf "Test\n%!";;
Test
- : unit = ()
# Printf.printf "Test@.";;
Test@.- : unit = ()
# Format.printf "Test@.";;
Test
- : unit = ()