Notty+Unix socket in separate domains: UI update stop for some reason

This is my first time experimenting with Notty, and the goal is to implement a monitoring systems that receives something via Unix socket and paints it.

I used this notty demo because it uses timer to periodically update UI (I want to retrofit it for updating the state, that was changed after last socket read). That’s why I got Lwt into my OCaml 5.3 demo.

The problem is that by some reason the UI stops updating after handling first socket connection. GDB says that it hangs on caml_unix_select, and there is Notty in the call stack, so, I believe that is Notty domain. I don’t use any Lwt_unix functionality, because with it I get an exception Invalid_argument("Notty: control char: U+0A, \"Received: message....\"").

To reproduce it we need to start demo and send a message to socket:
echo 'aasdf' | socat - /tmp/ocaml_test.sock . After that demo stops reacting on keyboard event.
After an application hangs the call kill -s USR2 $(pidof demo_notty.exe) should kill a demo with restoration of terminal behavior.

[@@@ocaml.warnerror "-unused-var-strict"]
[@@@ocaml.warnerror "-unused-value-declaration"]
[@@@ocaml.warnerror "-unused-var"]
[@@@ocaml.warnerror "-unused-open"]

(* N.B. echo 'aasdf' | socat - /tmp/ocaml_test.sock  *)
let socket_path = "/tmp/ocaml_test.sock"

type state = {
  mutable debug_msg : string;
  last_msg : string Atomic.t;
  changed : bool Atomic.t;
}

let state1 =
  {
    debug_msg = "dummy1";
    last_msg = Atomic.make "dummy2";
    changed = Atomic.make true;
  }

let client_task path =
  (try Unix.unlink path with Unix.Unix_error _ -> ());

  let sock = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
  Unix.set_close_on_exec sock;
  let addr = Unix.ADDR_UNIX path in

  Unix.bind sock addr;
  Unix.listen sock 5;

  (* Increased backlog for testing *)
  Printf.printf "Listening on %s...\n" path;

  let buf = Bytes.create 1024 in
  let open Lwt.Syntax in
  let rec loop = function
    | `Waiting ->
        let client_sock, _ = Unix.accept sock in
        Unix.set_close_on_exec client_sock;
        loop (`WIP client_sock)
    | `WIP client_sock ->
        let len = Unix.recv client_sock buf 0 1024 [] in

        if len = 0 then (
          Unix.close client_sock;

          state1.debug_msg <- "disconnected";
          Atomic.set state1.changed true;
          loop `Waiting)
        else
          let received = Bytes.sub_string buf 0 len in

          Atomic.set state1.last_msg (Printf.sprintf "Received: %s" received);
          Atomic.set state1.changed true;

          let () = Unix.sleepf 0.05 in
          loop (`WIP client_sock)
  in
  fun () -> Lwt.return (loop `Waiting)

