Hi,
I want to stream events from a cohttp server to connected clients and robustly manage the lifecycle of each client. I’ve more or less done this, but I’m still “leaking” exceptions when a client disconnects. In short, I would expect an exception related to a client to be tagged somehow, so that when handling that exception I can tie it to the client rather than just assuming all is well. This doesn’t seem to be the case.
My understanding:
After a client disconnects, the server may still attempt to write data from the response Lwt_stream.t to the socket before realizing the connection is closed. When that happens, a Unix_error exception (EPIPE) is raised, reporting that the pipe is closed. OK, fine. The problem is that there is no way to tie this exception back to the particular connection that died, hence a leaked exception.
Am I missing something in lwt/cohttp that would let me do this? I don’t see how I can plumb the data I need, a Cohttp.Connection.t id, through to the Lwt.async_exception_hook, the only place it seems I can catch the exception. Simply ignoring the exception seems wrong.
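To make "tagged" concrete, here is a minimal sketch of the behaviour being asked for, using only Lwt. The names `Client_error`, `tag_client` and the id `"client-42"` are hypothetical, and the failing promise is a stand-in for a socket write. It only works when the code that fails runs inside a promise you create yourself, which is exactly what is missing for the write that cohttp performs.

```ocaml
(* Hypothetical exception that carries a client identifier. *)
exception Client_error of string * exn

(* Wrap a per-client promise so that any failure is re-raised
   together with the id of the client it belongs to. *)
let tag_client id f =
  Lwt.catch f (fun e -> Lwt.fail (Client_error (id, e)))

let () =
  let seen = ref None in
  (* The hook can now recover the client id from the exception. *)
  Lwt.async_exception_hook := (function
    | Client_error (id, _) -> seen := Some id
    | _ -> ());
  (* Stand-in for a write loop that fails with EPIPE. *)
  Lwt.async (fun () ->
    tag_client "client-42" (fun () ->
      Lwt.fail (Unix.Unix_error (Unix.EPIPE, "write", ""))));
  assert (!seen = Some "client-42")
```

With a wrapper like this, the hook could remove the dead client from the table instead of just logging. The open question is where such a wrapper could be installed for the response body write.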
The problem appears to be there’s no real lwt thread that owns the stream. I can push into the stream, but then the actual socket write is in some other library thread.
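The split between pushing and writing can be reproduced without cohttp. The sketch below is a simulation under stated assumptions: the consumer built with `Lwt_stream.iter_s` is a hypothetical stand-in for the library's socket writer, and `Failure "simulated EPIPE"` stands in for the real Unix_error. It shows that the push call returns normally while the failure surfaces only in the consumer's promise.

```ocaml
(* Returns (push_raised, consumer_failed). *)
let demo () =
  let stream, push = Lwt_stream.create () in
  (* Hypothetical consumer standing in for the socket writer:
     it fails on the first element it receives. *)
  let consumer =
    Lwt_stream.iter_s
      (fun (_ : string) -> Lwt.fail (Failure "simulated EPIPE"))
      stream
  in
  (* The producer side: push has type string option -> unit and
     gives no indication that the consumer is going to fail. *)
  let push_raised =
    try push (Some "tick"); false with _ -> true
  in
  (* Only whoever holds the consumer promise sees the failure. *)
  let consumer_failed =
    Lwt_main.run
      (Lwt.catch
         (fun () -> Lwt.map (fun () -> false) consumer)
         (fun _ -> Lwt.return true))
  in
  (push_raised, consumer_failed)

let () =
  let push_raised, consumer_failed = demo () in
  Printf.printf "push raised: %b, consumer failed: %b\n"
    push_raised consumer_failed
```

In the real server the consumer promise is held inside the library, so the producer has nothing to attach a handler to.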
Thanks for any knowledge you can share
Building and running the reproduction:
$ ocamlfind ocamlopt -o server -linkpkg -package cohttp-lwt-unix repro.ml
$ ./server
Ready
# in another terminal, curl http://localhost:8000 and quit after a few seconds
0 clients connected
new client
1 clients connected
client disconnected
Got exception: Unix.Unix_error(Unix.EPIPE, "write", "")
0 clients connected
The reproduction, repro.ml:
open Lwt
open Cohttp
open Cohttp_lwt_unix
type client_push = string option -> unit
let connected_clients : (Connection.t, client_push) Hashtbl.t = Hashtbl.create 10
let send_heartbeat client_id push =
  (* the push function, created by Lwt_stream.create, does not
   * throw any exceptions - I cannot tie a dead connection to a
   * particular client here. Neither a try/with block nor an
   * Lwt.catch section captures the pipe error *)
  push @@ Some "event: heartbeat\ndata:tick\n\n"
let rec heartbeat () =
  Lwt_unix.sleep 2.0 >>= fun () ->
  Lwt_io.printf "%d clients connected\n"
  @@ Hashtbl.length connected_clients >>= fun () ->
  Hashtbl.iter send_heartbeat connected_clients;
  heartbeat ()
let event_stream (_, conn_id) _req _body =
  let headers = Header.of_list
      ["Content-Type", "text/event-stream";
       "Cache-Control", "no-cache"]
  in
  let output_stream, stream_push = Lwt_stream.create () in
  Hashtbl.add connected_clients conn_id stream_push;
  Lwt_io.printf "new client\n" >>= fun () ->
  Server.respond ~flush:true ~status:`OK ~headers
    ~body:(Cohttp_lwt.Body.of_stream output_stream) ()
let server =
  let conn_closed (_, conn_id) =
    (* runs after trying to push to a dead client. trying to write
     * to a dead client is in fact what raises the conn_closed event *)
    Lwt_io.printf "client disconnected\n" |> Lwt.ignore_result;
    Hashtbl.remove connected_clients conn_id
  in
  let server = Server.create ~mode:(`TCP (`Port 8000))
      (Server.make ~conn_closed ~callback:event_stream ())
  in
  async heartbeat;
  print_endline "Ready";
  server
let () =
  (* install an exception handler - trying to write to a dead connection
   * raises an exception *)
  async_exception_hook.contents <- (fun e ->
    Lwt_io.printf "Got exception: %s\n" (Printexc.to_string e) |> Lwt.ignore_result);
  ignore (Lwt_main.run server)