Waiting for Capnproto connection

I am using capnproto to send messages between several nodes. Each node can both send and receive messages from all others (and is connected to itself). The relevant code looks like this:

(* sending *)
(** [send_msg service msg] delivers [msg] to the node behind the sturdy ref
    [service] by invoking the [Hs.SendMsg] RPC method.

    [Sturdy_ref.with_cap_exn] supplies a live capability for [service] to the
    callback. The returned promise resolves to [()] once the remote call has
    returned; the reply itself is discarded. Both library calls are the
    [_exn] variants, so failures surface as exceptions rather than as a
    [result]. *)
let send_msg service (msg : Consensus.msg) =
	Sturdy_ref.with_cap_exn service (fun cap ->
		let open Api.Client.Hs.SendMsg in
		let request, params = Capability.Request.create Params.init_pointer in
		(* Fill the request's [msg] field from [msg]. *)
		msg_to_api_msg (Params.msg_get params) msg;
		let* _reply = Capability.call_for_value_exn cap method_id request in
		Lwt.return ())

(** [open_conn vat id] imports, through [vat], a sturdy ref for the node with
    identifier [id]. Node [id] is addressed at 127.0.0.1 on TCP port
    [9000 + id], using the [capnp://insecure@...] URI form. *)
let open_conn vat id =
	let port = Int.to_string (id + 9000) in
	("capnp://insecure@127.0.0.1:" ^ port)
	|> Uri.of_string
	|> Capnp_rpc_unix.Vat.import_exn vat

(** [open_conns nodes] returns one sturdy ref per node identifier in
    [0 .. nodes - 1], in increasing order of identifier. All of them are
    imported through a single, freshly created client-only vat. *)
let open_conns nodes =
	let client_vat = Capnp_rpc_unix.client_only_vat () in
	(* [List.init] supplies the identifiers 0, 1, ..., nodes - 1 in order. *)
	List.init nodes (open_conn client_vat)
(* receiving *)

(* main.ml *)
(** [start_node id nodes] runs node [id] as a Cap'n Proto RPC server inside
    [Lwt_main.run].

    The node listens on 127.0.0.1, TCP port [9000 + id], with TLS disabled,
    and exposes [Hs.local] under the public service id [""]. Once the server
    is up, the function waits on a promise that is never resolved, so it does
    not return normally. [nodes] is not used by this function. *)
let start_node id nodes =
	let serve_forever () =
		let listen_address = `TCP ("127.0.0.1", 9000 + id) in
		let config =
			Capnp_rpc_unix.Vat_config.create ~serve_tls:false ~secret_key listen_address
		in
		let service_id = Capnp_rpc_net.Restorer.Id.public "" in
		let restore = Capnp_rpc_net.Restorer.single service_id Hs.local in
		let* vat = Capnp_rpc_unix.serve config ~restore in
		(* The sturdy URI of the exported service; not used any further here. *)
		let _uri = Capnp_rpc_unix.Vat.sturdy_uri vat service_id in
		(* The resolver is dropped, so this promise stays pending forever. *)
		let never_resolved, _resolver = Lwt.wait () in
		never_resolved
	in
	Lwt_main.run (serve_forever ())

(* hs.ml *)
(* The Hs service implementation that this node exports.
   [local] takes no arguments, so the bindings below are evaluated once, when
   this definition itself is evaluated, not per request.
   NOTE(review): [nodes] is not defined in the visible code; presumably it is
   bound elsewhere in hs.ml - confirm. *)
let local  =
	(* One sturdy ref per node, as returned by [open_conns]. *)
	let conns = open_conns nodes in (* connect to other nodes *)
	(* setup stuff *)
	let module Hs = Api.Service.Hs in
	Hs.local @@ object
		inherit Hs.service

		(* Handler for the SendMsg method: reads the [msg] field from the
		   request parameters, releases the parameter capabilities, and
		   replies with an empty result. *)
		method send_msg_impl params release_param_caps =
			let open Hs.SendMsg in
			let msg = Params.msg_get params in
			(* [msg] has been read out of [params], so the parameter
			   capabilities are released before any further processing. *)
			release_param_caps ();
			(* do stuff including sending messages *)
			Service.return_empty ()
	end

I want the nodes to send some initial setup messages to each other once they start. The problem is that if I do this in the “setup stuff” area, the node is not yet serving requests, so the messages will fail to be delivered.
How can I wait to send these messages until all the connections are set up and all nodes are ready to serve requests?

All of your nodes are created with Capnp_rpc_unix.client_only_vat, so none of them can accept a connection anyway. You need Capnp_rpc_unix.serve instead.

For this in-process case, you’ll know when you’ve created all the vats and can send the messages then, but in a real system there’s no way to know whether the other server has started except by trying to connect to it (and retrying if that fails).

Sorry, I left out an important part of my code - I do use serve. How can I attempt to connect and retry?

I was able to fix this by splitting the ‘Sturdy_ref.with_cap_exn’ call into a separate connect step and send step. The new code looks like this:

(** [connect ?max_delay service t] connects to the sturdy ref [service],
    retrying until an attempt succeeds, and returns the resulting live
    capability.

    After a failed attempt it sleeps before trying again: [t] seconds after
    the first failure, then twice as long after each further failure
    (binary exponential backoff), but never longer than [max_delay] seconds.
    Without that cap the delay would keep doubling, so a peer that starts
    late could go unprobed for an arbitrarily long time.

    The error from a failed attempt is discarded and the function never gives
    up, so the returned promise stays pending for as long as [service] is
    unreachable.

    @param max_delay upper bound, in seconds, on the sleep between two
      attempts (default [30.]).
    @param service the sturdy ref to connect to.
    @param t the initial delay in seconds; should be positive, otherwise the
      retries are not spaced out. *)
let rec connect ?(max_delay = 30.) service t =
	let* attempt = Sturdy_ref.connect service in
	match attempt with
	| Ok cap -> Lwt.return cap
	| Error _ ->
		(* Clamp before sleeping so the doubling cannot grow without bound. *)
		let delay = Float.min t max_delay in
		let* () = Lwt_unix.sleep delay in
		connect ~max_delay service (delay *. 2.)

(** [send_msg service msg] sends [msg] to the node behind the sturdy ref
    [service]; used by nodes to communicate with each other.

    The work is started with [Lwt.async], so the function returns [()]
    without waiting for delivery. The background task first obtains a live
    capability via [connect service 0.1], then invokes the [Hs.SendMsg] RPC
    method inside [Capability.with_ref] and discards the reply. *)
let send_msg service (msg : Consensus.msg) =
	(* Build the SendMsg request for [msg] and issue it on [cap]. *)
	let deliver cap =
		let open Api.Client.Hs.SendMsg in
		let request, params = Capability.Request.create Params.init_pointer in
		msg_to_api_msg (Params.msg_get params) msg;
		let* _reply = Capability.call_for_value_exn cap method_id request in
		Lwt.return ()
	in
	let connect_then_deliver () =
		let* cap = connect service 0.1 in
		Capability.with_ref cap deliver
	in
	Lwt.async connect_then_deliver