include struct
  open Notty
  open Lwt.Infix
  module T = Notty_lwt.Term

  module Images = struct
    (* U+25AA BLACK SMALL SQUARE *)
    let square color = I.string (A.fg color) "▪"

    let rec sierp c n =
      I.(
        if n > 1 then
          let ss = sierp c (pred n) in
          ss <-> (ss <|> ss)
        else hpad 1 0 (square c))
  end

  let notifyf fmt =
    Format.kasprintf
      (fun str ->
        let cmd = Printf.sprintf "notify-send 'Hello world!' '%s'" str in
        let _ = Stdlib.Sys.command cmd in
        ())
      fmt

  let timer =
    let counter = ref 0 in
    function
    | None -> Lwt.wait () |> fst
    | Some t ->
        Lwt_unix.sleep t >>= fun () ->
        notifyf "timer wakes up. c=%d, debug=%s, last=%s, chgd=%b" !counter
          state1.debug_msg
          (Atomic.get state1.last_msg)
          (Atomic.get state1.changed);
        incr counter;

        Lwt.return (`Timer !counter)

  let event e =
    Lwt_stream.get (T.events e) >|= function
    | Some ((`Resize _ | #Unescape.event) as x) -> x
    | None -> `End

  let simpleterm_lwt_timed ?delay ~f init =
    let term = T.create () in
    let _ =
      Sys.signal Sys.sigusr2
        (Sys.Signal_handle
           (fun _ ->
             print_endline "RELEASE terminal";
             Lwt.async (fun () ->
                 T.release term >>= fun () -> Lwt.return (exit 1))))
    in
    let rec loop (e, t) dim s =
      e <?> t >>= function
      | `End | `Key (`Escape, []) | `Key (`ASCII 'C', [ `Ctrl ]) ->
          Lwt.return_unit
      | `Resize dim as evt -> invoke (event term, t) dim s evt
      | #Unescape.event as evt -> invoke (event term, t) dim s evt
      | `Timer _ as evt -> invoke (e, timer delay) dim s evt
    and invoke es dim s e =
      match f dim s e with
      | `Continue s ->
          notifyf "Continue";
          loop es dim s
      | `Redraw (s, i) ->
          notifyf "Redraw";
          T.image term i >>= fun () -> loop es dim s
      | `Stop ->
          notifyf "STOP";
          Lwt.return_unit
    in
    let size = T.size term in
    loop (event term, timer delay) size init
end

let () =
  let pool = Lwt_domain.setup_pool 3 in

  let open Lwt.Infix in
  let task1 =
    simpleterm_lwt_timed ~delay:1.5 1
      ~f:
        (let repaint s =
           let img =
             let open Notty in
             let attr = A.(fg lightmagenta ++ bg black) in
             I.vcat
               I.
                 [
                   I.strf ~attr "path: %s" socket_path;
                   I.strf ~attr "DEBUG: %s" state1.debug_msg;
                   I.strf ~attr "last: %s" (Atomic.get state1.last_msg);
                   string A.empty (string_of_int s)
                   <-> pad ~l:2 ~t:1 (Images.sierp A.magenta s);
                 ]
           in
           `Redraw (s, img)
         in

         fun _size s -> function
           | `Key (`ASCII 'q', _) -> exit 1
           | `Key (`Arrow a, _) ->
               let new_st =
                 match a with
                 | `Up | `Left -> max 1 (s - 1)
                 | `Down | `Right -> min 10 (s + 1)
               in
               repaint new_st
           | _ when Atomic.get state1.changed ->
               Atomic.set state1.changed false;
               notifyf "repaint";
               repaint s
           | `Timer _ ->
               notifyf "repaint";
               repaint s
           | _ ->
               notifyf "Continue";
               `Continue s)
    >>= fun _ ->
    print_endline "task 1 finbished";
    Lwt.return ()
  in

  let task2 =
    Lwt_domain.detach pool (fun () -> client_task socket_path ()) ()
  in
  Lwt_main.run (Lwt.both task2 task1 >>= fun _ -> Lwt.return ())

I tried to ask a LLM. It recommended to replace boolean atomic flag with Lwt_mvar.t but I can’t imagine how it is related. Also, maybe all that Lwt stuff is too complicated for OCaml 5.3, and I should try marrying Notty with Eia without any Lwt?

notty doesn’t want to draw the newline U+0A that you’re sending to the socket. An Lwt_unix version updates the message without that:

$ echo -n 'hello' | socat - /tmp/ocaml_test.sock
$ echo -n 'world' | socat - /tmp/ocaml_test.sock

This is also the problem with the code you have here: it works as-is if newlines aren’t sent. task1 is raising an exception that’s going nowhere. You can do this before calling Lwt_main.run:

  let task1 =
    Lwt.catch
      (fun () -> task1)
      (fun e ->
        Printf.eprintf "Unhandled Lwt exception: %s\n%!" (Printexc.to_string e);
        Lwt.fail e)
  in

which still isn’t good error handling, but you can at least see it.

The issue is solved. This is the final working code for the future

Spoiler
let notifyf fmt =
  Format.kasprintf
    (fun str ->
      let cmd = Printf.sprintf "notify-send 'Hello world!' '%s'" str in
      let _ = Stdlib.Sys.command cmd in
      ())
    fmt

(* N.B. echo -n 'asdf' | socat - /tmp/ocaml_test.sock  *)
let socket_path = "/tmp/ocaml_test.sock"

type state = {
  mutable debug_msg : string;
  last_msg : string Atomic.t;
  changed : bool Atomic.t;
}

let state1 =
  {
    debug_msg = "dummy1";
    last_msg = Atomic.make "dummy2";
    changed = Atomic.make true;
  }

let client_task path =
  (try Unix.unlink path with Unix.Unix_error _ -> ());

  let sock = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
  Unix.set_close_on_exec sock;
  let addr = Unix.ADDR_UNIX path in

  Unix.bind sock addr;
  Unix.listen sock 5;

  Printf.printf "Listening on %s...\n" path;

  let buf = Bytes.create 1024 in
  let rec loop = function
    | `Waiting ->
        let client_sock, _ = Unix.accept sock in
        Unix.set_close_on_exec client_sock;
        loop (`WIP client_sock)
    | `WIP client_sock ->
        let len = Unix.recv client_sock buf 0 1024 [] in

        if len = 0 then (
          Unix.close client_sock;
          state1.debug_msg <- "disconnected";
          Atomic.set state1.changed true;
          loop `Waiting)
        else
          let received = Bytes.sub_string buf 0 len in
          (* notifyf "received: %s" received; *)
          Atomic.set state1.last_msg (Printf.sprintf "Received: %s" received);
          Atomic.set state1.changed true;

          let () = Unix.sleepf 0.05 in
          loop (`WIP client_sock)
  in
  fun () -> Lwt.return (loop `Waiting)

include struct
  open Notty
  open Lwt.Infix
  module T = Notty_lwt.Term

  module Images = struct
    (* U+25AA BLACK SMALL SQUARE *)
    let square color = I.string (A.fg color) "▪"

    let rec sierp c n =
      I.(
        if n > 1 then
          let ss = sierp c (pred n) in
          ss <-> (ss <|> ss)
        else hpad 1 0 (square c))
  end

  let timer =
    let counter = ref 0 in
    function
    | None -> Lwt.wait () |> fst
    | Some t ->
        Lwt_unix.sleep t >>= fun () ->
        incr counter;
        Lwt.return (`Timer !counter)

  let event e =
    Lwt_stream.get (T.events e) >|= function
    | Some ((`Resize _ | #Unescape.event) as x) -> x
    | None -> `End

  let simpleterm_lwt_timed ?delay ~f init =
    let term = T.create () in
    let _ =
      Sys.signal Sys.sigusr2
        (Sys.Signal_handle
           (fun _ ->
             print_endline "RELEASE terminal";
             Lwt.async (fun () ->
                 T.release term >>= fun () -> Lwt.return (exit 1))))
    in
    let rec loop (e, t) dim s =
      e <?> t >>= function
      | `End | `Key (`Escape, []) | `Key (`ASCII 'C', [ `Ctrl ]) ->
          Lwt.return_unit
      | `Resize dim as evt -> invoke (event term, t) dim s evt
      | #Unescape.event as evt -> invoke (event term, t) dim s evt
      | `Timer _ as evt -> invoke (e, timer delay) dim s evt
    and invoke es dim s e =
      match f dim s e with
      | `Continue s -> loop es dim s
      | `Redraw (s, i) -> T.image term i >>= fun () -> loop es dim s
      | `Stop -> Lwt.return_unit
    in
    let size = T.size term in
    loop (event term, timer delay) size init
end

let () =
  let pool = Lwt_domain.setup_pool 3 in

  let open Lwt.Infix in
  let task1 =
    let make () =
      let f =
        let repaint s =
          let img =
            let open Notty in
            let attr = A.(fg lightmagenta ++ bg black) in
            I.vcat
              I.
                [
                  I.strf ~attr "path: %s" socket_path;
                  I.strf ~attr "DEBUG: %s" state1.debug_msg;
                  I.strf ~attr "last: %s" (Atomic.get state1.last_msg);
                  string A.empty (string_of_int s)
                  <-> pad ~l:2 ~t:1 (Images.sierp A.magenta s);
                ]
          in
          `Redraw (s, img)
        in

        fun _size s -> function
          | `Key (`ASCII 'q', _) -> exit 1
          | `Key (`Arrow a, _) ->
              let new_st =
                match a with
                | `Up | `Left -> max 1 (s - 1)
                | `Down | `Right -> min 10 (s + 1)
              in
              repaint new_st
          | _ when Atomic.get state1.changed ->
              Atomic.set state1.changed false;
              repaint s
          | `Timer _ -> repaint s
          | _ -> `Continue s
      in
      simpleterm_lwt_timed ~delay:2.0 1 ~f
    in
    Lwt.catch
      (fun () -> make ())
      (fun e ->
        Printf.eprintf "Unhandled Lwt exception: %s\n%!" (Printexc.to_string e);
        Lwt.fail e)
  in

  let task2 =
    Lwt_domain.detach pool (fun () -> client_task socket_path ()) ()
  in
  Lwt_main.run (Lwt.both task2 task1 >>= fun _ -> Lwt.return ())
1 Like