Lwt error using Cap'n Proto to send messages

I am using Cap'n Proto (the OCaml capnp-rpc library) to send messages between several nodes. Each node can both send and receive messages from all the others. The relevant code looks like this:

main.ml:

open Lwt.Syntax (* provides the let* used below *)

let start_node id nodes =
	Lwt_main.run begin
		let listen_address = `TCP ("127.0.0.1", 9000 + id) in
		(* secret_key is defined elsewhere in the real code *)
		let config = Capnp_rpc_unix.Vat_config.create ~serve_tls:false ~secret_key listen_address in
		let service_id = Capnp_rpc_net.Restorer.Id.public "" in
		let restore = Capnp_rpc_net.Restorer.single service_id (Hs.local id nodes) in
		let* vat = Capnp_rpc_unix.serve config ~restore in
		let uri = Capnp_rpc_unix.Vat.sturdy_uri vat service_id in
		Fmt.pr "Server ID=%s running. Connect to URI %S.@." (Int.to_string id) (Uri.to_string uri);
		Lwt.wait () |> fst
	end

hs.ml:

open Lwt.Syntax (* provides the let* used below *)
open Capnp_rpc_lwt (* provides Capability, Sturdy_ref and Service *)

(*client*)
let open_conn id =
	let uri = Uri.of_string ("capnp://insecure@127.0.0.1:" ^ Int.to_string (id + 9000)) in
	let client_vat = Capnp_rpc_unix.client_only_vat () in
	Capnp_rpc_unix.Vat.import_exn client_vat uri

let open_conns nodes =
	let ids = List.init nodes Fun.id in
	List.map (fun x -> open_conn x) ids

let send msg service =
	let open Api.Client.Hs.SendMsg in
	let request, params = Capability.Request.create Params.init_pointer in
	let _ = Params.msg_set_reader params msg in
	let* _ = Capability.call_for_value_exn service method_id request in
	Lwt.return ()

let send_msg service msg =
	Sturdy_ref.with_cap_exn service (send msg)

(*server*)
let local id nodes =
	(*...*)
	let conns = open_conns nodes in
	let module Hs = Api.Service.Hs in
	Hs.local @@ object
		inherit Hs.service

		method send_msg_impl params release_param_caps =
			let open Hs.SendMsg in
			let msg = Params.msg_get params in
			release_param_caps ();
			(* do stuff like sending messages *)
			Service.return_empty ()
	end

After sending several messages, the nodes crash with Lwt errors:

hs: internal error, uncaught exception:
    Unix.Unix_error(Unix.EINVAL, "select", "")
    Raised by primitive operation at Lwt_engine.select#select in file "src/unix/lwt_engine.ml", line 405, characters 26-60
    Called from Lwt_engine.select_based#iter in file "src/unix/lwt_engine.ml", line 346, characters 8-39
    Called from Lwt_main.run.run_loop in file "src/unix/lwt_main.ml", line 41, characters 6-49
    Called from Lwt_main.run in file "src/unix/lwt_main.ml", line 118, characters 8-13
    Re-raised at Lwt_main.run in file "src/unix/lwt_main.ml", line 124, characters 4-13
    Called from Cmdliner_term.app.(fun) in file "cmdliner_term.ml", line 24, characters 19-24
    Called from Cmdliner_eval.run_parser in file "cmdliner_eval.ml", line 34, characters 37-44
Fatal error: exception Lwt_switch.Off
Raised by primitive operation at Lwt_unix.read_bigarray.(fun) in file "src/unix/lwt_unix.cppo.ml", line 688, characters 8-59
Called from Lwt_unix.wrap_syscall.(fun) in file "src/unix/lwt_unix.cppo.ml", line 571, characters 17-28
main.exe: [WARNING] Uncaught exception handling CapTP connection: Failure("recv: Unix.Unix_error(Unix.ECONNRESET, \"read\", \"\")") (dropping connection)
main.exe: [DEBUG] Error calling field(6110)(rc=1+1) -> #[] -> remote-promise(6109, rc=1) -> q0(Hs.sendMsg): Disconnected: Switch turned off
Fatal error: exception Failure("Hs.sendMsg: Disconnected: Switch turned off")
Raised at Stdlib.failwith in file "stdlib.ml", line 29, characters 17-33
Called from Lwt.Sequential_composition.bind.create_result_promise_and_callback_if_deferred.c in file "src/core/lwt.ml", line 1849, characters 23-26

How can I fix this?

Well, the immediate problem is that you’re using a version of Lwt without libev, so it’s using select, which is limited to 1024 FDs. Installing conf-libev should fix that.

However, you should normally only have one vat per process (unless you’re trying to e.g. stress test this scenario for some reason). Cap’n Proto can multiplex many requests over a single TCP connection.
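
For example, once you have a live capability you can fire off many calls at once, and they are all multiplexed over that capability's single connection. A rough sketch only, reusing the send helper and Hs API from your code above (msg stands in for a real message value):

(* Sketch: many concurrent sendMsg calls over ONE connection.
   Assumes open Capnp_rpc_lwt and the send helper from the question;
   msg is a placeholder for a real message value. *)
let send_many sr msg =
	Sturdy_ref.with_cap_exn sr @@ fun service ->
	(* start 100 calls concurrently, all sharing the same socket *)
	List.init 100 (fun _ -> send msg service) |> Lwt.join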


How would I communicate with multiple different nodes using one vat?

I’ve installed conf-libev but nothing has changed. I installed it by running
opam install conf-libev

However, when I try to add ‘conf-libev’ to my dune libraries it is not found.
I also tried adding this to main.ml:

Lwt_engine.set (new Lwt_engine.libev ());

But this causes my program to segfault.

P.S. this is running on macOS, if that is relevant.

You can have multiple connections from one vat. However, you’ll still need one connection for every host you connect to, so you can still hit the FD limit.
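
Concretely, something along these lines (a sketch only, based on your open_conn; the insecure URIs and port scheme are just the placeholders from your code):

(* One client-only vat for the whole process, one sturdy ref per peer.
   Each peer you actually connect to still costs one FD. *)
let client_vat = Capnp_rpc_unix.client_only_vat ()

let import_peer id =
	let uri = Uri.of_string ("capnp://insecure@127.0.0.1:" ^ Int.to_string (id + 9000)) in
	Capnp_rpc_unix.Vat.import_exn client_vat uri

let import_peers nodes = List.map import_peer (List.init nodes Fun.id)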

Ah. Looks like this is a known problem on macOS: Tell libev to use kqueue by default on macOS · Issue #601 · ocsigen/lwt · GitHub

Yes, I’ve tried putting this in main.ml:

Lwt_engine.set (new Lwt_engine.libev ~backend:Lwt_engine.Ev_backend.kqueue ());

But I still get a segfault. If I remove this line it goes back to using select.
Is there any workaround for this issue?

Also, I’m testing with only 4 nodes, so it seems surprising that I would reach the FD limit.

I’ve tried again on Linux with libev installed and I get the following error:

Fatal error: exception Unix.Unix_error(Unix.EMFILE, "accept", "")
Raised by primitive operation at Lwt_unix.accept_and_set_nonblock in file "src/unix/lwt_unix.cppo.ml", line 1701, characters 21-58
Called from Lwt_unix.wrap_syscall.(fun) in file "src/unix/lwt_unix.cppo.ml", line 571, characters 17-28

The error always occurs after the same number of messages has been sent (the same number as before when running on macOS).
I installed conf-libev and the dependencies and have this line in main.ml:

Lwt_engine.set (new Lwt_engine.libev ());

How can I fix this and confirm that libev is actually being used?
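
(As far as I can tell, Lwt_sys.have at least reports whether libev support was compiled into Lwt, though not which engine is actually running; a minimal check:)

(* Prints whether Lwt was built with libev support. This only checks
   compile-time support, not which engine Lwt_main is currently using. *)
let () =
	if Lwt_sys.have `libev
	then print_endline "libev support: yes"
	else print_endline "libev support: no"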

Looks like sockets are not being closed somewhere in your program.


Yes, it looks like something is leaking FDs. If you’re only opening 4 connections then it’s unlikely to be capnp-rpc (which will just use one FD for each connection). There’s probably a way to view the FDs in use on macOS, but I don’t know what it is. I’d use strace -y or lsof -p on Linux to check.
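
For example, running lsof -p <pid> on the node’s process before and after a burst of messages should show whether the number of open sockets keeps growing.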


Turns out the default FD limit was set to 1024; when I increase it, the program no longer crashes, so it seems like I am leaking FDs. I think this may have to do with the way I handle sending messages / making connections.

let open_conn vat id =
	let uri = Uri.of_string ("capnp://insecure@127.0.0.1:" ^ Int.to_string (id + 9000)) in
	Capnp_rpc_unix.Vat.import_exn vat uri

let open_conns nodes =
	let client_vat = Capnp_rpc_unix.client_only_vat () in
	let ids = List.init nodes Fun.id in
	List.map (open_conn client_vat) ids

let send msg service =
	let open Api.Client.Hs.SendMsg in
	let request, params = Capability.Request.create Params.init_pointer in
	let _ = Params.msg_set_reader params msg in
	let* _ = Capability.call_for_value_exn service method_id request in
	Lwt.return ()

let send_msg service msg =
	Sturdy_ref.with_cap_exn service (send msg)

This is the code I use for opening connections to all nodes (including this node) and sending messages. I run ‘open_conns’ once to get a list of sturdy refs, then use ‘send_msg’ every time I send a message. Is this correct, or could I be leaking FDs here?