I am using capnproto to send messages between several nodes. Each node can both send and receive messages from all others (and is connected to itself). The relevant code looks like this:
(* sending *)
let send_msg service (msg : Consensus.msg) =
let send service =
let open Api.Client.Hs.SendMsg in
let request, params = Capability.Request.create Params.init_pointer in
let msg_builder = Params.msg_get params in
msg_to_api_msg msg_builder msg;
let* _ = Capability.call_for_value_exn service method_id request in
Lwt.return ()
in
Sturdy_ref.with_cap_exn service send
let open_conn vat id =
let uri = Uri.of_string ("capnp://insecure@127.0.0.1:" ^ Int.to_string (id + 9000)) in
Capnp_rpc_unix.Vat.import_exn vat uri
let open_conns nodes =
let client_vat = Capnp_rpc_unix.client_only_vat () in
let ids = List.init nodes Fun.id in
List.map (open_conn client_vat) ids
(* receiving *)
(* main.ml *)
let start_node id nodes =
Lwt_main.run begin
let listen_address = `TCP ("127.0.0.1", 9000 + id) in
let config = Capnp_rpc_unix.Vat_config.create ~serve_tls:false ~secret_key listen_address in
let service_id = Capnp_rpc_net.Restorer.Id.public "" in
let restore = Capnp_rpc_net.Restorer.single service_id (Hs.local) in
let* vat = Capnp_rpc_unix.serve config ~restore in
let uri = Capnp_rpc_unix.Vat.sturdy_uri vat service_id in
Lwt.wait () |> fst
end
(* hs.ml *)
let local =
let conns = open_conns nodes in (* connect to other nodes *)
(* setup stuff *)
let module Hs = Api.Service.Hs in
Hs.local @@ object
inherit Hs.service
method send_msg_impl params release_param_caps =
let open Hs.SendMsg in
let msg = Params.msg_get params in
release_param_caps ();
(* do stuff including sending messages *)
Service.return_empty ()
end
I want the nodes to send some initial setup messages to each other once they start. The problem is if I do this in the “setup stuff” area then the node is not yet serving requests, and messages will fail to be delivered.
How can I wait to send these messages until all the connections are setup and all nodes are ready to serve requests?