Emitting a stream from Eio.Buf_read.parse, and other Eio questions

I’m trying to transform incoming bytes into a stream of events, similar to how markup.ml emits XML/HTML “signals” (start/end tags, attributes, text nodes, etc.). If my event type is signal, should I write a function of type (signal Stream.t) Buf_read.parser and then hand that to Buf_read.parse? Or should I implement a unit Buf_read.parser and have it call an emit function internally? Or is there some other stream → stream API better suited to this? I’m doubting myself because I’m unsure whether Buf_read.parse is meant only for reducing a byte stream down to a single value, or whether it’s also meant for producing a stream itself.

(Sorry if I’ve got anything terribly mixed up here, I’m new to this stuff.)

Edit: I guess now that I’ve written this, I realized I’ve basically been searching for a Stream.filter_map-esque function, except that Flow.source and Stream.t are separate types. Edit 2: also except that it would need an accumulator or something, LOL. I’m not good with all the Scala-y higher order list/stream functions.

Also, if I have a recursive function using Buf_read/Buf_write functions to parse/serialize some recursive structure, should I be calling Fiber.yield () on my own? (I’m guessing Buf_write.string, etc. are yielding themselves, so maybe I don’t need to yield myself outside of calling those functions? Just want to double-check my understanding.)

Side note: it’s nice having Angstrom- and Faraday-esque functionality built into Eio, very convenient! I hadn’t realized that was there before I started browsing the API.

So, I made some progress and realized (signal Stream.t) Buf_read.parser doesn’t really make sense - Buf_read.parse will only return its result (a signal Stream.t in this case) once the given parser finishes. So my parser would have to push all of its resulting signals into the stream (as far as the capacity given to Stream.create allowed) before parse could return the stream and the consumer could start reading from it.

So I just made it a unit Buf_read.parser and it pushes to a signal Stream.t. (The producer and consumer are started with Fiber.both and just share a reference to the stream.) It feels a little weird to call Buf_read.parse entirely for side effects (pushing into the stream), but it works! I still wonder though if there’s a more natural approach.
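For reference, here is a minimal sketch of that arrangement. It assumes a hypothetical signal type with a single Text constructor (one signal per input line) and uses None on the stream to mark the end of input; neither is from my real code, and the names produce, consume, and run are just for illustration.

```ocaml
(* Hypothetical event type: one signal per line of input. *)
type signal = Text of string

(* A [unit Buf_read.parser]: it parses purely for the side effect of
   pushing signals into [stream]. [None] marks the end of input. *)
let rec produce stream (buf : Eio.Buf_read.t) : unit =
  if Eio.Buf_read.at_end_of_input buf then Eio.Stream.add stream None
  else begin
    Eio.Stream.add stream (Some (Text (Eio.Buf_read.line buf)));
    produce stream buf
  end

(* Take signals from the stream until the end marker arrives. *)
let rec consume stream acc =
  match Eio.Stream.take stream with
  | None -> List.rev acc
  | Some s -> consume stream (s :: acc)

let run input =
  Eio_main.run @@ fun _env ->
  let flow = Eio.Flow.string_source input in
  (* Capacity 1: the producer blocks in [Stream.add] until the consumer
     catches up, so the two fibers take turns. *)
  let stream = Eio.Stream.create 1 in
  let result = ref [] in
  Eio.Fiber.both
    (fun () ->
       Eio.Buf_read.parse_exn ~max_size:1_000_000 (produce stream) flow)
    (fun () -> result := consume stream []);
  !result

let () =
  run "first\nsecond\n" |> List.iter (fun (Text s) -> print_endline s)
```

The small stream capacity is what keeps the producer from running far ahead of the consumer.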

You probably want a signal Seq.t parser. That way, it is the consumer reading from the sequence that causes the next item to be parsed. For example, Buf_read.lines works that way.
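A rough sketch of the Seq.t approach, assuming a hypothetical signal type where each line of input is one Text signal (the signal, signals, and run names are made up for the example). It builds the sequence with Buf_read.seq and reads through Buf_read.of_flow rather than Buf_read.parse, since parse expects the parser to have consumed all the input by the time it returns, which a lazy sequence has not done.

```ocaml
(* Hypothetical event type: one signal per line of input. *)
type signal = Text of string

(* Parser for a single signal; here, simply one line. *)
let signal : signal Eio.Buf_read.parser =
  fun buf -> Text (Eio.Buf_read.line buf)

(* A lazy sequence of signals: each item is parsed only when the
   consumer asks for it. The sequence is backed by the mutable buffer,
   so it should be traversed only once. *)
let signals (buf : Eio.Buf_read.t) : signal Seq.t =
  Eio.Buf_read.seq signal buf

let run input =
  Eio_main.run @@ fun _env ->
  let flow = Eio.Flow.string_source input in
  let buf = Eio.Buf_read.of_flow flow ~max_size:1_000_000 in
  (* Forcing the sequence here is what drives the parsing. *)
  List.of_seq (signals buf)

let () =
  run "first\nsecond\n" |> List.iter (fun (Text s) -> print_endline s)
```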

If you’re parsing a complex recursive data-structure, you do need to track the current state yourself though. I’m thinking of adding a Fiber.fork_seq function to simplify that kind of thing.
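To illustrate what tracking the state yourself can look like, here is a small sketch for a made-up nested format: parentheses open and close a level, and every other character is a leaf. The signal type, next, and signals are hypothetical names; the explicit state is just the current nesting depth, threaded through Seq.unfold.

```ocaml
(* Hypothetical signals for a toy nested format. *)
type signal = Open | Close | Char of char

(* Produce the next signal, if any, given the current nesting depth.
   The depth is the parser state that a recursive function would
   otherwise keep implicitly on the call stack. *)
let next buf depth =
  match Eio.Buf_read.peek_char buf with
  | None -> if depth = 0 then None else failwith "unclosed parenthesis"
  | Some c ->
    Eio.Buf_read.consume buf 1;
    match c with
    | '(' -> Some (Open, depth + 1)
    | ')' when depth > 0 -> Some (Close, depth - 1)
    | ')' -> failwith "unbalanced parenthesis"
    | c -> Some (Char c, depth)

(* Lazy sequence of signals; it reads from the mutable buffer, so it
   should be traversed only once. *)
let signals (buf : Eio.Buf_read.t) : signal Seq.t =
  Seq.unfold (next buf) 0

let () =
  let buf = Eio.Buf_read.of_string "(a(b))" in
  signals buf
  |> Seq.iter (function
       | Open -> print_endline "open"
       | Close -> print_endline "close"
       | Char c -> Printf.printf "char %c\n" c)
```

For a richer structure the state would be a stack of contexts rather than a single integer, but the shape stays the same.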

At the moment, Buf_write doesn’t suspend when it gets behind (it will just buffer forever), but probably that should be improved.

With that said, a unit Buf_read.parser may well be what you want here, though in that case it might be clearer to expand the type and simply describe it as a Buf_read.t -> unit function, to emphasise that its purpose is to perform a side effect rather than to parse a unit.