I am using capnproto to send messages between several nodes. Each node can both send and receive messages from all others. The relevant code looks like this:
let start_node id nodes =
Lwt_main.run begin
let listen_address = `TCP ("", 9000 + id) in
let config = Capnp_rpc_unix.Vat_config.create ~serve_tls:false ~secret_key listen_address in
let service_id = Capnp_rpc_net.Restorer.Id.public "" in
let restore = Capnp_rpc_net.Restorer.single service_id (Hs.local id nodes) in
let* vat = Capnp_rpc_unix.serve config ~restore in
let uri = Capnp_rpc_unix.Vat.sturdy_uri vat service_id in
Fmt.pr "Server ID=%s running. Connect to URI %S.@." (Int.to_string id) (Uri.to_string uri);
Lwt.wait () |> fst
let open_conn id =
let uri = Uri.of_string ("capnp://insecure@" ^ Int.to_string (id + 9000)) in
let client_vat = Capnp_rpc_unix.client_only_vat () in
Capnp_rpc_unix.Vat.import_exn client_vat uri
let open_conns nodes =
let ids = List.init nodes Fun.id in
List.map (fun x -> open_conn x) ids
let send msg service =
let open Api.Client.Hs.SendMsg in
let request, params = Capability.Request.create Params.init_pointer in
let _ = Params.msg_set_reader params msg in
let* _ = Capability.call_for_value_exn service method_id request in
Lwt.return ()
let send_msg service msg =
Sturdy_ref.with_cap_exn service (send msg)
let local id nodes =
let conns = open_conns nodes in
let module Hs = Api.Service.Hs in
Hs.local @@ object
inherit Hs.service
method send_msg_impl params release_param_caps =
let open Hs.SendMsg in
let msg = Params.msg_get params in
release_param_caps ();
(* do stuff like sending messages *)
Service.return_empty ()
After sending several messages the nodes crash with LWT errors:
hs: internal error, uncaught exception:
Unix.Unix_error(Unix.EINVAL, "select", "")
Raised by primitive operation at Lwt_engine.select#select in file "src/unix/lwt_engine.ml", line 405, characters 26-60
Called from Lwt_engine.select_based#iter in file "src/unix/lwt_engine.ml", line 346, characters 8-39
Called from Lwt_main.run.run_loop in file "src/unix/lwt_main.ml", line 41, characters 6-49
Called from Lwt_main.run in file "src/unix/lwt_main.ml", line 118, characters 8-13
Re-raised at Lwt_main.run in file "src/unix/lwt_main.ml", line 124, characters 4-13
Called from Cmdliner_term.app.(fun) in file "cmdliner_term.ml", line 24, characters 19-24
Called from Cmdliner_eval.run_parser in file "cmdliner_eval.ml", line 34, characters 37-44
Fatal error: exception Lwt_switch.Off
Raised by primitive operation at Lwt_unix.read_bigarray.(fun) in file "src/unix/lwt_unix.cppo.ml", line 688, characters 8-59
Called from Lwt_unix.wrap_syscall.(fun) in file "src/unix/lwt_unix.cppo.ml", line 571, characters 17-28
main.exe: [WARNING] Uncaught exception handling CapTP connection: Failure("recv: Unix.Unix_error(Unix.ECONNRESET, \"read\", \"\")") (dropping connection)
main.exe: [DEBUG] Error calling field(6110)(rc=1+1) -> #[] -> remote-promise(6109, rc=1) -> q0(Hs.sendMsg): Disconnected: Switch turned off
Fatal error: exception Failure("Hs.sendMsg: Disconnected: Switch turned off")
Raised at Stdlib.failwith in file "stdlib.ml", line 29, characters 17-33
Called from Lwt.Sequential_composition.bind.create_result_promise_and_callback_if_deferred.c in file "src/core/lwt.ml", line 1849, characters 23-26
How can I fix this?