I am trying to implement Server Sent Events with Cohttp_eio.
The theory is pretty simple: stream data periodically to the client. But I am not sure how I can send data to an Eio.Flow.source_ty (which is the required type of the body for a Cohttp_eio response).
I guess I could use an Eio.Flow.two_way, but I don't know how to construct a value of such a type.
In the code snippet below, the body should be a stream of multiple such strings, and the connection should stay open until the client or the server decides to close it.
If you don't mind, httpcats offers an API that allows you to stream content from a sequence, see the documentation:
let seq_of_filename filename =
  let ic = open_in_bin filename in
  let buf = Bytes.create 0x7ff in
  let rec go () =
    let len = input ic buf 0 (Bytes.length buf) in
    if len = 0 then
      let () = close_in ic in Seq.Nil
    else
      let str = Bytes.sub_string buf 0 len in
      Seq.Cons (str, go) in
  go

let run () =
  let body = Httpcats.stream (seq_of_filename "form.txt") in
  Httpcats.request ~uri:"http://foo.bar" ~body ...
It's a bit confusing, but in cohttp you provide the response body and cohttp reads from it and sends it, i.e. cohttp pulls data from your application rather than you pushing it. Often you want it the other way around, and that requires a bit of work.
I don't think there's a function for this (there probably should be), but you can do it manually:
module Buf_write_source = struct
  type t = Eio.Buf_write.t

  let read_methods = [] (* todo: optimisations *)

  let single_read t buf =
    let bufs = Eio.Buf_write.await_batch t in
    let len, _ = Cstruct.fillv ~src:bufs ~dst:buf in
    Eio.Buf_write.shift t len;
    len
end

let source_of_buf_write =
  let handler = Eio.Flow.Pi.source (module Buf_write_source) in
  fun b -> Eio.Resource.T (b, handler)
and use it as e.g.
let handler _socket _request _body writer =
  let bw = Eio.Buf_write.create 4096 in
  Switch.run @@ fun sw ->
  Fiber.fork ~sw (fun () ->
      Eio.Buf_write.string bw "Hello";
      Eio.Buf_write.close bw;
    );
  Cohttp_eio.Server.respond () ~status:`OK
    ~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
    ~body:(source_of_buf_write bw) writer
The forked fiber runs concurrently with the respond call, so you should be able to stream things.
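The pull-versus-push distinction described above can be illustrated without Eio or cohttp. The following is only a sketch using the standard library: the names source, drain and make_pipe are hypothetical and not part of any of the libraries mentioned. It is single-threaded, so an empty queue is simply treated as end of body; in the Eio version, Eio.Buf_write.await_batch plays this role and waits for data instead.

```ocaml
(* Pull side: the consumer asks for the next chunk; None means end of body. *)
type source = unit -> string option

(* Stand-in for the server: it drains the body by pulling repeatedly. *)
let drain (src : source) : string =
  let out = Buffer.create 64 in
  let rec loop () =
    match src () with
    | None -> ()
    | Some chunk -> Buffer.add_string out chunk; loop ()
  in
  loop ();
  Buffer.contents out

(* Hypothetical adapter: returns a [push] function for the producer and a
   pull-style [source] for the consumer, connected by an in-memory queue.
   Simplification: an empty queue counts as end of body. *)
let make_pipe () : (string -> unit) * source =
  let q = Queue.create () in
  let push chunk = Queue.add chunk q in
  let pull () = Queue.take_opt q in
  (push, pull)

let () =
  let push, src = make_pipe () in
  (* The application pushes... *)
  push "Hello, ";
  push "world";
  (* ...and the "server" pulls. *)
  print_string (drain src)
```

The application only ever calls push, while the consumer only ever calls the source, which is the same division of labour that source_of_buf_write sets up between the forked fiber and respond.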
Thank you,
the source_of_buf_write was my missing puzzle piece. It is working now.
Just so I understand: is this a function that is missing from Eio or from Cohttp?
I would love to try it. httpcats looks pretty nice. But as I am depending on other libraries (irmin, multipart_form, websocket, …) which are all written in either lwt or eio (or are currently transitioning to eio), I am kind of forced to use cohttp. It's a bit unfortunate that there is this ecosystem split again, but I guess there is nothing we (or I) can do.
@dinosaure I may be wrong because I don't know the details of Httpcats.request, but from reading your code snippet I'd suspect it may leak file descriptors (e.g. if the server abruptly closes the connection).
I'm not sure exactly how Fun.protect helps here.
Your problem is that you are using the famously broken "lazy IO" paradigm, which is exactly the kind of thing I feared people would do when Seq was introduced in the stdlib. Seq looks enticing but it's almost always the wrong thing to use.
You need to treat your body data structure as a resource; you can't hide the resource management in the closures of your Seqs because they may end up not being exhausted.
Yes, that can work, but it's an entirely different code structure: it's a bracket over your file open:
seq_with_open_file : string -> (string Seq.t -> 'a) -> 'a
(Where the Seq.t is only valid during the call of the function given to the bracket)
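To make the bracket idea concrete, here is a minimal sketch of a function with that signature, using only the standard library. It is one possible reading of the suggestion, not code from httpcats: the channel is closed by Fun.protect when the callback returns or raises, whether or not the sequence was exhausted. The chunk size is carried over from the earlier snippet, and the temporary file in the usage part is only there to make the example self-contained.

```ocaml
(* seq_with_open_file : string -> (string Seq.t -> 'a) -> 'a
   The sequence is only valid while [fn] is running. *)
let seq_with_open_file filename fn =
  let ic = open_in_bin filename in
  (* Close the channel on normal return and on exceptions alike. *)
  Fun.protect ~finally:(fun () -> close_in_noerr ic) @@ fun () ->
  let buf = Bytes.create 0x7ff in
  let rec go () =
    match input ic buf 0 (Bytes.length buf) with
    | 0 -> Seq.Nil
    | len -> Seq.Cons (Bytes.sub_string buf 0 len, go)
  in
  fn go

(* Usage: count the bytes of a temporary file through the bracket. *)
let count_bytes path =
  seq_with_open_file path (fun seq ->
      Seq.fold_left (fun n chunk -> n + String.length chunk) 0 seq)

let () =
  let path = Filename.temp_file "seq_demo" ".txt" in
  let oc = open_out_bin path in
  output_string oc "hello world";
  close_out oc;
  Printf.printf "%d bytes\n" (count_bytes path);
  Sys.remove path
```

The cost of this structure is visible in the type: the sequence cannot escape the callback, which is why it fits a client sending a request body better than a server handing a response body back to the library.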
That may be workable for bodies of requests (clients) but I don't think that's workable for bodies of responses (servers) (assuming you share the same body data structure).
Yes, you are totally right, but I don't share the same "body" data structure between the client and the server side of httpcats. The API is still a bit low level (we don't handle things like JSON or multipart/form-data).
About resource management, miou proposes an Ownership value which ensures that the finaliser is called if the task is cancelled (and, on the happy path, you must release and disown the resource explicitly). Note that I'm not sure about this interface of Miou, but I started to play with it and with flux, especially to create some kinds of sources (and, for higher expectations, it's preferable to use these rather than a poor Seq.t).
I think we are drifting a bit from the topic, but I will definitely update the documentation on httpcats, and if you want to share some insights about resource management, issues (on Miou) are open.
I think it would be helpful to have a concept of an "Eio pipe" (as opposed to an OS pipe) so that we could create a pair of (source, sink), write to the sink, and read out of the source.
If you don't mind, could you share what you came up with? I am very keen on seeing it. I tried patching together the snippets from this thread but couldn't get anything to work.
open Eio.Std

module Buf_write_source = struct
  type t = Eio.Buf_write.t

  let read_methods = []

  let single_read t buf =
    let bufs = Eio.Buf_write.await_batch t in
    let len, _ = Cstruct.fillv ~src:bufs ~dst:buf in
    Eio.Buf_write.shift t len;
    len
end

let source_of_buf_write =
  let handler = Eio.Flow.Pi.source (module Buf_write_source) in
  fun b -> Eio.Resource.T (b, handler)

let handler _socket _request _body writer =
  let bw = Eio.Buf_write.create 4096 in
  Switch.run @@ fun sw ->
  Fiber.fork ~sw (fun () ->
      for i = 1 to 10 do
        Eio.Buf_write.printf bw "i=%d\n" i;
        Eio_unix.sleep 1.0;
      done;
      Eio.Buf_write.close bw;
    );
  Cohttp_eio.Server.respond () ~status:`OK
    ~headers:(Http.Header.of_list [ ("content-type", "text/plain") ])
    ~body:(source_of_buf_write bw) writer

let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080) in
  let socket = Eio.Net.listen env#net ~sw ~backlog:128 ~reuse_addr:true addr in
  let server = Cohttp_eio.Server.make ~callback:handler () in
  Cohttp_eio.Server.run socket server
    ~on_error:(fun ex -> Logs.warn (fun f -> f "%a" Eio.Exn.pp ex))
That's fine too, and probably a good habit. In the code given it shouldn't make any difference because by the time the switch is turned off both fibers are either already finished or getting cancelled.
Here's a slightly tidier version that does that and also splits the forking logic into respond_stream:
let respond_stream ?headers ~status writer fn =
  Switch.run @@ fun sw ->
  let bw = Eio.Buf_write.create ~sw 4096 in
  Fiber.fork_daemon ~sw
    (fun () -> fn bw; Eio.Buf_write.close bw; `Stop_daemon);
  Cohttp_eio.Server.respond () ?headers ~status writer
    ~body:(source_of_buf_write bw)

let handler _socket _request _body writer =
  let headers = Http.Header.of_list [("content-type", "text/plain")] in
  respond_stream writer ~headers ~status:`OK @@ fun bw ->
  for i = 1 to 10 do
    Eio.Buf_write.printf bw "i=%d\n" i;
    Eio_unix.sleep 1.0;
  done
(I also used fork_daemon so that if respond returns successfully without consuming the whole buffer then the writer fiber gets cancelled, although I don't think cohttp-eio ever does that)
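Since the original question was about Server Sent Events, the examples above would still need the payload framed as events rather than plain lines. A minimal sketch of that framing, using only the standard library, might look like the following. The helper name sse_event is hypothetical; the field names (event, id, data) and the blank line that terminates each event come from the SSE wire format, and the response would use the content type text/event-stream instead of text/plain.

```ocaml
(* Hypothetical helper: format one Server-Sent Event as a string.
   Each line of [data] becomes its own "data:" field, and an empty
   line marks the end of the event. *)
let sse_event ?event ?id data =
  let b = Buffer.create 64 in
  Option.iter (fun e -> Buffer.add_string b ("event: " ^ e ^ "\n")) event;
  Option.iter (fun i -> Buffer.add_string b ("id: " ^ i ^ "\n")) id;
  List.iter
    (fun line -> Buffer.add_string b ("data: " ^ line ^ "\n"))
    (String.split_on_char '\n' data);
  Buffer.add_char b '\n';
  Buffer.contents b

let () =
  (* Print two example events to show the wire format. *)
  print_string (sse_event "hello");
  print_string (sse_event ~event:"tick" ~id:"1" "line one\nline two")
```

Inside the streaming callback, the resulting string could then be written with Eio.Buf_write.string bw (sse_event ...), in place of the printf call used above.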