Notty+Unix socket in separate domains: UI update stop for some reason

This is my first time experimenting with Notty, and the goal is to implement a monitoring systems that receives something via Unix socket and paints it.

I used this notty demo because it uses timer to periodically update UI (I want to retrofit it for updating the state, that was changed after last socket read). That’s why I got Lwt into my OCaml 5.3 demo.

The problem is that by some reason the UI stops updating after handling first socket connection. GDB says that it hangs on caml_unix_select, and there is Notty in the call stack, so, I believe that is Notty domain. I don’t use any Lwt_unix functionality, because with it I get an exception Invalid_argument("Notty: control char: U+0A, \"Received: message....\"").

To reproduce it we need to start demo and send a message to socket:
echo 'aasdf' | socat - /tmp/ocaml_test.sock . After that demo stops reacting on keyboard event.
After an application hangs the call kill -s USR2 $(pidof demo_notty.exe) should kill a demo with restoration of terminal behavior.

[@@@ocaml.warnerror "-unused-var-strict"]
[@@@ocaml.warnerror "-unused-value-declaration"]
[@@@ocaml.warnerror "-unused-var"]
[@@@ocaml.warnerror "-unused-open"]

(* N.B. echo 'aasdf' | socat - /tmp/ocaml_test.sock  *)
let socket_path = "/tmp/ocaml_test.sock"

type state = {
  mutable debug_msg : string;
  last_msg : string Atomic.t;
  changed : bool Atomic.t;
}

let state1 =
  {
    debug_msg = "dummy1";
    last_msg = Atomic.make "dummy2";
    changed = Atomic.make true;
  }

let client_task path =
  (try Unix.unlink path with Unix.Unix_error _ -> ());

  let sock = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
  Unix.set_close_on_exec sock;
  let addr = Unix.ADDR_UNIX path in

  Unix.bind sock addr;
  Unix.listen sock 5;

  (* Increased backlog for testing *)
  Printf.printf "Listening on %s...\n" path;

  let buf = Bytes.create 1024 in
  let open Lwt.Syntax in
  let rec loop = function
    | `Waiting ->
        let client_sock, _ = Unix.accept sock in
        Unix.set_close_on_exec client_sock;
        loop (`WIP client_sock)
    | `WIP client_sock ->
        let len = Unix.recv client_sock buf 0 1024 [] in

        if len = 0 then (
          Unix.close client_sock;

          state1.debug_msg <- "disconnected";
          Atomic.set state1.changed true;
          loop `Waiting)
        else
          let received = Bytes.sub_string buf 0 len in

          Atomic.set state1.last_msg (Printf.sprintf "Received: %s" received);
          Atomic.set state1.changed true;

          let () = Unix.sleepf 0.05 in
          loop (`WIP client_sock)
  in
  fun () -> Lwt.return (loop `Waiting)

