How to safely take from multiple Eio streams?

I’m new to concurrent programming in OCaml and I’m currently playing with Eio streams, which I understand are similar to channels in Go.

In Go, if I have multiple channels to receive from, I would use select like this:

select {
    case e1 := <-c1:
        handleEvent(e1)
    case e2 := <-c2:
        handleEvent(e2)
}

My understanding is that with Eio, I’d use Fiber.any instead of select:

Fiber.any [
  (fun () -> let e1 = Stream.take s1 in handle_event e1);
  (fun () -> let e2 = Stream.take s2 in handle_event e2);
]

What’s confusing me is that, in the Eio documentation, it’s stated that:

Warning: it is always possible that both operations will succeed. This is because there is a period of time after the first operation succeeds when it is waiting in the run-queue to resume during which the other operation may also succeed.

But if that’s the case, how am I supposed to ensure that, if both operations succeed, both handle_event calls run to completion? For example, say the following happens:

  • Stream s1 yields first, so handle_event e1 runs
  • Stream s2 yields at the same time, so handle_event e2 also runs
  • handle_event e2 yields internally waiting for some IO
  • Fiber.any tries to cancel the second fiber because handle_event e1 has returned

In this case, it sounds like handle_event e2 would be interrupted. An event has been taken from s2, but handle_event e2 never ran to completion because it was cancelled by Fiber.any.

Could this happen? If so, how do I ensure safety with my concurrent code, so that event handlers are never interrupted before they could run to completion? Note that this could not happen in Go because select always receives from one channel only.

There are two issues that I see in your code (without being an expert in Eio).

First, you probably want to take handle_event out of the closure: once handle_event has started, you probably want it to run to completion rather than be cancelled if the other fiber finishes first. Second, about the possibility of both operations succeeding, I thought that’s what n_any was for. In short, I would write something like:

List.iter handle_event
  (Fiber.n_any [(fun () -> Stream.take s1); (fun () -> Stream.take s2)])
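
A slightly fuller sketch of that idea, assuming a recent Eio release in which Fiber.n_any takes a list of functions and returns a list of results. The names handled, handle_event and take_and_handle are made up for illustration, and the handler here only records the event instead of doing real IO:

```ocaml
(* Requires the eio and eio_main packages (assumed to be installed). *)
module Fiber = Eio.Fiber
module Stream = Eio.Stream

(* Hypothetical handler: it only records each event it receives. *)
let handled = ref []
let handle_event e = handled := e :: !handled

(* Wait until at least one stream yields, then handle every value taken.
   The handlers run after n_any has returned, so n_any cannot cancel them. *)
let take_and_handle s1 s2 =
  Fiber.n_any [
    (fun () -> Stream.take s1);
    (fun () -> Stream.take s2);
  ]
  |> List.iter handle_event

let () =
  Eio_main.run @@ fun _env ->
  let s1 = Stream.create 1 and s2 = Stream.create 1 in
  Stream.add s1 "from s1";
  Stream.add s2 "from s2";
  (* Both streams are ready, so a single call may return one or two events;
     loop until both have been handled. *)
  while List.length !handled < 2 do
    take_and_handle s1 s2
  done;
  assert (List.sort compare !handled = ["from s1"; "from s2"])
```

The point of this shape is that only the Stream.take calls run inside the fibers that may be cancelled, while the handlers run in the calling fiber afterwards.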

Cheers,
Nicolas

select isn’t implemented yet. However, you can usually work around it by having all the producers write to a single stream, rather than having the consumer select from several streams.
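
A minimal sketch of the single-stream workaround, assuming the two kinds of event can be wrapped in one variant type. The event type, its constructors, describe and run_demo are hypothetical names chosen for illustration:

```ocaml
(* Requires the eio and eio_main packages (assumed to be installed). *)
module Fiber = Eio.Fiber
module Stream = Eio.Stream

(* Hypothetical event type: one constructor per former stream. *)
type event =
  | From_a of int
  | From_b of string

let describe = function
  | From_a n -> Printf.sprintf "a:%d" n
  | From_b s -> Printf.sprintf "b:%s" s

let run_demo () =
  Eio_main.run @@ fun _env ->
  (* One shared stream; capacity 10 so the producers below never block. *)
  let events = Stream.create 10 in
  let log = ref [] in
  (* The handler only ever runs in the consumer fiber, and no other fiber
     finishing first can cancel it. *)
  let handle_event e = log := describe e :: !log in
  Fiber.all [
    (fun () -> Stream.add events (From_a 1); Stream.add events (From_a 2));
    (fun () -> Stream.add events (From_b "x"));
    (fun () -> for _ = 1 to 3 do handle_event (Stream.take events) done);
  ];
  List.rev !log

let () =
  assert (List.sort compare (run_demo ()) = ["a:1"; "a:2"; "b:x"])
```

With this layout the consumer does a plain Stream.take in a loop, so each event is taken and handled by the same fiber.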

Thanks @nojb! That’s a great idea.

Thanks @talex5 – having all producers write to the same stream certainly works, but can you confirm if what @nojb suggested will also 100% ensure that all events are handled?

List.iter handle_event
  (Fiber.n_any [(fun () -> Stream.take s1); (fun () -> Stream.take s2)])