Wow that’s almost exactly what I needed, thanks a lot!
The resulting code, in a form I can adapt to my real use case, looks like this:
module T = Domainslib.Task

let task pool sink =
  (* Use eio for normal IO stuff before the parallel_for *)
  Eio.Flow.copy_string "hello\n" sink;
  let promise, resolver = Eio.Promise.create () in
  let _ =
    T.async pool (fun () -> (* <------- NEW! *)
        T.parallel_for ~start:0 ~finish:1024
          ~body:(fun i ->
            (* No eio stuff happening inside the ~body *)
            expensive_computation i
            |> Option.iter (Eio.Promise.resolve resolver))
          (* Note that I expect only one result in total. *)
          pool)
  in
  let result = Eio.Promise.await promise in
  (* Use eio for normal IO stuff after the parallel_for *)
  Eio.Flow.copy_string "goodbye\n" sink;
  use_result result

let () =
  let pool =
    T.setup_pool ~num_domains:(Domain.recommended_domain_count () - 1) ()
  in
  Eio_main.run @@ fun env ->
  task pool env#stdout;
  T.teardown_pool pool
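(For context: `expensive_computation` and `use_result` stand in for my real code. Minimal hypothetical stand-ins that make the snippet compile could look like this:)

```ocaml
(* Hypothetical placeholders for the helpers used above; the real
   versions live in my actual code base. *)
let expensive_computation i =
  (* Exactly one index in the range produces a result. *)
  if i = 0 then Some 42 else None

let use_result r =
  Printf.printf "result: %d\n" r
```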
So this is almost identical to what I had in mind and should be easy to adapt. Also, switching from the domain-blocking Domainslib.Chan.recv to the fiber-blocking Eio.Promise.await is really nice.
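(For reference, the channel-based variant I'm replacing looked roughly like this, sketched from memory; the key difference is that Chan.recv parks the whole domain rather than just a fiber, and it assumes the same `expensive_computation` helper:)

```ocaml
module T = Domainslib.Task
module C = Domainslib.Chan

(* Rough sketch of the old channel-based version: C.recv blocks the
   entire domain, so no other Eio fiber can run while we wait. *)
let task pool =
  let results = C.make_bounded 1 in
  let _ =
    T.async pool (fun () ->
        T.parallel_for ~start:0 ~finish:1024
          ~body:(fun i ->
            expensive_computation i
            |> Option.iter (C.send results))
          pool)
  in
  C.recv results (* domain-blocking wait *)
```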
From my tests, I think I understood that no “blocking” Eio operations can run inside the call to T.async — is that correct?
All of this brings me to what I hope is my last question:
If we modify expensive_computation to this:
let expensive_computation = function
  | 0 | 500 -> Some 42
  | _ -> None
I would expect an Invalid_argument to be raised, because you can’t resolve the same promise twice. But somehow it isn’t, and the program returns 42 as normal. Do you know why?
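(In isolation, without Domainslib involved, the double-resolve check does fire for me; this is the behavior I expected to see in the full program as well:)

```ocaml
(* Minimal reproduction: resolving the same Eio promise twice.
   The second resolve should raise Invalid_argument. *)
let () =
  Eio_main.run @@ fun _env ->
  let _promise, resolver = Eio.Promise.create () in
  Eio.Promise.resolve resolver 1;
  (match Eio.Promise.resolve resolver 2 with
   | () -> print_endline "no exception?!"
   | exception Invalid_argument _ ->
     print_endline "Invalid_argument raised, as expected")
```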