`Lwt.async` thread not executing with blocking read

I’m trying to create a simple UDP socket with the tcpip package, but am having some concurrency issues with Lwt.

It seems the Lwt.async thread spawned by Udpv4_socket.listen does not run while another thread loops on a blocking read from stdin.

Adding a Lwt_unix.sleep after the read from stdin causes all the packets to be read and passed to the callback at once.

Can anyone point me in the right direction of parallel Lwt threads with a blocking read?

open Lwt.Infix

let addr_from_string of_string addr_string =
  match of_string addr_string with
  | Ok addr -> addr
  | Error `Msg e -> prerr_endline ("Failed to resolve " ^ addr_string ^ ": " ^ e); exit 2

let client () =
  (* Three arguments are read below, so argv must hold at least four entries. *)
  if Array.length Sys.argv < 4 then begin
    prerr_endline "Usage: client <host> <src port> <dst port>";
    exit 2;
  end;
  let addr_string = Sys.argv.(1) and
      src_port = int_of_string Sys.argv.(2) and
      dst_port = int_of_string Sys.argv.(3) in
  addr_from_string Ipaddr.V4.Prefix.of_string "127.0.0.1/32" |> Udpv4_socket.connect
  >>= fun sock ->
    let callback ~src:_ ~dst:_ ~src_port:_ buf =
      Cstruct.to_string buf |> print_string |> Lwt.return in
    Udpv4_socket.listen sock ~port:src_port callback;
    
    let send () =
      let dst = addr_from_string Ipaddr.V4.of_string addr_string in
      read_line ()
      |> fun line -> Cstruct.of_string (line ^ "\n")
      |> Udpv4_socket.write ~src_port ~dst ~dst_port sock 
      >>= function
      | Ok () -> Lwt.return ()
      | Error `Sendto_failed -> prerr_endline "sendto failed"; exit 1
    in
    let rec loop () = send () >>= fun () -> Lwt_unix.sleep 0. >>= loop in
    loop ()

let () = Lwt_main.run (client ())

Disclaimer: I haven’t read your code. I skimmed it and I don’t have time immediately to parse and understand it. So I offer answers to some of your questions hoping that it can help you anyway.


You do not need to use async to do “parallel” Lwt “threads”. (See below about why I’m using quotes.)

The main ways to have two tasks executed concurrently are:

  • Lwt.both or the variants Lwt.all and Lwt.join. These functions take multiple promises as arguments (a pair for Lwt.both, a list for the others) and return a new promise which resolves once all the provided promises have resolved.
  • Lwt.choose or Lwt.pick. These functions take a list of promises and return a new promise which resolves as soon as any one of the provided promises has resolved. Lwt.pick additionally cancels the promises that are still pending.

A promise starts making progress towards resolution as soon as it exists. You can hand a promise to async (wrapped in a function, since async takes a unit -> unit Lwt.t) to ignore both the value it resolves to and the resolution itself. That is, async allows you to ignore promises you don’t care about. You can pass a promise to both/all/join/choose/pick to synchronise on the resolution.
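
As a rough illustration of the functions mentioned above, here is a minimal sketch. The `task` helper and its delays are made up for the example and have nothing to do with the original program; it assumes the lwt and lwt.unix libraries are available.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

(* Hypothetical task: resolves with [label] after [delay] seconds. *)
let task label delay =
  Lwt_unix.sleep delay >|= fun () -> label

let demo () =
  (* Both promises start making progress as soon as they are created. *)
  let a = task "a" 0.02 in
  let b = task "b" 0.01 in
  (* Lwt.async takes a function returning a promise; nobody waits on it. *)
  Lwt.async (fun () -> task "ignored" 0.0 >|= ignore);
  (* Lwt.both resolves once both promises have resolved. *)
  Lwt.both a b >>= fun (ra, rb) ->
  (* Lwt.pick resolves with the first promise to resolve
     and cancels the one that is still pending. *)
  Lwt.pick [ task "slow" 0.05; task "fast" 0.01 ] >|= fun winner ->
  (ra, rb, winner)

let () =
  let ra, rb, winner = Lwt_main.run (demo ()) in
  Printf.printf "both: %s %s; pick: %s\n" ra rb winner
```

With these delays the program should print `both: a b; pick: fast`. Replacing Lwt.pick with Lwt.choose would give the same winner but leave the slow promise running.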


About the earlier quotes:

  • There are no threads in Lwt, only promises.
  • There is no parallelism with Lwt, only concurrency.

If you are not sure what that means and how you might rewrite your questions more accurately to get a more accurate answer, I’d recommend this relatively short introduction/tutorial I wrote a while ago.


If I understand the intent of your program correctly, the issue has nothing to do with Lwt.
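You’re sending packets to dst_port:

      |> Udpv4_socket.write ~src_port ~dst ~dst_port sock

But the socket is listening on src_port:

    Udpv4_socket.listen sock ~port:src_port callback;

When I changed the above line to ~port:dst_port, it worked as intended (echoing back what I typed into the shell).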


Thanks for the help, Raphael!

You do not need to use async to do “parallel” Lwt “threads”. (See below about why I’m using quotes.)

The mirage-tcpip library, in Udpv4_socket.listen, is actually where Lwt.async occurs.

  • There are no threads in Lwt, only promises.

Ah, my mistake. The library name led me astray :)

  • There is no parallelism with Lwt, only concurrency.

Apologies for playing fast and loose with my terminology here. Reading my question back, I mistakenly interchanged concurrency and parallelism.

If you are not sure what that means and how you might rewrite your questions more accurately to get a more accurate answer, I’d recommend this relatively short introduction/tutorial I wrote a while ago.

Thank you for this resource! Part 2, OS Interactions, clued me in to the deceptively simple problem: I was using the stdlib read_line, which blocks the whole process, when I should have been using Lwt_io.(read_line stdin), which returns a promise.
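
To make the difference concrete, here is a small sketch that counts how often a background ticker runs during a blocking wait versus a cooperative one. It is only an illustration: Unix.sleepf stands in for the blocking stdlib read_line and Lwt_unix.sleep stands in for Lwt_io.read_line, and `ticks_during`, `blocking` and `cooperative` are names invented for this example. It assumes the lwt, lwt.unix and unix libraries.

```ocaml
(* Requires the lwt, lwt.unix and unix libraries. *)
open Lwt.Syntax

(* Count how many times a background ticker fires while [wait] is in progress. *)
let ticks_during wait =
  let ticks = ref 0 in
  let rec ticker () =
    let* () = Lwt_unix.sleep 0.01 in
    incr ticks;
    ticker ()
  in
  let background = ticker () in
  let* () = wait () in
  Lwt.cancel background;
  Lwt.return !ticks

(* Blocking wait (stand-in for stdlib read_line): the whole process
   sleeps, so the Lwt scheduler never gets a chance to run the ticker. *)
let blocking () =
  Unix.sleepf 0.1;
  Lwt.return_unit

(* Cooperative wait (stand-in for Lwt_io.read_line): returns a pending
   promise, so the scheduler keeps running the ticker in the meantime. *)
let cooperative () = Lwt_unix.sleep 0.1

let () =
  let blocked = Lwt_main.run (ticks_during blocking) in
  let cooperating = Lwt_main.run (ticks_during cooperative) in
  Printf.printf "ticks while blocked: %d, while cooperating: %d\n"
    blocked cooperating
```

The blocked count should be zero, while the cooperative count should be positive (the exact number depends on timing). The same reasoning would explain why the listen callback never ran while the program sat in the stdlib read_line.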

For posterity, here is the (refactored) working solution:


open Lwt.Syntax
open Lwt.Infix

let addr_from_string of_string addr_string =
  match of_string addr_string with
  | Ok addr -> addr
  | Error `Msg e -> prerr_endline ("Failed to resolve " ^ addr_string ^ ": " ^ e); exit 2

let client () =
  if Array.length Sys.argv < 4 then begin
    prerr_endline "Usage: client <host> <src port> <dst port>";
    exit 2;
  end;
  let addr_string = Sys.argv.(1) and
      src_port = int_of_string Sys.argv.(2) and
      dst_port = int_of_string Sys.argv.(3) in
  let* sock = addr_from_string Ipaddr.V4.Prefix.of_string "127.0.0.1/32" |> Udpv4_socket.connect in
  let callback ~src:_ ~dst:_ ~src_port:_ buf =
    Cstruct.to_string buf |> Lwt_io.print in
  Udpv4_socket.listen sock ~port:src_port callback;
  
  let send () =
    let dst = addr_from_string Ipaddr.V4.of_string addr_string in
    let* line = Lwt_io.(read_line stdin) in
    let buf = Cstruct.of_string (line ^ "\n") in
    Udpv4_socket.write ~src_port ~dst ~dst_port sock buf
    >>= function
    | Ok () -> Lwt.pause ()
    | Error `Sendto_failed -> prerr_endline "sendto failed"; exit 1
  in
  let rec loop () = send () >>= loop in
  loop ()

let () = Lwt_main.run (client ())

That is actually intentional. This is more or less a netcat clone, and works like:

$ _build/default/bin/main.exe 127.0.0.1 10001 10002
hello
world
$ nc -u 127.0.0.1 10001 -p 10002
hello
world

Interesting that the lack of use of Lwt IO functions isn’t an issue in this case, though.

Thanks for the help!

Ah, I understand now. I thought all communication happened within this one program. I didn’t register the word “client” there :)

But yes, you definitely want to use Lwt_io regardless.

One suggestion: I would recommend sticking with Lwt.Syntax throughout the entire codebase instead of using the older Lwt.Infix. You can also avoid a global open, which pollutes the entire scope, and use a local open instead:

let client () =
  if Array.length Sys.argv < 4 then begin
    prerr_endline "Usage: client <host> <src port> <dst port>";
    exit 2
  end;
  let addr_string = Sys.argv.(1) in
  let src_port = int_of_string Sys.argv.(2) in
  let dst_port = int_of_string Sys.argv.(3) in
  let open Lwt.Syntax in (* opening only from the point we need it from *)
  let* sock = addr_from_string Ipaddr.V4.Prefix.of_string "127.0.0.1/32" |> Udpv4_socket.connect in
  let callback ~src:_ ~dst:_ ~src_port:_ buf = Cstruct.to_string buf |> Lwt_io.print in
  Udpv4_socket.listen sock ~port:src_port callback;

  let send () =
    let dst = addr_from_string Ipaddr.V4.of_string addr_string in
    let* line = Lwt_io.(read_line stdin) in
    let buf = Cstruct.of_string (line ^ "\n") in
    let* res = Udpv4_socket.write ~src_port ~dst ~dst_port sock buf in
    match res with
    | Ok () ->
      Lwt.pause ()
    | Error `Sendto_failed ->
      prerr_endline "sendto failed";
      exit 1
  in
  let rec loop () =
    let* () = send () in
    loop ()
  in
  loop ()
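
For readers comparing the two styles, here is a small self-contained sketch of the same computation written once with the >>= operator from Lwt.Infix and once with let* from Lwt.Syntax, each behind a local open. The function names `add_infix` and `add_syntax` are invented for the example; it assumes the lwt and lwt.unix libraries.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)

(* Written with the infix bind operator, opened locally. *)
let add_infix p q =
  let open Lwt.Infix in
  p >>= fun x ->
  q >>= fun y ->
  Lwt.return (x + y)

(* The same computation with the let* binding operator, opened locally. *)
let add_syntax p q =
  let open Lwt.Syntax in
  let* x = p in
  let* y = q in
  Lwt.return (x + y)

let () =
  let a = Lwt_main.run (add_infix (Lwt.return 1) (Lwt.return 2)) in
  let b = Lwt_main.run (add_syntax (Lwt.return 1) (Lwt.return 2)) in
  Printf.printf "%d %d\n" a b
```

Both functions behave the same way; the let* form simply reads more like ordinary let bindings.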

Thank you, Yawar. I’ve applied these suggestions :)
