Hi.
I’ve noticed some strange behavior in Eio fibers. Consider the following simple program:
let producer stream =
  while true do
    Eio.Stream.add stream "asdfasdfasdf\n"
  done

let consumer stream =
  while true do
    let s = Eio.Stream.take stream in
    let c = String.get s 100 in
    Eio.traceln "%c\n" c
  done

let main ~pool ~sw =
  let stream = Eio.Stream.create 5 in
  let producer_task =
    Eio.Executor_pool.submit_fork ~sw pool ~weight:0.2 (fun () ->
        producer stream)
  in
  let consumer_task =
    Eio.Executor_pool.submit_fork ~sw pool ~weight:0.2 (fun () ->
        consumer stream)
  in
  let promises = [ producer_task; consumer_task ] in
  List.iter promises ~f:(fun p -> Eio.Promise.await_exn p)

let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let pool =
    Eio.Executor_pool.create ~sw (Eio.Stdenv.domain_mgr env) ~domain_count:2
  in
  main ~pool ~sw
We have two fibers here, a stream shared between them, and an executor pool. In the consumer fiber, we try to access a character outside the bounds of the string s, but the program doesn’t raise an “index out of bounds” exception; it just hangs.
If I access a character at an invalid index in the first fiber instead, the program does crash. If I don’t read the string from the stream but create it locally, the program also crashes. Why does it hang in this case?
List.iter is telling it to wait for the producer to finish before examining the consumer. But the producer runs forever (waiting for the consumer to make space in the queue).
You could use Fiber.all instead of List.iter to wait for them all at the same time. You could then simplify it a bit:
let main ~pool =
  let stream = Eio.Stream.create 5 in
  let producer_task () =
    Eio.Executor_pool.submit_exn pool ~weight:0.2 (fun () ->
        producer stream)
  in
  let consumer_task () =
    Eio.Executor_pool.submit_exn pool ~weight:0.2 (fun () ->
        consumer stream)
  in
  Eio.Fiber.all [ producer_task; consumer_task ]
(and if you only have two tasks, Fiber.both is a bit simpler too)
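For reference, here is a rough sketch of what the Fiber.both variant might look like. It assumes the eio and eio_main packages are installed; the bounded loops, the count n, and the name run_both are made up for illustration so that the example terminates, and are not part of the original program.

```ocaml
(* Requires the eio and eio_main packages (OCaml 5). *)

(* Bounded variant of the producer: adds the integers 1..n to the stream. *)
let producer stream n =
  for i = 1 to n do
    Eio.Stream.add stream i
  done

(* Bounded variant of the consumer: takes n items and returns their sum. *)
let consumer stream n =
  let total = ref 0 in
  for _ = 1 to n do
    total := !total + Eio.Stream.take stream
  done;
  !total

(* Hypothetical helper: runs both jobs on the pool and waits for both. *)
let run_both n =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let pool =
    Eio.Executor_pool.create ~sw (Eio.Stdenv.domain_mgr env) ~domain_count:2
  in
  let stream = Eio.Stream.create 5 in
  let result = ref 0 in
  (* Both arguments are functions, so neither job starts before Fiber.both. *)
  Eio.Fiber.both
    (fun () ->
      Eio.Executor_pool.submit_exn pool ~weight:0.2 (fun () ->
          producer stream n))
    (fun () ->
      result :=
        Eio.Executor_pool.submit_exn pool ~weight:0.2 (fun () ->
            consumer stream n));
  !result

let () = Printf.printf "sum = %d\n" (run_both 20)
```

If either job raised, Fiber.both should cancel the other one and re-raise, which is the behaviour the original program was missing.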
It hangs because the consumer stops reading from the stream after getting an exception, then the producer stops because the stream fills up.
Normally, the exception would propagate upwards and terminate the program (which is what happens with the code I posted above).
But when you use submit_fork, you are asking it to catch any exception and store it in a promise for you to deal with later. Your code does that (Eio.Promise.await_exn), but not until after the producer has finished, which never happens.
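To illustrate the idea of an exception being stored rather than raised, here is a minimal plain-OCaml model. It is not how Eio implements promises: fork and await_exn below are hypothetical stand-ins written for this sketch, and everything runs sequentially.

```ocaml
(* A plain-OCaml model, not Eio's implementation: [fork] runs the job
   immediately and stores its outcome instead of letting an exception escape. *)
type 'a outcome = ('a, exn) result

let fork (job : unit -> 'a) : 'a outcome =
  try Ok (job ()) with ex -> Error ex

(* Stand-in for awaiting a promise: re-raise the stored exception, if any. *)
let await_exn = function
  | Ok v -> v
  | Error ex -> raise ex

let () =
  (* Same out-of-bounds access as in the consumer. *)
  let consumer = fork (fun () -> String.get "asdfasdfasdf\n" 100) in
  (* The failure has already happened, but nothing has been raised yet. *)
  print_endline "consumer forked; no exception visible so far";
  (* Only now does the stored exception surface. *)
  match await_exn consumer with
  | c -> Printf.printf "got %c\n" c
  | exception Invalid_argument msg -> Printf.printf "raised on await: %s\n" msg
```

In the original program the equivalent of that last await is only reached after the producer's promise resolves, so the stored exception is never looked at.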
I tried to reproduce your recipe with submit_exn and Eio.Fiber.all, and now my program always hangs:
let producer stream =
  while true do
    Eio.Stream.add stream "asdfasdfasdf\n"
  done

let consumer stream =
  while true do
    let s = Eio.Stream.take stream in
    Eio.traceln "Received %s\n" s
  done

let main ~pool =
  let stream = Eio.Stream.create 5 in
  let producer_task =
    Eio.Executor_pool.submit_exn pool ~weight:0.2 (fun () ->
        producer stream)
  in
  let consumer_task =
    Eio.Executor_pool.submit_exn pool ~weight:0.2 (fun () ->
        consumer stream)
  in
  Eio.Fiber.all [ producer_task; consumer_task ]

let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let pool =
    Eio.Executor_pool.create ~sw (Eio.Stdenv.domain_mgr env) ~domain_count:2
  in
  main ~pool
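One thing that may be worth checking in the code as posted: producer_task and consumer_task are bound without a () parameter, unlike in the earlier suggestion. A binding of the form let x = f args is evaluated on the spot, so if this matches what was actually run, the producer job would be submitted and waited on before the consumer is ever created. The plain-OCaml sketch below (no Eio involved; run_job and the log are hypothetical names for illustration) shows the difference in evaluation order.

```ocaml
(* Records the order in which things happen. *)
let log = ref []
let record msg = log := msg :: !log

(* Stand-in for a blocking call such as submitting a job and waiting for it. *)
let run_job name = record ("ran " ^ name)

let () =
  (* No [()] parameter: the job runs right here, at the binding. *)
  let eager_task = run_job "eager" in
  (* With a [()] parameter: nothing runs until the function is called. *)
  let deferred_task () = run_job "deferred" in
  record "bindings done";
  eager_task;
  deferred_task ()

(* Events in chronological order. *)
let events = List.rev !log

let () = List.iter print_endline events
```

With a job that never returns, such as the producer loop, the eager form would block at the binding itself.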