Websocket_async server, broadcasting messages isn't working as expected

Basically I’m working on a simple websocket server that broadcasts messages to each client listening.

I have some code working, I can connect. But something weird happens. When I send the message, the first client gets both messages, then if I send a message from the second client, the second client get’s both of those message and so on and so forth. It seems to rotate between connections?

Am I misunderstanding how the server creation and Pipe’s are getting set up? I had the problem before I added the connection hash table as well?

Here’s the code:

open Async
open Core
open Websocket_async
module Log = Log.Global

type client_connection = {
  addr : string;
  reader : Frame.t Pipe.Reader.t;
  writer : Frame.t Pipe.Writer.t;

type client_map = (string, client_connection) Hashtbl.t

module ClientMap = struct
  let add clients addr reader writer =
    Log.debug "New connection from %s" addr;
    Hashtbl.add_exn clients ~key:addr ~data:{ addr; reader; writer }

  let remove clients addr =
    Log.debug "Connection closed by %s" addr;
    Hashtbl.remove clients addr

  let get clients addr = Hashtbl.find clients addr
  let get_all clients = Hashtbl.data clients

  let get_all_except clients addr =
    Hashtbl.filteri clients ~f:(fun ~key ~data:_ -> not (String.equal key addr))
    |> Hashtbl.data

type server_context = { clients : client_map }

module ServerContext = struct
  type t = server_context

  let create () = { clients = Hashtbl.create (module String) }

(** Create a websocket server *)
let parse_frame frame content opcode addr =
  let open Frame in
  let frame', closed =
    match opcode with
    | Opcode.Ping -> ({ frame with opcode = Opcode.Pong }, false)
    | Opcode.Close ->
        Log.debug "Connection closed by %s" addr;
        ({ frame with opcode = Opcode.Close }, true)
    | Opcode.Text ->
        Log.debug "Received %s from %s" content addr;
        (frame, false)
    | Opcode.Pong ->
        Log.debug "Received %s from %s" (Opcode.to_string opcode) addr;
        (frame, false)
    | _ -> (frame, true)
  (frame', closed)

(** Client connection loop, parse incoming frames *)
let client_loop server_context addr receiver_read _sender_write =
  let rec loop () =
    match%bind Pipe.read receiver_read with
    | `Eof ->
        Log.debug "Connection closed by %s" addr;
        return ()
    | `Ok ({ Frame.opcode; content; _ } as frame) ->
        let open Frame in
        let frame', closed = parse_frame frame content opcode addr in

        let%bind () =
          match frame'.opcode with
          | Opcode.Text ->
              (* Send the frame to each client in the client_map *)
              let clients =
                ClientMap.get_all_except server_context.clients addr
              Deferred.List.iter clients ~f:(fun client ->
                  Pipe.write client.writer frame')
          | _ -> Deferred.unit

        if closed then return () else loop ()
  loop ()

(** Check if the URI of the request is /ws *)
let check_request req =
  let uri = Cohttp.Request.uri req in
  Deferred.return (String.equal (Uri.path uri) "/ws")

(** Handle server creation errors *)
let handle_server_error server =
  match%bind server with
  | Error err when Poly.equal (Error.to_exn err) Exit -> Deferred.unit
  | Error err -> Error.raise err
  | Ok () -> Deferred.unit

(** Create a websocket server waiting for HTTP requests at "/ws" *)
let create_server app_to_ws ws_to_app reader writer =
  server ~check_request ~app_to_ws ~ws_to_app ~reader ~writer ()
  |> handle_server_error

(** Create the appropriate pipes to talk to the websocket server *)
let create_ws_pipes =
  let app_to_ws, sender_write = Pipe.create () in
  let receiver_read, ws_to_app = Pipe.create () in
  (app_to_ws, ws_to_app, sender_write, receiver_read)

(** Handle a new connection *)
let handle_client server_context addr reader writer =
  (* Convert the address into a string *)
  let addr_str = Socket.Address.to_string addr in
  Log.debug "New connection from %s" addr_str;
  (* Make the pipes for ws communication *)
  let app_to_ws, ws_to_app, sender_write, receiver_read = create_ws_pipes in
  (* Add the client to the map *)
  ClientMap.add server_context.clients addr_str receiver_read sender_write;
  (* Connect the server and start the client loop *)
  let%bind () =
        create_server app_to_ws ws_to_app reader writer;
        client_loop server_context addr_str receiver_read sender_write;
  (* Remove the client from the map *)
  ClientMap.remove server_context.clients addr_str;

(** Start the websocket server *)
let start port =
  Log.info "Starting server on port %d" port;
  let server_context = ServerContext.create () in
  let%bind _server =
      Server.create ~on_handler_error:`Ignore
        (Where_to_listen.of_port port)
        (handle_client server_context))
  Deferred.never ()

I haven’t read all the way through, but one thing that jumps out is that create_ws_pipes is not a function, so there’s only one set of pipes which gets shared between each client. It makes sense that messages would only go to one client in that case.

That was it! I’m still learning OCaml and I kind of dove in deep in with the Async stuff, completely missed a fundamental aspect of defining functions! :stuck_out_tongue: