How to stream data in cohttp_eio server

Hello :waving_hand:

I am trying to implement Server Sent Events with Cohttp_eio.

The theory is pretty simple: stream data periodically to the client. But I am not sure how I can send data to an Eio.Flow.source_ty (which is the required type of the body for a Cohttp_eio response).
I guess I could use an Eio.Flow.two_way, but I don’t know how to construct a value of such a type.

In the code snippet below, the body should be a stream of multiple such strings, and the connection should stay open until the client or the server decides to close it.

Cohttp_eio.Server.respond
  ~status:`OK
  ~body:(Body.of_string (Printf.sprintf "event: %s\ndata: %s\n\n" "test" "some data"))
  ~headers:
    (Header.of_list
       [ "Content-Type", "text/event-stream"
       ; "Cache-Control", "no-cache"
       ; "Connection", "keep-alive"
       ])
  ()

So how can I create a stream of type Eio.Flow.source_ty in cohttp_eio, where I can decide when to push new data and when to close the stream?

Just for reference: here is the MDN documentation for Server-Sent Events, with an example server implementation in PHP.

Looks like they want to use the ‘expert’ server type to stream responses: cohttp-eio 6.1.1 · OCaml Package

I have also found this example in the cohttp repo: ocaml-cohttp/cohttp-eio/examples/server2.ml at main · mirage/ocaml-cohttp · GitHub

This seems to stream data using the “normal” server, but it still takes a Flow.source_ty as the body, and I am not sure how to append data to it.

If you don’t mind an alternative, httpcats offers an API that allows you to stream content from a sequence, see the documentation:

let seq_of_filename filename =
  let ic = open_in_bin filename in
  let buf = Bytes.create 0x7ff in
  let rec go () =
    let len = input ic buf 0 (Bytes.length buf) in
    if len = 0 then
      let () = close_in ic in Seq.Nil
    else
      let str = Bytes.sub_string buf 0 len in
      Seq.Cons (str, go) in
  go

let run () =
  let body = Httpcats.stream (seq_of_filename "form.txt") in
  Httpcats.request ~uri:"http://foo.bar" ~body ...

It’s a bit confusing, but in cohttp you provide the response body and cohttp reads from it and sends it. i.e. cohttp pulls data from your application rather than you pushing it. Often you want it the other way around, and that requires a bit of work.
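To make the pull model concrete, here is a stdlib-only sketch (none of these names are cohttp's API; `pull_all` just stands in for the consumer role cohttp plays, calling `read` whenever it wants the next chunk):

```ocaml
(* Illustrative pull model: the consumer drives the loop and asks the
   application for data, chunk by chunk, until it gets [None]. *)
let pull_all read =
  let buf = Buffer.create 64 in
  let rec loop () =
    match read () with
    | Some chunk -> Buffer.add_string buf chunk; loop ()
    | None -> Buffer.contents buf
  in
  loop ()

let () =
  (* The application's data, exposed as something to pull from. *)
  let chunks = ref [ "event: test\n"; "data: some data\n\n" ] in
  let read () =
    match !chunks with
    | [] -> None
    | c :: rest -> chunks := rest; Some c
  in
  assert (pull_all read = "event: test\ndata: some data\n\n")
```

Turning this around, so that the application pushes whenever it likes, needs a buffer between the two sides, which is what the `Buf_write` trick below provides.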

I don’t think there’s a function for this (there probably should be), but you can do it manually:

module Buf_write_source = struct
  type t = Eio.Buf_write.t

  let read_methods = []  (* todo: optimisations *)

  let single_read t buf =
    let bufs = Eio.Buf_write.await_batch t in
    let len, _ = Cstruct.fillv ~src:bufs ~dst:buf in
    Eio.Buf_write.shift t len;
    len
end

let source_of_buf_write =
  let handler = Eio.Flow.Pi.source (module Buf_write_source) in
  fun b -> Eio.Resource.T (b, handler)

and use it, e.g.:

let handler _socket _request _body writer =
  let bw = Eio.Buf_write.create 4096 in
  Switch.run @@ fun sw ->
  Fiber.fork ~sw (fun () ->
      Eio.Buf_write.string bw "Hello";
      Eio.Buf_write.close bw;
    );
  Cohttp_eio.Server.respond () ~status:`OK
    ~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
    ~body:(source_of_buf_write bw) writer

The forked fiber will run in parallel with the respond, so you should be able to stream things.


Thank you :slight_smile:
The source_of_buf_write was my missing puzzle piece. It is working now.

Just so I understand: is this a function that is missing in Eio or in Cohttp?


I would love to try it. httpcats looks pretty nice. But as I am depending on other libraries (irmin, multipart_form, websocket, …) which are all written in either lwt or eio (or are currently transitioning to eio), I am kind of forced to use cohttp. It’s a bit unfortunate that there is this ecosystem split again, but I guess there is nothing we (or I) can do :confused:


@dinosaure I may be wrong because I don’t know the details of Httpcats.request, but by reading your code snippet I’d suspect it may leak file descriptors (e.g. if the server abruptly closes the connection).

I think you are right and I should use Fun.protect in that case :slight_smile:. I will update the documentation, thanks.

I’m not sure exactly how Fun.protect helps here.

Your problem is that you are using the famously broken “lazy IO” paradigm, which is exactly the kind of thing I feared people would do when Seq was introduced in the stdlib. Seq looks enticing, but it is almost always the wrong thing to use.

You need to treat your body data structure as a resource; you can’t hide the resource management in the closures of your Seqs, because they may end up not being exhausted.
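A small stdlib-only illustration of the failure mode (the "resource" here is just a flag, standing in for a file descriptor):

```ocaml
(* Illustrative only: release of a "resource" is hidden in a Seq closure,
   running only when the sequence is exhausted. *)
let released = ref false

let seq_of_fake_resource () : int Seq.t =
  let rec go n () =
    if n = 0 then (released := true; Seq.Nil)  (* "close" on exhaustion *)
    else Seq.Cons (n, go (n - 1))
  in
  go 3

let () =
  (* The consumer reads one element and abandons the rest... *)
  (match seq_of_fake_resource () () with
   | Seq.Cons (_, _rest) -> ()
   | Seq.Nil -> ());
  (* ...so the cleanup hidden in the closure never runs. *)
  assert (not !released)
```

With a real file, this abandoned tail is a leaked descriptor.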

A Fun.protect on top (into the run function and not into the Seq.t closure) should help I think.

Yes that can work but it’s an entirely different code structure, it’s a bracket over your file open:

seq_with_open_file : string -> (string Seq.t -> 'a) -> 'a 

(Where the Seq.t is only valid during the call of the function given to the bracket)

That may be workable for bodies of requests (clients) but I don’t think that’s workable for bodies of responses (servers) (assuming you share the same body datastructure).
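For what it's worth, a stdlib-only sketch of such a bracket, adapting the `seq_of_filename` snippet from earlier in the thread (`Fun.protect` closes the file whether `fn` returns or raises; the Seq must not escape `fn`):

```ocaml
(* Bracket-style variant: the channel's lifetime is the call to [fn]. *)
let seq_with_open_file (filename : string) (fn : string Seq.t -> 'a) : 'a =
  let ic = open_in_bin filename in
  Fun.protect ~finally:(fun () -> close_in ic) @@ fun () ->
  let buf = Bytes.create 0x7ff in
  let rec go () =
    let len = input ic buf 0 (Bytes.length buf) in
    if len = 0 then Seq.Nil
    else Seq.Cons (Bytes.sub_string buf 0 len, go)
  in
  fn go
```

Usage would look like `seq_with_open_file "form.txt" (fun seq -> ...)`; if `fn` stashes the Seq and forces it after the bracket returns, reads will fail on the closed channel, which is exactly the restriction described above.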


Yes, you are totally right, but I don’t share the same “body” data structure between the client and the server side of httpcats. The API is still a bit low-level (we don’t handle things like JSON or multipart/form-data).

About resource management, Miou proposes an Ownership value which ensures that the finaliser is called if the task is cancelled (and, on the happy path, you must release and disown the resource explicitly). Note that I’m not sure about this interface of Miou, but I started to play with it and flux especially to create some kinds of sources (and, for higher expectations, it’s preferable to use these rather than a poor Seq.t).

I think we are drifting a bit off-topic, but I will definitely update the documentation of httpcats, and if you want to share some insights about resource management, issues (on Miou) are open :slight_smile: .


Eio.Buf_write should probably have something like this to use a buffered writer as a source.

Probably cohttp-eio should also provide a function to make streaming easier.

I think it would be helpful to have a concept of an ā€˜Eio pipe’ (as opposed to an OS pipe) so that we could create a pair of (source, sink), write to the sink, and read out of the source.


If you don’t mind, could you share what you came up with? I am very keen on seeing it. I tried patching together the snippets from this thread but couldn’t get anything to work.

Here’s a complete version:

open Eio.Std

module Buf_write_source = struct
  type t = Eio.Buf_write.t

  let read_methods = []

  let single_read t buf =
    let bufs = Eio.Buf_write.await_batch t in
    let len, _ = Cstruct.fillv ~src:bufs ~dst:buf in
    Eio.Buf_write.shift t len;
    len
end

let source_of_buf_write =
  let handler = Eio.Flow.Pi.source (module Buf_write_source) in
  fun b -> Eio.Resource.T (b, handler)

let handler _socket _request _body writer =
  let bw = Eio.Buf_write.create 4096 in
  Switch.run @@ fun sw ->
  Fiber.fork ~sw (fun () ->
      for i = 1 to 10 do
        Eio.Buf_write.printf bw "i=%d\n" i;
        Eio_unix.sleep 1.0;
      done;
      Eio.Buf_write.close bw;
    );
  Cohttp_eio.Server.respond () ~status:`OK
    ~headers:(Http.Header.of_list [ ("content-type", "text/plain") ])
    ~body:(source_of_buf_write bw) writer

let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080) in
  let socket = Eio.Net.listen env#net ~sw ~backlog:128 ~reuse_addr:true addr in
  let server = Cohttp_eio.Server.make ~callback:handler () in
  Cohttp_eio.Server.run socket server
    ~on_error:(fun ex -> Logs.warn (fun f -> f "%a" Eio.Exn.pp ex))

Question: why not

Switch.run @@ fun sw ->
let bw = Eio.Buf_write.create ~sw 4096 in
Fiber.fork ~sw (fun () ->
...

?

That’s fine too, and probably a good habit. In the code given it shouldn’t make any difference because by the time the switch is turned off both fibers are either already finished or getting cancelled.

Here’s a slightly tidier version that does that and also splits the forking logic into respond_stream:

let respond_stream ?headers ~status writer fn =
  Switch.run @@ fun sw ->
  let bw = Eio.Buf_write.create ~sw 4096 in
  Fiber.fork_daemon ~sw
    (fun () -> fn bw; Eio.Buf_write.close bw; `Stop_daemon);
  Cohttp_eio.Server.respond () ?headers ~status writer
    ~body:(source_of_buf_write bw)

let handler _socket _request _body writer =
  let headers = Http.Header.of_list [("content-type", "text/plain")] in
  respond_stream writer ~headers ~status:`OK @@ fun bw ->
  for i = 1 to 10 do
    Eio.Buf_write.printf bw "i=%d\n" i;
    Eio_unix.sleep 1.0;
  done

(I also used fork_daemon so that if respond returns successfully without consuming the whole buffer then the writer fiber gets cancelled, although I don’t think cohttp-eio ever does that)


I assume this was left in accidentally?

No; the generating fiber needs to tell the respond fiber that there is no more data to send.

Otherwise, respond will wait forever for more data (and the switch doesn’t finish until respond has finished).
