[ANN] First release of streaming

Hi all! :wave:

It is my pleasure to announce the first public release of streaming – a library for building efficient, incremental data processing pipelines that compose and don’t leak resources.

I built streaming as a result of many experiments with different streaming and iteration models for OCaml. There are multiple packages on OPAM that share some of the goals of streaming (we even have Stdlib.Seq now!), but none of them combine (1) excellent performance, (2) safe resource handling and (3) pure functional style for combinators. Streaming solves these problems by implementing three basic and independent models: sources, sinks and flows – they represents different parts of the pipeline that correspond to producing, consuming and transforming elements. These models can be defined and composed independently to produce reusable “streaming blocks”.

The library defines a central Stream model that relies on sources, sinks and flows. This model is a push-based iterator with performance characteristics similar to the iter iterator, which has type ('a -> unit) -> unit, and is known for being very efficient. But unlike iter, it has a pure functional core (no need to use mutable state and exceptions for flow control!) and can handle resource allocation and clean up in a lazy and deterministic way. All of this while having a slightly better performance for common stream operations.

For those who are curious about the performance characteristics of streaming and other models, I created a dedicated repository for stream benchmarks: https://github.com/rizo/streams-bench. In particular, it includes a few simple benchmarks for Gen, Base.Sequence, Stdlib.Seq, Iter, Streaming.Stream and Streaming.Source.

The library should soon be published on opam. In the meantime, I invite you to read the docs and explore the code:

Questions, opinions and suggestions are welcome!

Happy streaming!


That’s great ! From the benchmarks, it looks like you hit a really good implementation !

I’ve looked (maybe a bit fast) at the API documentation, and it is admittedly a bit outside the scope of streams/iterators, but I was wondering if there was some proper way to:

  • connect a sink to a source to create some loop
  • have some kind of fixpoint on streams

I guess it would always be possible to use some references and/or some complex functions to encode these into the provided API, but I was wondering if there was a clean way to do it.

For a bit of context and explanation, what I have in mind is the case of a program (let’s say a type-checker or something close to the idea) with a persistent state, that should operate over a stream of inputs, which are top-level phrases, and produce some outputs, for instance print some result for each correctly type-checked statement (and an error otherwise).
The type-checker would basically be a function of type (`input * `state) -> (`output * `state), and starting from an initial state, it would process an input element (giving the output to some sink), and then the next input element would be processed with the state that was reached after processing the previous element: the state would reach the sink of the flow, and then be inserted back into the source.
Separately, imagine the language being type-checked has a notion of include, then one of the step of the flow would be to expand each include into a stream of inputs/phrases, but each of the phrases in this stream would need to be expanded, so a simple flat_map/flatten is not enough.

I already have a custom implementation that handle these features, but I was wondering whether I could use streaming to handle most of the code linking all of the steps, ^^

This looks awesome ! Just so you know : in this page https://odis-labs.github.io/streaming/streaming/index.html at section Examples there is this piece of code :

 let items =
    let* n = Stream.range 1 3 in
    let* c = Stream.of_list ['x'; 'y'] in
    yield (n, c)

This does not compile without a open Stream.Syntax somewhere (probably a let open Stream.Syntax just after let items = ). You probably want to add it :slight_smile:

1 Like

Thanks for your questions, @zozozo!

if there was some proper way to:

  • connect a sink to a source to create some loop
  • have some kind of fixpoint on streams

Regarding the first point: yes! That’s exactly the point of the Stream module. You see, sources are pull-based abstractions, while sinks are push-based. Source’s type essentially says something like “I might give you some data, if you ask”, while sink’s type is the opposite “I might take some data, if you give it to me”. They are completely and intentionally decoupled; it is Stream’s role to drive the computation by pulling data from sources and pushing it into sinks. So the easiest way to connect them is:

Stream.(from srouce |> into sink)

Of course, that’s not very useful per se, but it illustrates my point. Take a look at the Stream.from code to see the implementation of the loop you’re asking for. It does some extra work to ensure that resources are correctly handled, but it should be clear what the loop is doing.

The stream types in the library are currently abstract because I didn’t want to commit to a particular representation just yet. If this is a problem for your use case, let me know, I’ll expose them in a Private module.

Regarding the second point: I’m not sure what you mean in practice by “fixpoint on streams”. I guess one thing that could help implement something like that is the Stream.run function. It allows you to continue reading elements from a source even after a sink is filled by returning a leftover stream. This stream can be used with Stream.run repeatedly.

Alternatively there’s also Flow.through, which consumes input trying to fill sinks repeatedly and produces their aggregated values as a stream. Super useful for things like streaming parsing. Might even help with your use-case for top-level phrases.

On a more general note though, the type ('input * 'state) -> ('output * 'state) looks a lot like a mealy machine. Streaming.Sink is a moore machine, which is slightly less flexible because the output values do not depend on input values, only on the state.

I thought about exposing different kinds of sinks in streaming, but wanted to make sure that the common use cases are covered first. I’ll keep your case in mind for future versions of the library.

Let me know if this helps.

1 Like

Thanks, @nico_toll! Should be fixed now.

Whoa. Will test it )))

I did not follow this thread, but this sounds like a rather negative comment. It is a bit unfair to write this without explanation. Can you please expand on what you mean ?


I’d be very glad to hear more details and try to improve the library. Being an initial release, I’m aware that there’s still a lot of missing functionality. For example, I’m currently looking into buffered stream processing, a back pressure mechanism and concurrency support for streaming. I personally think the library provides a good foundation for these features. Let me know if (and why) you disagree :slight_smile: