Interaction between eio and domainslib, unhandled exceptions?

Hello,
I am trying to build something using both eio and domainslib, and I’m quite stumped by their interaction.
According to the Eio README, calls to various functions require an Eio.Stdenv.t, most likely the one given by Eio_main.run.
On the other hand, the domainslib documentation states that “[The run] function should be used at the top level to enclose the calls to other functions that may await on promises. This includes await, parallel_for and its variants. Otherwise, those functions will raise Unhandled exception.”
Armed with this, I wrote this MWE:

module T = Domainslib.Task

let task pool sink =
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i -> if i mod 2 = 0 then Eio.Flow.copy_string "Hello!\n" sink)
    pool

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  T.run pool (fun () ->
      Eio_main.run (fun env ->
          let stdout = Eio.Stdenv.stdout env in
          task pool stdout));
  T.teardown_pool pool

Running this with eio = 0.9, domainslib = 0.5, ocaml = 5.0.0, on a machine where Domain.recommended_domain_count is 8 gets the following result:

Hello!
Hello!
Hello!
Hello!
Hello!
Hello!
Hello!
Hello!
Fatal error: exception Stdlib.Effect.Unhandled(Eio_linux__Sched.Alloc)

So, only 8 Hellos instead of the expected 513 (the finish bound of parallel_for is inclusive, so the even indices are 0, 2, ..., 1024), and a nice crash to boot.
At first I thought I had messed up my calls to run functions, and tried swapping Eio_main.run and T.run around to check.
Same error.
On more involved examples, I also got the following error:

Fatal error: exception Multiple exceptions:
- Invalid_argument("Cancellation context accessed from wrong domain!")
- Invalid_argument("exit: 1 request(s) still active!")

I don’t really understand how to proceed from here. Are Eio and Domainslib just not supposed to work together? Is there something obvious I missed?

Thanks all

Here’s an example Eio server that handles requests over TCP, and uses Domainslib to do some processing:

module T = Domainslib.Task

let pool = T.setup_pool ~num_domains:2 ()

(* Parallel Fibonacci computation *)
let rec fib_par n =
  assert (n > 0);
  let rec fib n =
    if n < 2 then 1
    else fib (n - 1) + fib (n - 2)
  in
  if n > 20 then begin
    let a = T.async pool ( fun _ -> fib_par (n -1)) in
    let b = T.async pool ( fun _ -> fib_par (n -2)) in
    T.await pool a + T.await pool b
  end else
    fib n

open Eio.Std

let run_in_pool fn x =
  let result, set_result = Promise.create () in
  let _ : unit Domainslib.Task.promise = Domainslib.Task.async pool (fun () ->
      Promise.resolve set_result @@
      match fn x with
      | r -> Ok r
      | exception ex -> Error ex
    )
  in
  Promise.await_exn result

let request r =
  Eio.Buf_read.line r |> int_of_string

let main ~net addr =
  Switch.run @@ fun sw ->
  let sock = Eio.Net.listen ~sw ~backlog:5 net addr in
  traceln "Listening on %a" Eio.Net.Sockaddr.pp addr;
  (* Runs once per request in an Eio task *)
  let handle conn _addr =
    let n = Eio.Buf_read.parse_exn request conn ~max_size:10 in
    traceln "Spawning calculation of fib %d..." n;
    let result = run_in_pool fib_par n in
    traceln "fib %d = %d" n result;
    let reply = Printf.sprintf "%d\r\n" result in
    Eio.Flow.copy_string reply conn
  in
  Eio.Net.run_server sock handle ~on_error:(traceln "Error handling connection: %a" Fmt.exn)

let () =
  Eio_main.run @@ fun env ->
  main (`Tcp (Eio.Net.Ipaddr.V4.loopback, 7000))
    ~net:env#net

Then use it with e.g.

echo 20 | nc 127.0.0.1 7000

(if you try it interactively, note that it waits for you to shut down the sending side before doing the calculation, so press Ctrl-D after entering the input)

I’ve made a PR to add a bit of documentation about this: “Document how to use Domainslib from Eio” (by talex5, pull request #489 on ocaml-multicore/eio). I’m not very familiar with Domainslib though, so it would be good to get a review from someone who works on it.

Running an Eio mainloop inside a Domainslib task that Domainslib can migrate between domains certainly won’t work. You only want one fiber scheduler per domain.

… I don’t understand how to apply this to my example.
The web server aspect brings a lot of fluff so it’s hard to extract the useful bits.
I can’t really translate a call to parallel_for to several async and await, that would require manually redoing the “split the work between all threads” task myself.
I tried anyway, and my naive attempts get me

Fatal error: exception Stdlib.Effect.Unhandled(Eio_linux__Sched.Alloc)
# or
Fatal error: exception Stdlib.Effect.Unhandled(Domainslib__Task.Wait(_, _))
# or simply nothing (without prints)

My latest attempt here:

let task2 pool sink =
  let nb_domains = T.get_num_domains pool in
  let promise, resolver = Eio.Promise.create () in
  for _ = 1 to nb_domains do
    ignore
    @@ T.async pool (fun () ->
           Eio.Promise.resolve resolver
           @@ for j = 0 to 128 do
                if j mod 2 = 0 then Eio.Flow.copy_string "Hello!\n" sink
              done)
  done;
  Eio.Promise.await promise

This hangs indefinitely.

Now would also be a good time to mention that these exception backtraces are mostly uninformative.

Running an Eio mainloop inside a Domainslib task that Domainslib can migrate between domains certainly won’t work. You only want one fiber scheduler per domain.

I also don’t understand this bit. Should I only have one of T.run and Eio_main.run? Or should they be in a specific order?

Thanks for the quick response by the way

You can only run one scheduler in any given domain. The scheduler decides what fiber should run next and when to put the domain to sleep. It doesn’t really make sense to have two running at once for a single domain.

Domainslib doesn’t do IO. When the Domainslib scheduler runs out of things to do it stops and waits on a Condition for more work, but it doesn’t ask the OS if any IO is ready. So you can’t do (concurrent) IO from a Domainslib domain.
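
One plausible way to read the Effect.Unhandled crashes earlier in the thread: an effect handler covers only the stack it was installed on, and every domain starts with a fresh stack. The stdlib-only sketch below illustrates that with a hypothetical effect named Ask (an assumption for illustration, not part of Eio or Domainslib). It is not how either library is implemented, only the underlying OCaml 5 behaviour.

```ocaml
(* Hypothetical effect standing in for a scheduler-internal one;
   it is not part of Eio or Domainslib. *)
type _ Effect.t += Ask : int Effect.t

(* Run [f] under a handler that answers [Ask] with 42. *)
let with_handler f =
  Effect.Deep.try_with f ()
    { Effect.Deep.effc = (fun (type a) (eff : a Effect.t) ->
        match eff with
        | Ask ->
          Some (fun (k : (a, _) Effect.Deep.continuation) ->
              Effect.Deep.continue k 42)
        | _ -> None) }

(* Performed on the handler's own stack: the effect is handled. *)
let same_domain () = with_handler (fun () -> Effect.perform Ask)

(* A domain spawned while the handler is active still starts with a
   fresh stack, so the handler is not in scope there. *)
let other_domain () =
  with_handler (fun () ->
      Domain.join
        (Domain.spawn (fun () ->
             match Effect.perform Ask with
             | n -> Some n
             | exception Effect.Unhandled _ -> None)))

let () =
  Printf.printf "same domain: %d\n" (same_domain ());
  match other_domain () with
  | Some n -> Printf.printf "other domain: %d\n" n
  | None -> print_endline "other domain: effect was unhandled"
```

By the same reasoning, calling an Eio function from a Domainslib worker performs an Eio effect where no Eio handler is installed, and calling a Domainslib await outside T.run does the reverse.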

Your example is unusual in that the parallel_for doesn’t produce a single result, but instead produces lots of independent IO (in no particular order). If it collected the results together into a set of strings, it could return them with the promise, as I mentioned. If you want to trigger the IO as the computation goes along then, as @polytypic says, you could send them back to Eio for output using e.g. a Stream. This also ensures that you only write one message at a time (having multiple domains writing to stdout at once could interleave things oddly). For example:

open Eio.Std

module T = Domainslib.Task

let task pool sink =
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i -> if i mod 2 = 0 then sink "Hello!\n")
    pool

let eio_main ~stdout ~pool =
  let stream = Eio.Stream.create max_int in
  let rec write_output () =
    match Eio.Stream.take stream with
    | None -> ()
    | Some msg ->
      Eio.Flow.copy_string msg stdout;
      write_output ()
  in
  Fiber.both
    write_output
    (fun () ->
       ignore @@
       Domainslib.Task.async pool (fun () ->
           task pool (fun s -> Eio.Stream.add stream (Some s));
           Eio.Stream.add stream None
         )
    )

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run (fun env -> eio_main ~stdout:(Eio.Stdenv.stdout env) ~pool);
  T.teardown_pool pool

This relies on the fact that writing to an Eio.Stream can be done from a non-Eio domain as long as the stream isn’t full (so I created it with capacity max_int, making it effectively unbounded).

Depending on what you’re really trying to do, it might make more sense to replace the parallel_for with Eio code, though. It would also be good to provide something similar to a Domainslib worker pool in the Eio API.
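
For comparison, the other option mentioned above (collect the strings and return them with the promise) might look roughly like the sketch below. It reuses the run_in_pool bridge from this thread. The name collect and the use of T.parallel_for_reduce with list append are assumptions made for illustration. List append is associative but not commutative, so the order of the collected messages is not guaranteed, which is harmless here because they are all identical.

```ocaml
open Eio.Std

module T = Domainslib.Task

(* Pure Domainslib part: build the messages in parallel, no IO. *)
let collect pool =
  T.parallel_for_reduce ~start:0 ~finish:1024
    ~body:(fun i -> if i mod 2 = 0 then [ "Hello!\n" ] else [])
    pool ( @ ) []

(* Bridge from the thread: run [fn pool] in the pool and wait for the
   result (or exception) from an Eio fiber. *)
let run_in_pool fn pool =
  let result, set_result = Promise.create () in
  let _ : unit T.promise =
    T.async pool (fun () ->
        Promise.resolve set_result
          (match fn pool with
           | r -> Ok r
           | exception ex -> Error ex))
  in
  Promise.await_exn result

let () =
  let pool = T.setup_pool ~num_domains:2 () in
  Eio_main.run (fun env ->
      let stdout = Eio.Stdenv.stdout env in
      (* All writes happen here, in the Eio domain. *)
      let msgs = run_in_pool collect pool in
      List.iter (fun msg -> Eio.Flow.copy_string msg stdout) msgs);
  T.teardown_pool pool
```

The trade-off against the Stream version is that nothing is printed until the whole parallel_for has finished.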

Hmm… I’m afraid my initial example was too contrived and doesn’t reflect accurately what I’m trying to do.
A more involved snippet would be this:

module C = Domainslib.Chan
module T = Domainslib.Task

let task pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let chan = C.make_unbounded () in
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i ->
      (* No eio stuff happening inside the ~body *)
      let thing = expensive_computation i in
      (* Note that I expect only one result in total. *)
      Option.iter (C.send chan) thing)
    pool;
  let result = C.recv chan in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  use_result result

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run (fun env ->
    T.run pool (fun () ->
      task pool env#stdout));
  T.teardown_pool pool

I hope it’s clear that I am not expecting Eio and Domainslib to “work together nicely”, but merely to “not stand in each other’s way”.

My use case is a parallel search (what I hoped parallel_scan would be when seeing the name), so in a way the parallel_for does produce a single result, but I just don’t know where this result will come from.

With this code, having both run calls in either order, or just the Eio_main one, does not work and fails with the expected errors.
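
As a side note on the parallel search itself, a minimal Domainslib-only sketch that avoids the channel is to reduce the per-index options with a "first Some wins" function. The names first_some and search are hypothetical, and expensive_computation is a stand-in for the real one. The reducer is associative but not commutative, so the answer is only deterministic when at most one index yields a result, which matches the use case described above.

```ocaml
module T = Domainslib.Task

(* Stand-in for the real computation: exactly one index succeeds. *)
let expensive_computation = function
  | 500 -> Some 42
  | _ -> None

(* Keep the left result if there is one, otherwise the right one. *)
let first_some a b =
  match a with
  | Some _ -> a
  | None -> b

(* Must be called from inside T.run or from a pool task. *)
let search pool =
  T.parallel_for_reduce ~start:0 ~finish:1024
    ~body:expensive_computation pool first_some None

let () =
  let pool = T.setup_pool ~num_domains:2 () in
  let result = T.run pool (fun () -> search pool) in
  T.teardown_pool pool;
  match result with
  | Some n -> Printf.printf "found %d\n" n
  | None -> print_endline "not found"
```

Note that this version always scans the whole range. It does not stop early once a result is found.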

That should be easy enough, since the Eio bits are separate from the calculation. e.g.

(* Domainslib part *)

module C = Domainslib.Chan
module T = Domainslib.Task

let expensive_computation = function
  | 500 -> Some 42
  | _ -> None

let task pool =
  let chan = C.make_unbounded () in
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i ->
      (* No eio stuff happening inside the ~body *)
      let thing = expensive_computation i in
      (* Note that I expect only one result in total. *)
      Option.iter (C.send chan) thing)
    pool;
  C.recv chan

(* Standard Domainslib / Eio bridge *)

open Eio.Std

let run_in_pool fn pool =
  let result, set_result = Promise.create () in
  let _ : unit Domainslib.Task.promise = Domainslib.Task.async pool (fun () ->
      Promise.resolve set_result @@
      match fn pool with
      | r -> Ok r
      | exception ex -> Error ex
    )
  in
  Promise.await_exn result

(* Eio part *)

let main pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let result = run_in_pool task pool in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  traceln "Got %d" result

let () =
  let pool = T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) () in
  Eio_main.run (fun env -> main pool env#stdout);
  T.teardown_pool pool

Wow that’s almost exactly what I needed, thanks a lot!
The resulting code, in a format that can be adapted to my real use-case, looks like this:

module T = Domainslib.Task

let task pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let promise, resolver = Eio.Promise.create () in
  let _ =
    T.async pool (fun () -> (* <------- NEW! *)
        T.parallel_for ~start:0 ~finish:1024
          ~body:(fun i ->
            (* No eio stuff happening inside the ~body *)
            expensive_computation i
            |> Option.iter (Eio.Promise.resolve resolver))
            (* Note that I expect only one result in total. *)
          pool)
  in
  let result = Eio.Promise.await promise in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  use_result result

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run @@ fun env ->
  task pool env#stdout;
  T.teardown_pool pool

So this is almost identical to what I had in mind and should be easy to adapt. Also switching from a domain-blocking Domainslib.Chan.recv to a fiber-blocking Eio.Promise.await is really nice.
From my tests I think I understood that there can be no “blocking” eio tasks inside the call to T.async, is that correct?

All of this brings me to what I hope is my last question:
If we modify expensive_computation to this:

let expensive_computation = function
  | 0 | 500 -> Some 42
  | _ -> None

I would expect an Invalid_argument to be raised, because you can’t resolve the same promise twice. But somehow it isn’t, and the program returns 42 like normal. Do you know why?

Note that by doing this you’re throwing away the final result of the Domainslib function (with let _ =), which holds any exception it might raise. My version catches exceptions and passes them to Eio to avoid that.

I suspect because of the above.
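
To make that concrete, here is a small Domainslib-only sketch, assuming (as the examples in this thread do) that T.async may be called outside T.run. An exception raised inside a task is stored in the task's promise and only re-raised by T.await, so a promise that is thrown away takes the exception with it. The function name demo and the message string are made up for the illustration.

```ocaml
module T = Domainslib.Task

let demo () =
  let pool = T.setup_pool ~num_domains:1 () in
  (* Discarded promise: the exception is stored in it and never seen. *)
  let _ = T.async pool (fun () -> invalid_arg "resolved twice") in
  (* Awaited promise: the same exception is re-raised at the await. *)
  let p = T.async pool (fun () -> invalid_arg "resolved twice") in
  let outcome =
    T.run pool (fun () ->
        match T.await pool p with
        | () -> "no exception"
        | exception Invalid_argument msg -> "Invalid_argument: " ^ msg)
  in
  T.teardown_pool pool;
  outcome

let () = print_endline (demo ())
```

In the code above, the second Eio.Promise.resolve would raise inside the parallel_for body, and that exception would end up in the promise bound with let _ =.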