Hello,
I am trying to build something using both eio and domainslib, and I’m quite stumped by their interaction.
According to the eio README, calls to various functions require an Eio.Stdenv.t, most likely the one provided by Eio_main.run.
On the other hand, the domainslib documentation states that “[The run] function should be used at the top level to enclose the calls to other functions that may await on promises. This includes await, parallel_for and its variants. Otherwise, those functions will raise Unhandled exception.”
Armed with this, I wrote this MWE:
module T = Domainslib.Task

let task pool sink =
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i -> if i mod 2 = 0 then Eio.Flow.copy_string "Hello!\n" sink)
    pool

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  T.run pool (fun () ->
      Eio_main.run (fun env ->
          let stdout = Eio.Stdenv.stdout env in
          task pool stdout));
  T.teardown_pool pool
Running this with eio = 0.9, domainslib = 0.5, ocaml = 5.0.0, on a machine where Domain.recommended_domain_count is 8 gets the following result:
So, only 8 Hellos instead of the expected 512, and a nice crash to boot.
At first I thought I had messed up my calls to run functions, and tried swapping Eio_main.run and T.run around to check.
Same error.
On more involved examples, I also got the following error:
Fatal error: exception Multiple exceptions:
- Invalid_argument("Cancellation context accessed from wrong domain!")
- Invalid_argument("exit: 1 request(s) still active!")
I don’t really understand how to proceed from here. Are Eio and Domainslib just not supposed to work together? Is there something obvious I missed?
Here’s an example Eio server that handles requests over TCP, and uses Domainslib to do some processing:
module T = Domainslib.Task

let pool = T.setup_pool ~num_domains:2 ()

(* Parallel Fibonacci computation *)
let rec fib_par n =
  assert (n > 0);
  let rec fib n =
    if n < 2 then 1
    else fib (n - 1) + fib (n - 2)
  in
  if n > 20 then begin
    let a = T.async pool (fun _ -> fib_par (n - 1)) in
    let b = T.async pool (fun _ -> fib_par (n - 2)) in
    T.await pool a + T.await pool b
  end else
    fib n

open Eio.Std

let run_in_pool fn x =
  let result, set_result = Promise.create () in
  let _ : unit Domainslib.Task.promise = Domainslib.Task.async pool (fun () ->
      Promise.resolve set_result @@
      match fn x with
      | r -> Ok r
      | exception ex -> Error ex
    )
  in
  Promise.await_exn result

let request r =
  Eio.Buf_read.line r |> int_of_string

let main ~net addr =
  Switch.run @@ fun sw ->
  let sock = Eio.Net.listen ~sw ~backlog:5 net addr in
  traceln "Listening on %a" Eio.Net.Sockaddr.pp addr;
  (* Runs once per request in an Eio task *)
  let handle conn _addr =
    let n = Eio.Buf_read.parse_exn request conn ~max_size:10 in
    traceln "Spawning calculation of fib %d..." n;
    let result = run_in_pool fib_par n in
    traceln "fib %d = %d" n result;
    let reply = Printf.sprintf "%d\r\n" result in
    Eio.Flow.copy_string reply conn
  in
  Eio.Net.run_server sock handle ~on_error:(traceln "Error handling connection: %a" Fmt.exn)

let () =
  Eio_main.run @@ fun env ->
  main (`Tcp (Eio.Net.Ipaddr.V4.loopback, 7000))
    ~net:env#net
Then use it with e.g.
echo 20 | nc 127.0.0.1 7000
(if you try it interactively, note that it waits for you to shut down the sending side before doing the calculation, so press Ctrl-D after entering the input)
Running an Eio mainloop inside a Domainslib task that Domainslib can migrate between domains certainly won’t work. You only want one fiber scheduler per domain.
… I don’t understand how to apply this to my example.
The web server aspect brings a lot of fluff so it’s hard to extract the useful bits.
I can’t really translate a call to parallel_for to several async and await, that would require manually redoing the “split the work between all threads” task myself.
I tried anyway, and my naive attempts get me
Fatal error: exception Stdlib.Effect.Unhandled(Eio_linux__Sched.Alloc)
# or
Fatal error: exception Stdlib.Effect.Unhandled(Domainslib__Task.Wait(_, _))
# or simply nothing (without prints)
My latest attempt here:
let task2 pool sink =
  let nb_domains = T.get_num_domains pool in
  let promise, resolver = Eio.Promise.create () in
  for _ = 1 to nb_domains do
    ignore
    @@ T.async pool (fun () ->
           Eio.Promise.resolve resolver
           @@ for j = 0 to 128 do
                if j mod 2 = 0 then Eio.Flow.copy_string "Hello!\n" sink
              done)
  done;
  Eio.Promise.await promise
It hangs indefinitely.
Now would also be a good time to mention that these exception backtraces are mostly uninformative.
Running an Eio mainloop inside a Domainslib task that Domainslib can migrate between domains certainly won’t work. You only want one fiber scheduler per domain.
I also don’t understand this bit. Should I only have one of T.run and Eio_main.run? Or should they be in a specific order?
You can only run one scheduler in any given domain. The scheduler decides what fiber should run next and when to put the domain to sleep. It doesn’t really make sense to have two running at once for a single domain.
Domainslib doesn’t do IO. When the Domainslib scheduler runs out of things to do it stops and waits on a Condition for more work, but it doesn’t ask the OS if any IO is ready. So you can’t do (concurrent) IO from a Domainslib domain.
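To illustrate where the Unhandled errors come from: both schedulers are built on OCaml 5 effect handlers (the error messages name the effects Eio_linux__Sched.Alloc and Domainslib__Task.Wait), and an effect performed where the matching handler is not installed raises Effect.Unhandled. Here is a minimal stdlib-only sketch of that behaviour. The Yield effect and run_scheduler are made-up names for illustration, not part of Eio or Domainslib:

```ocaml
(* Stdlib-only sketch. [Yield] and [run_scheduler] are hypothetical names
   used for illustration; they are not part of Eio or Domainslib. *)
open Effect
open Effect.Deep

type _ Effect.t += Yield : unit Effect.t

(* A toy "scheduler": it installs a handler for [Yield] around [f]. *)
let run_scheduler f =
  try_with f ()
    { effc = (fun (type a) (eff : a Effect.t) ->
          match eff with
          | Yield -> Some (fun (k : (a, _) continuation) -> continue k ())
          | _ -> None) }

(* Performing the effect underneath the handler works. *)
let inside = run_scheduler (fun () -> perform Yield; "handled")

(* Performing it in a fresh domain, where no handler is installed, fails. *)
let outside =
  Domain.join
    (Domain.spawn (fun () ->
         match perform Yield with
         | () -> "handled"
         | exception Effect.Unhandled _ -> "unhandled"))
```

As far as I can tell, this is the situation in the original MWE: the body of parallel_for runs on Domainslib worker domains, where no Eio handler is installed, so the Eio call there ends up like outside.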
Your example is unusual in that the parallel_for doesn’t produce a single result, but instead produces lots of independent IO (in no particular order). If it collected the results together into a set of strings, it could return them with the promise, as I mentioned. If you want to trigger the IO as the computation goes along then, as @polytypic says, you could send them back to Eio for output using e.g. a Stream. This also ensures that you only write one message at a time (having multiple domains writing to stdout at once could interleave things oddly). For example:
open Eio.Std

module T = Domainslib.Task

let task pool sink =
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i -> if i mod 2 = 0 then sink "Hello!\n")
    pool

let eio_main ~stdout ~pool =
  let stream = Eio.Stream.create max_int in
  let rec write_output () =
    match Eio.Stream.take stream with
    | None -> ()
    | Some msg ->
      Eio.Flow.copy_string msg stdout;
      write_output ()
  in
  Fiber.both
    write_output
    (fun () ->
       ignore @@
       Domainslib.Task.async pool (fun () ->
           task pool (fun s -> Eio.Stream.add stream (Some s));
           Eio.Stream.add stream None
         )
    )

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run (fun env -> eio_main ~stdout:(Eio.Stdenv.stdout env) ~pool);
  T.teardown_pool pool
This relies on the fact that writing to an Eio.Stream can be done from a non-Eio domain as long as the stream isn’t full (so I created it with capacity max_int, making it effectively unbounded).
Depending on what you’re really trying to do, it might make more sense to replace the parallel_for with Eio code, though. It would also be good to provide something similar to a Domainslib worker pool in the Eio API.
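As a rough idea of what "replacing the parallel_for with Eio code" could look like, here is a sketch that splits the index range in two by hand and runs each half in its own domain through Eio's domain manager. The helper names (scan, search, in_domain) and the stand-in expensive_computation are assumptions for illustration, and the fixed two-way split is much cruder than what parallel_for does:

```ocaml
open Eio.Std

(* Hypothetical stand-in for the real computation. *)
let expensive_computation = function
  | 500 -> Some 42
  | _ -> None

(* Sequentially scan [lo..hi] and return the first hit, if any. *)
let scan lo hi =
  let rec go i =
    if i > hi then None
    else
      match expensive_computation i with
      | Some _ as hit -> hit
      | None -> go (i + 1)
  in
  go lo

(* Run each half of the range in a separate domain. The calling fiber
   only waits, so other Eio fibers can keep running in the meantime. *)
let search mgr =
  let in_domain lo hi () =
    Eio.Domain_manager.run mgr (fun () -> scan lo hi)
  in
  let left, right = Fiber.pair (in_domain 0 512) (in_domain 513 1024) in
  match left with Some _ -> left | None -> right

let result =
  Eio_main.run @@ fun env ->
  let result = search (Eio.Stdenv.domain_mgr env) in
  traceln "search finished, found: %b" (Option.is_some result);
  result
```

This keeps a single scheduler (Eio's) per domain, at the cost of doing the work splitting yourself.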
Hmm… I’m afraid my initial example was too contrived and doesn’t reflect accurately what I’m trying to do.
A more involved snippet would be this:
module C = Domainslib.Chan
module T = Domainslib.Task

let task pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let chan = C.make_unbounded () in
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i ->
      (* No eio stuff happening inside the ~body *)
      let thing = expensive_computation i in
      (* Note that I expect only one result in total. *)
      Option.iter (C.send chan) thing)
    pool;
  let result = C.recv chan in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  use_result result

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run (fun env ->
      T.run pool (fun () ->
          task pool env#stdout));
  T.teardown_pool pool
I hope it’s clear that I am not expecting Eio and Domainslib to “work together nicely”, but merely to “not stand in each other’s way”.
My use case is a parallel search (what I hoped parallel_scan would be when seeing the name), so in a way the parallel_for does produce a single result, but I just don’t know where this result will come from.
With this code, having both run calls in either order, or only the Eio_main one, doesn’t work and fails with the expected errors.
That should be easy enough, since the Eio bits are separate from the calculation. e.g.
(* Domainslib part *)

module C = Domainslib.Chan
module T = Domainslib.Task

let expensive_computation = function
  | 500 -> Some 42
  | _ -> None

let task pool =
  let chan = C.make_unbounded () in
  T.parallel_for ~start:0 ~finish:1024
    ~body:(fun i ->
      (* No eio stuff happening inside the ~body *)
      let thing = expensive_computation i in
      (* Note that I expect only one result in total. *)
      Option.iter (C.send chan) thing)
    pool;
  C.recv chan

(* Standard Domainslib / Eio bridge *)

open Eio.Std

let run_in_pool fn pool =
  let result, set_result = Promise.create () in
  let _ : unit Domainslib.Task.promise = Domainslib.Task.async pool (fun () ->
      Promise.resolve set_result @@
      match fn pool with
      | r -> Ok r
      | exception ex -> Error ex
    )
  in
  Promise.await_exn result

(* Eio part *)

let main pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let result = run_in_pool task pool in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  traceln "Got %d" result

let () =
  let pool = T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) () in
  Eio_main.run (fun env -> main pool env#stdout);
  T.teardown_pool pool
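For the parallel-search use case, a possible variation on the Domainslib part is to let parallel_for_reduce combine the per-index options, which avoids the channel entirely and also copes with zero hits (C.recv would block forever in that case). This is only a sketch: any_some and search are names I made up, and expensive_computation is the same stand-in as above.

```ocaml
module T = Domainslib.Task

(* Hypothetical stand-in for the real computation. *)
let expensive_computation = function
  | 500 -> Some 42
  | _ -> None

(* Combine two partial results, keeping a hit if either side has one. *)
let any_some a b = match a with Some _ -> a | None -> b

(* Evaluate every index in parallel and reduce the options to one. *)
let search pool =
  T.parallel_for_reduce ~start:0 ~finish:1024
    ~body:expensive_computation pool any_some None

(* Standalone driver using [T.run], so the sketch needs no Eio. *)
let result =
  let pool = T.setup_pool ~num_domains:2 () in
  let r = T.run pool (fun () -> search pool) in
  T.teardown_pool pool;
  r
```

Since search has the same shape as task (it takes the pool and returns a value), it could be handed to run_in_pool in the same way. Note that it does not stop early: every index is still evaluated, and if several indices match I would not rely on which hit comes back.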
Wow that’s almost exactly what I needed, thanks a lot!
The resulting code, in a format that can be adapted to my real use-case, looks like this:
module T = Domainslib.Task

let task pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let promise, resolver = Eio.Promise.create () in
  let _ =
    T.async pool (fun () -> (* <------- NEW! *)
        T.parallel_for ~start:0 ~finish:1024
          ~body:(fun i ->
            (* No eio stuff happening inside the ~body *)
            expensive_computation i
            |> Option.iter (Eio.Promise.resolve resolver))
          (* Note that I expect only one result in total. *)
          pool)
  in
  let result = Eio.Promise.await promise in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  use_result result

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run @@ fun env ->
  task pool env#stdout;
  T.teardown_pool pool
So this is almost identical to what I had in mind and should be easy to adapt. Also switching from a domain-blocking Domainslib.Chan.recv to a fiber-blocking Eio.Promise.await is really nice.
From my tests I think I understood that there can be no “blocking” eio tasks inside the call to T.async, is that correct?
All of this brings me to what I hope is my last question:
If we modify expensive_computation to this:
let expensive_computation = function
  | 0 | 500 -> Some 42
  | _ -> None
I would expect an Invalid_argument to be raised, because you can’t resolve the same promise twice. But somehow that doesn’t happen, and it returns 42 like normal. Do you know why?
Note that by doing this you’re throwing away the final result of the Domainslib function (with let _ =), which holds any exception it might raise. My version catches exceptions and passes them to Eio to avoid that.
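If more than one index can match, one option is to guard the resolver so that only the first hit resolves the promise and later hits are dropped silently. Below is a stdlib-only sketch of that guard. make_first_wins is a hypothetical helper, and strict_resolve merely stands in for Eio.Promise.resolve resolver (it also raises Invalid_argument on a second call) so that the example runs without Eio:

```ocaml
(* [make_first_wins] is a hypothetical helper, not part of Eio or
   Domainslib. It wraps a resolve function so that it runs at most once. *)
let make_first_wins resolve =
  let claimed = Atomic.make false in
  fun v ->
    (* Only the caller that flips the flag from false to true resolves. *)
    if Atomic.compare_and_set claimed false true then (resolve v; true)
    else false

(* Stand-in for [Eio.Promise.resolve resolver]: fails on a second call. *)
let slot : int option ref = ref None

let strict_resolve v =
  match !slot with
  | None -> slot := Some v
  | Some _ -> invalid_arg "already resolved"

let offer = make_first_wins strict_resolve

let first = offer 42   (* wins the race and resolves *)
let second = offer 43  (* loses; [strict_resolve] is not called again *)
```

In the real code the wrapper would go around Eio.Promise.resolve resolver inside the ~body, so a second match can no longer raise inside the task where nobody is looking at the exception.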