Eio, streams, clean termination

Imagine a system with a producer reading an indeterminate batch of jobs, which
are processed by workers before being consumed. Each job might generate 0…n
results and we don’t particularly care about order. At the end of a batch it
would be useful for the consumer to log some statistics of results processed.

I currently have code something like this:

  let work_stream = Eio.Stream.create n in
  let results_stream = Eio.Stream.create n in
  Eio.Fiber.all [
    (fun () -> work_producer id_source work_stream);
    (fun () -> workers work_stream results_stream);
    (fun () -> results_consumer results_stream result_sender);

Now, reading from a
stream is blocking and there is no “EOF” equivalent - you can’t close streams. That means we need to know there are no more results going to be available or our results_consumer will block forever.

It’s not enough to know that the producer has finished reading from its source,
you also need to know there are no active workers or queued jobs or results.

I have ended up with a set of mutex-protected counters (one per stream and fiber) and when all (except the consumer) reach zero then we are finished.

But… that feels overly complicated. What am I missing? Were streams the wrong tool for the job? Is the pipeline structured incorrectly?


Have the producer write in the result stream whenever it submits a new job and also when it is done submitting new jobs. Have the workers write whenever they complete a job. Then the consumer can just count the number of events in the result stream.

Another option would be to write one “stop” message to work_stream for each worker. Each worker exits after reading a stop message, so each will only take one.

(note: Eio.Executor_pool cheats; it uses an internal stream API that does provide close. I’d like to make this public in the future.)

Thanks for the suggestions both - I’ll try them out at some point in the next few days.

Was there a particular reason for not exposing close in the standard Switch api @talex5 ? Or is it just that it is always easier to expand an API’s interface than to reduce an overly large one?

Internally, Stream has two implementations, depending on whether you ask for a 0-capacity stream or not. So far, I only added close to the 0-capacity one (but doing the other would be easy). We might also want separate types for closable and non-closable streams.

And then if we’re changing the API then we might want other changes too (see Consider renaming Stream · Issue #267 · ocaml-multicore/eio · GitHub)… just needs a bit of thinking about.