Handling client connection lifecycles in cohttp servers

Hi,

I want to stream events from a cohttp server to connected clients and robustly manage the lifecycle of a client. I’ve more or less done this but I’m still “leaking” exceptions when a client disconnects. In short, I would expect an exception related to a client to be tagged somehow so that when handling that exception I can tie it to the client rather than just assuming all is well. This doesn’t seem to be the case.

My understanding:
After a client disconnects, the server may attempt to write to the response Lwt_stream.t before realizing the connection is closed. When that happens, a Unix_error exception is raised reporting the pipe is closed. OK, fine. The problem is there is no way to tie this exception back to the particular connection that died, hence a leaked exception.

Am I missing something in lwt/cohttp that would let me do this? I don’t see how I can plumb the data I need, a Cohttp.Connection.t id, through to the Lwt.async_exception_hook, the only place it seems I can catch the exception. Simply ignoring the exception seems wrong.

The problem appears to be there’s no real lwt thread that owns the stream. I can push into the stream, but then the actual socket write is in some other library thread.

Thanks for any knowledge you can share :grinning:

Building and running the reproduction:

$ ocamlfind ocamlopt -o server -linkpkg -package cohttp-lwt-unix repro.ml
$ ./serve
Ready
# in another terminal, curl http://localhost:8000 and quit after a few seconds
0 clients connected
new client
1 clients connected
client disconnected
Got exception: Unix.Unix_error(Unix.EPIPE, "write", "")
0 clients connected

repro.ml

open Lwt
open Cohttp
open Cohttp_lwt_unix

type client_push = string option -> unit
let connected_clients : (Connection.t, client_push) Hashtbl.t = Hashtbl.create 10

let send_heartbeat client_id push =
  (* the push function, created by Lwt_stream.create, does not
   * throw any exceptions - I cannot tie a dead connection to a
   * particular client here.  Neither a try/with block or Lwt.catch
   * section captures the pipe error *)
  push @@ Some "event: heartbeat\ndata:tick\n\n"

let rec heartbeat () =
  Lwt_unix.sleep 2.0 >>= fun () ->
  Lwt_io.printf "%d clients connected\n"
    @@ Hashtbl.length connected_clients >>= fun () ->
  Hashtbl.iter send_heartbeat connected_clients;
  heartbeat ()

let event_stream (_, conn_id) req body =
  let headers = Header.of_list
    ["Content-Type", "text/event-stream";
     "Cache-Control","no-cache"]
  in
  let output_stream, stream_push = Lwt_stream.create () in
  Hashtbl.add connected_clients conn_id stream_push;
  Lwt_io.printf "new client\n" >>= fun () ->
  Server.respond ~flush:true ~status:`OK ~headers
    ~body:(Cohttp_lwt.Body.of_stream output_stream) ()

let server =
  let conn_closed (_, conn_id) =
    (* runs after trying to push to a dead client. trying to write
     * to a dead client is in fact what raises the conn_closed event *)
    Lwt_io.printf "client disconnected\n" |> Lwt.ignore_result;
    Hashtbl.remove connected_clients conn_id
  in
  let server = Server.create ~mode:(`TCP (`Port 8000))
                             (Server.make ~conn_closed ~callback:event_stream ())
  in
  async heartbeat;
  print_endline "Ready";
  server

let () =
  (* install an exception handler - trying to write to a dead connection
   * raises an exception *)
  async_exception_hook.contents <- (fun e ->
    Lwt_io.printf "Got exception: %s\n" (Printexc.to_string e) |> Lwt.ignore_result);
  ignore (Lwt_main.run server)

Based on further investigation I don’t think this is possible. I’ve published a proof of concept to support this in ocaml-cohttp here: https://github.com/mirage/ocaml-cohttp/pull/597

I’m marking this solved in the sense that I’ve answered the question to my satisfaction, even if there isn’t a true solution.

1 Like