Lwt alternative to Async.Pipe

Hey all, I’m new to the OCaml ecosystem and I’m trying to get my bearings on libraries and conventions around common operations.

I have some code that downloads a lot of large files. Currently I’m doing this with Cohttp and Async using Cohttp_async.Response.to_pipe. Coming from a systems programming world, this is a really straightforward API.

Using it in combination with Async.Pipe.iter does exactly what one would assume: the writer blocks intermittently, keeping memory use low while the reader picks data off the FIFO. Combining it with output channels lets you write the data out to a file, or feed the standard input of another program (super handy). And when I say blocking the writer, I mean the literal thread handling that work. This is all “hidden” by the async libraries.

I haven’t figured out how to approach this in Lwt yet: enqueue data into a FIFO, blocking the writer so memory usage stays low, and then have the ability to dequeue that data into an arbitrary file descriptor.

If anyone has input, I’d greatly appreciate it. In the meantime, I’ll keep looking through the docs.

Hi Matt. Welcome. I have noticed Lwt is missing an equivalent to Async Pipe as well. Discussed previously, very briefly, here: Migrating an Async project to Lwt, a short primer

I haven’t dived into any of those suggestions yet but if you do please report back on how it went! :slight_smile:

Did you check out lwt-pipe?
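
Another option that stays within Lwt itself, as far as I can tell, is a bounded Lwt_stream. Here is a minimal sketch of the FIFO-with-backpressure idea from the question, assuming the lwt.unix package is available. The names `pump` and `capacity` are made up for illustration; `chunks` stands in for whatever the HTTP body yields.

```ocaml
(* Sketch only. Build with: ocamlfind ocamlopt -package lwt.unix -linkpkg *)
open Lwt.Infix

(* [pump ~capacity chunks oc] is a hypothetical helper: it pushes [chunks]
   through a queue holding at most [capacity] elements and writes each one
   to [oc]. It returns the number of bytes written. *)
let pump ~capacity (chunks : string list) (oc : Lwt_io.output_channel) :
    int Lwt.t =
  let stream, push = Lwt_stream.create_bounded capacity in
  (* Producer: [push#push] returns a promise that stays pending while the
     queue is full, which is what keeps memory use bounded. *)
  let producer =
    Lwt_list.iter_s (fun chunk -> push#push chunk) chunks >|= fun () ->
    push#close
  in
  let written = ref 0 in
  (* Consumer: [iter_s] waits for each write before taking the next chunk. *)
  let consumer =
    Lwt_stream.iter_s
      (fun chunk ->
        Lwt_io.write oc chunk >|= fun () ->
        written := !written + String.length chunk)
      stream
  in
  Lwt.join [ producer; consumer ] >|= fun () -> !written
```

The output channel can come from Lwt_io.with_file, from a child process' stdin, or (I believe) from Lwt_io.of_unix_fd wrapped around a raw file descriptor. Note that nothing blocks an OS thread here: the producer is simply a promise that does not resolve until there is room in the queue.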

Thanks for the suggestions. I was able to figure out Lwt_stream. I came up with something similar to the following.

aux_http header uri
>>= function
  | l ->
      Lwt_stream.iter_s
        (fun b ->
          let buf = Bytes.of_string b in
          let len = Bytes.length buf in
          (* Return the write promise so iter_s waits for each chunk;
             write_from_exactly also rules out partial writes. *)
          Lwt_io.write_from_exactly process#stdin buf 0 len)
        l

which is similar to the original async version I was working with, though not quite as terse.

aux_http header uri
>>= function
  | l ->
    Async.Pipe.iter l ~f:(fun b ->
        return
          (Async_unix.Writer.write_bytes process_oc
             (Bytes.of_string b)))
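
For anyone landing here later, this is a self-contained sketch of the Lwt variant that can be compiled on its own, assuming lwt.unix. The helper name `feed` is hypothetical, and a plain `string Lwt_stream.t` stands in for the HTTP body. Each write promise is handed back to `iter_s`, so the next chunk is only pulled once the previous one has been accepted by the channel.

```ocaml
(* Sketch only. Build with: ocamlfind ocamlopt -package lwt.unix -linkpkg *)
open Lwt.Infix

(* [feed cmd chunks] is a hypothetical helper: it spawns [cmd], streams
   [chunks] into the child's stdin, and returns everything the child
   wrote to its stdout. *)
let feed (cmd : Lwt_process.command) (chunks : string Lwt_stream.t) :
    string Lwt.t =
  Lwt_process.with_process cmd (fun p ->
      (* Write one chunk at a time, then close stdin so the child sees EOF. *)
      let send =
        Lwt_stream.iter_s (fun chunk -> Lwt_io.write p#stdin chunk) chunks
        >>= fun () -> Lwt_io.close p#stdin
      in
      (* Start reading right away so the child never stalls on a full
         stdout pipe while we are still writing. *)
      let receive = Lwt_io.read p#stdout in
      send >>= fun () -> receive)
```

For example, `feed (Lwt_process.shell "cat") (Lwt_stream.of_list [ "he"; "llo" ])` should resolve to the concatenated input once `cat` exits.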