RFC for a distributed process/actor model library

Hello all,

I recently wrote down what’s been brewing in my mind for a bit: an Erlang-style process/actor model library for the new OCaml 5 landscape. It is beginning to resemble something I’d use, but I’m curious about what others think.

The prototype’s repo is available here

The library is built on top of Eio and should play nicely with Eio primitives. A snippet of a runnable example is shown below (debug/main.ml in the repo).

Overview:

  • Gateway plays roughly the role of Erlang’s BEAM VM.
    • The intention is that users can establish TLS tunnels to other gateways to allow distribution.
  • The Mailbox.Local API allows skipping the serialization cost when sending things internally.
    • Mailbox.Global.t (when implemented), by contrast, demands serialization procedures during construction.
  • Process a uses the selective receive interface (Selective.recv and Selective.Recv.*) to handle timeouts and guards/message filtering.
    • Underneath, each mailbox uses a save queue (for now) for rejected messages, following Erlang’s design.
    • It doesn’t seem possible to have a global save queue without also serializing everything.
  • Process b sends a bunch of things a does not care about, waits, then finally sends something a is looking for.
  • Process controller is there mainly to demonstrate that one can redirect processes.
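
To make the save-queue idea above concrete, here is a minimal standalone sketch using only the OCaml standard library. It is not the library's implementation: the mailbox record, its inbox and saved fields, and selective_recv are hypothetical names, and the sketch returns None instead of blocking or timing out when nothing matches.

```ocaml
(* Hypothetical mailbox: [inbox] holds newly arrived messages, [saved]
   holds messages that an earlier selective receive rejected. *)
type 'a mailbox = {
  inbox : 'a Queue.t;
  saved : 'a Queue.t;
}

let make () = { inbox = Queue.create (); saved = Queue.create () }

let send mb msg = Queue.add msg mb.inbox

(* Return the oldest message accepted by [guard], or [None] if no queued
   message matches. Rejected messages are kept, in arrival order, in the
   save queue so that a later receive can still pick them up. *)
let selective_recv mb ~guard =
  let skipped = Queue.create () in
  let rec scan src =
    match Queue.take_opt src with
    | None -> None
    | Some msg when guard msg -> Some msg
    | Some msg ->
        Queue.add msg skipped;
        scan src
  in
  let result =
    (* Older (previously saved) messages are examined first. *)
    match scan mb.saved with
    | Some _ as found -> found
    | None -> scan mb.inbox
  in
  (* Rebuild the save queue: everything skipped in this pass, followed by
     any saved messages that were not examined. *)
  Queue.transfer mb.saved skipped;
  Queue.transfer skipped mb.saved;
  result

type msg = A | B | C

let () =
  let mb = make () in
  List.iter (send mb) [ C; C; A; B ];
  (* Waiting for A skips the two Cs, which move to the save queue. *)
  assert (selective_recv mb ~guard:(fun m -> m = A) = Some A);
  assert (Queue.length mb.saved = 2);
  (* A later receive that accepts C finds it in the save queue. *)
  assert (selective_recv mb ~guard:(fun m -> m = C) = Some C)
```

Because the queue stores ordinary OCaml values, nothing has to be serialized, which is why a per-mailbox (typed) save queue is the natural fit here.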

Code snippet

(debug/main.ml in the repo)

let () =
  let pid_mailbox : Proc.Pid.t Mailbox.Local.t =
    Mailbox.Local.make ()
  in
  let Mailbox.Local.{ send = send_pid; recv = recv_pid } = Mailbox.Local.interface pid_mailbox in
  let x_mailbox : x Mailbox.Local.t =
    Mailbox.Local.make ()
  in
  let Mailbox.Local.{ send = send_x; recv = recv_x } = Mailbox.Local.interface x_mailbox in
  let a =
    Gateway.spawn (fun h ->
        Fmt.epr "a: my pid is %a@." Proc.Pid.pp (Proc.Handle.pid h);

        let _, send_to = recv_pid h in
        Fmt.epr "a: received instruction to send to %a@." Proc.Pid.pp send_to;
        send_x h (send_to, A);
        send_x h (send_to, A);
        send_x h (send_to, A);

        let rec aux () =
          let success =
            Selective.recv h
              ~timeout:(1.0, fun () ->
                  Fmt.epr "a: I haven't received anything useful yet@.";
                  false
                )
              Selective.Recv.[
                case_local x_mailbox
                  [
                    entry ~guard:(fun (_from, x) -> x = A)
                      (fun (from, msg) ->
                         Fmt.epr "a: received %a from %a@." pp_x msg Proc.Pid.pp from;
                         true
                      );
                    entry ~guard:(fun (_from, x) -> x = B)
                      (fun (from, msg) ->
                         Fmt.epr "a: received %a from %a@." pp_x msg Proc.Pid.pp from;
                         true
                      );
                  ]
              ]
          in
          if not success then
            aux ()
        in
        aux ()
      )
  in
  let b =
    Gateway.spawn (fun h ->
        Fmt.epr "b: my pid is %a@." Proc.Pid.pp (Proc.Handle.pid h);
        let _, send_to = recv_pid h in
        Fmt.epr "b: received instruction to send to %a@." Proc.Pid.pp send_to;

        let clock = Eio.Stdenv.clock (Proc.Handle.env h) in

        send_x h (send_to, C);
        send_x h (send_to, C);
        send_x h (send_to, C);
        send_x h (send_to, C);
        send_x h (send_to, C);
        send_x h (send_to, C);

        Eio.Time.sleep clock 5.0;

        send_x h (send_to, A);

        let from, msg = recv_x h in
        Fmt.epr "b: received %a from %a@." pp_x msg Proc.Pid.pp from;
      )
  in
  let _controller =
    Gateway.spawn (fun h ->
        Fmt.epr "controller: my pid is %a@." Proc.Pid.pp (Proc.Handle.pid h);
        send_pid h (a, b);
        Fmt.epr "controller: sent instructions to a@.";
        send_pid h (b, a);
        Fmt.epr "controller: sent instructions to b@.";
      )
  in
  Eio_main.run Gateway.main

Example output

  a: my pid is (0, 3)
  b: my pid is (0, 4)
  controller: my pid is (0, 5)
  a: received instruction to send to (0, 4)
  controller: sent instructions to a
  b: received instruction to send to (0, 3)
  controller: sent instructions to b
  a: I haven't received anything useful yet
  a: I haven't received anything useful yet
  a: I haven't received anything useful yet
  a: I haven't received anything useful yet
  b: received A from (0, 3)
  a: I haven't received anything useful yet
  a: received A from (0, 4)

Does the design depend on OCaml 5? Or could this be factored out?

It should be implementable in Lwt, now that you mention it (if that’s what you’re hinting at). Overall, the main moving parts rely on some way to yield to the scheduler and some way to wait on a list of promises and return the first one to resolve, and both Lwt and Eio provide that.

The “ideal” scheduler would allow automatic distribution of processes across domains with effects, etc., which would be the part concretely within OCaml 5 territory. It is doable, but it would mean reimplementing Eio just to have a scheduler-aware IO layer. It seems easier to just wait for upstream Eio to maybe introduce that.
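
For a feel of what scheduling with effects looks like in OCaml 5, here is a toy round-robin scheduler built only on the standard library's Effect module. It is a sketch, not how Eio or this library schedules anything: the Yield effect and the run function are hypothetical names, everything runs on a single domain, and there is no IO layer at all.

```ocaml
open Effect
open Effect.Deep

(* Hypothetical effect that a process performs to hand control back. *)
type _ Effect.t += Yield : unit Effect.t

let yield () = perform Yield

(* Run [procs] round-robin on the current domain until all have finished. *)
let run (procs : (unit -> unit) list) : unit =
  let ready : (unit -> unit) Queue.t = Queue.create () in
  (* Resume the next runnable process, if any. *)
  let next () =
    match Queue.take_opt ready with
    | Some resume -> resume ()
    | None -> ()
  in
  let spawn proc =
    match_with proc ()
      {
        retc = (fun () -> next ());
        exnc = raise;
        effc =
          (fun (type a) (eff : a Effect.t) ->
            match eff with
            | Yield ->
                Some
                  (fun (k : (a, unit) continuation) ->
                    (* Park the current process at the back of the queue
                       and switch to whichever process is next. *)
                    Queue.add (fun () -> continue k ()) ready;
                    next ())
            | _ -> None);
      }
  in
  List.iter (fun proc -> Queue.add (fun () -> spawn proc) ready) procs;
  next ()

let () =
  let log = Buffer.create 16 in
  let proc name () =
    for i = 1 to 2 do
      Buffer.add_string log (Printf.sprintf "%s%d " name i);
      yield ()
    done
  in
  run [ proc "a"; proc "b" ];
  (* The two processes interleave at each yield. *)
  assert (Buffer.contents log = "a1 b1 a2 b2 ")
```

Spreading such processes across domains, and making blocking IO cooperate with the run queue, is the part this toy leaves out.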

I might give the Lwt version a shot.


Right, okay, your question was more about whether it can be architected to swap backends. I will have to think about this a bit; I’m not even sure a functor would reconcile the two different styles of promises.
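
One possible shape for backend swapping is to write the library against a small monadic signature and instantiate it once per backend. The following is only a sketch under that assumption: BACKEND, Direct, Deferred and Make_counter are hypothetical names, and the two toy backends merely stand in for direct-style (Eio-like) and promise-style (Lwt-like) code without using either library.

```ocaml
(* Hypothetical minimal interface a scheduler backend would provide. *)
module type BACKEND = sig
  type 'a io
  val return : 'a -> 'a io
  val bind : 'a io -> ('a -> 'b io) -> 'b io
  val yield : unit -> unit io
end

(* Direct style: an ['a io] is just the value itself. *)
module Direct : BACKEND with type 'a io = 'a = struct
  type 'a io = 'a
  let return x = x
  let bind x f = f x
  let yield () = ()
end

(* Deferred style: an ['a io] is a suspended computation that must be
   run explicitly, loosely mimicking a promise-based backend. *)
module Deferred : sig
  include BACKEND
  val run : 'a io -> 'a
end = struct
  type 'a io = unit -> 'a
  let return x () = x
  let bind m f () = f (m ()) ()
  let yield () () = ()
  let run m = m ()
end

(* Library code written once against the signature. *)
module Make_counter (B : BACKEND) = struct
  (* Count from [acc] up to [n], yielding between steps. *)
  let rec count_to n acc =
    if acc >= n then B.return acc
    else B.bind (B.yield ()) (fun () -> count_to n (acc + 1))
end

module Direct_counter = Make_counter (Direct)
module Deferred_counter = Make_counter (Deferred)

let () =
  assert (Direct_counter.count_to 3 0 = 3);
  assert (Deferred.run (Deferred_counter.count_to 3 0) = 3)
```

The trade-off is that code inside the functor has to be written in monadic style even when the backend is direct-style, which gives up part of what makes Eio pleasant to use.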

Not to be mean, but you should probably think more about zmq and less about eio.

Actually, imho zmq and eio would make a pretty good pairing. zmq sockets are not thread-safe, so you would basically need a socket per domain. Or you could also explore a socket-per-fiber model. Eio fibers are basically green threads, so they can scale the system vertically.

Actually yeah zmq would be handy for Mailbox.Global. But zmq by itself doesn’t do scheduling/green threading.

Re socket-per-fiber: I wonder how well an Eio-scheduling-aware zmq binding of sorts would work (not sure how the two event loops interact).

Just a thought dump for future reference: the ocaml-zmq package uses a stepping interface (I think) to handle both the Lwt and Async backends. So the next step would mainly be adding an Eio backend for that package and seeing whether the zmq-deferred module interface is sufficient for the scheduling needed within distrproc.

I also recommend checking Tokio and Actix for inspiration.