include struct
  open Notty
  open Lwt.Infix
  module T = Notty_lwt.Term

  module Images = struct
    (* U+25AA BLACK SMALL SQUARE *)
    let square color = I.string (A.fg color) "▪"

    let rec sierp c n =
      I.(
        if n > 1 then
          let ss = sierp c (pred n) in
          ss <-> (ss <|> ss)
        else hpad 1 0 (square c))
  end

  let notifyf fmt =
    Format.kasprintf
      (fun str ->
        let cmd = Printf.sprintf "notify-send 'Hello world!' '%s'" str in
        let _ = Stdlib.Sys.command cmd in
        ())
      fmt

  let timer =
    let counter = ref 0 in
    function
    | None -> Lwt.wait () |> fst
    | Some t ->
        Lwt_unix.sleep t >>= fun () ->
        notifyf "timer wakes up. c=%d, debug=%s, last=%s, chgd=%b" !counter
          state1.debug_msg
          (Atomic.get state1.last_msg)
          (Atomic.get state1.changed);
        incr counter;

        Lwt.return (`Timer !counter)

  let event e =
    Lwt_stream.get (T.events e) >|= function
    | Some ((`Resize _ | #Unescape.event) as x) -> x
    | None -> `End

  let simpleterm_lwt_timed ?delay ~f init =
    let term = T.create () in
    let _ =
      Sys.signal Sys.sigusr2
        (Sys.Signal_handle
           (fun _ ->
             print_endline "RELEASE terminal";
             Lwt.async (fun () ->
                 T.release term >>= fun () -> Lwt.return (exit 1))))
    in
    let rec loop (e, t) dim s =
      e <?> t >>= function
      | `End | `Key (`Escape, []) | `Key (`ASCII 'C', [ `Ctrl ]) ->
          Lwt.return_unit
      | `Resize dim as evt -> invoke (event term, t) dim s evt
      | #Unescape.event as evt -> invoke (event term, t) dim s evt
      | `Timer _ as evt -> invoke (e, timer delay) dim s evt
    and invoke es dim s e =
      match f dim s e with
      | `Continue s ->
          notifyf "Continue";
          loop es dim s
      | `Redraw (s, i) ->
          notifyf "Redraw";
          T.image term i >>= fun () -> loop es dim s
      | `Stop ->
          notifyf "STOP";
          Lwt.return_unit
    in
    let size = T.size term in
    loop (event term, timer delay) size init
end

let () =
  let pool = Lwt_domain.setup_pool 3 in

  let open Lwt.Infix in
  let task1 =
    simpleterm_lwt_timed ~delay:1.5 1
      ~f:
        (let repaint s =
           let img =
             let open Notty in
             let attr = A.(fg lightmagenta ++ bg black) in
             I.vcat
               I.
                 [
                   I.strf ~attr "path: %s" socket_path;
                   I.strf ~attr "DEBUG: %s" state1.debug_msg;
                   I.strf ~attr "last: %s" (Atomic.get state1.last_msg);
                   string A.empty (string_of_int s)
                   <-> pad ~l:2 ~t:1 (Images.sierp A.magenta s);
                 ]
           in
           `Redraw (s, img)
         in

         fun _size s -> function
           | `Key (`ASCII 'q', _) -> exit 1
           | `Key (`Arrow a, _) ->
               let new_st =
                 match a with
                 | `Up | `Left -> max 1 (s - 1)
                 | `Down | `Right -> min 10 (s + 1)
               in
               repaint new_st
           | _ when Atomic.get state1.changed ->
               Atomic.set state1.changed false;
               notifyf "repaint";
               repaint s
           | `Timer _ ->
               notifyf "repaint";
               repaint s
           | _ ->
               notifyf "Continue";
               `Continue s)
    >>= fun _ ->
    print_endline "task 1 finbished";
    Lwt.return ()
  in

  let task2 =
    Lwt_domain.detach pool (fun () -> client_task socket_path ()) ()
  in
  Lwt_main.run (Lwt.both task2 task1 >>= fun _ -> Lwt.return ())

I tried to ask a LLM. It recommended to replace boolean atomic flag with Lwt_mvar.t but I can’t imagine how it is related. Also, maybe all that Lwt stuff is too complicated for OCaml 5.3, and I should try marrying Notty with Eia without any Lwt?

notty doesn’t want to draw the newline U+0A that you’re sending to the socket. An Lwt_unix version updates the message without that:

$ echo -n 'hello' | socat - /tmp/ocaml_test.sock
$ echo -n 'world' | socat - /tmp/ocaml_test.sock

This is also the problem with the code you have here: it works as-is if newlines aren’t sent. task1 is raising an exception that’s going nowhere. You can do this before calling Lwt_main.run:

  let task1 =
    Lwt.catch
      (fun () -> task1)
      (fun e ->
        Printf.eprintf "Unhandled Lwt exception: %s\n%!" (Printexc.to_string e);
        Lwt.fail e)
  in

which still isn’t good error handling, but you can at least see it.