I am using Cap'n Proto (through the OCaml capnp-rpc library) to send messages. Here is my code:
main.ml
(* Key setting handed to [Capnp_rpc_unix.Vat_config.create] as [~secret_key]
   by [start_server]; every server started by this program shares it.
   NOTE(review): [`Ephemeral] presumably asks the library for a fresh,
   unsaved key on each run -- confirm against the capnp-rpc docs. *)
let secret_key = `Ephemeral
(* [start_server id] starts an echo server listening on 127.0.0.1, TCP port
   [9000 + id], with TLS serving switched off, and prints the URI under which
   the service can be reached. The echo implementation ([Echo.local id]) is
   the only service offered and is registered under the public id [""].
   The returned promise resolves to [()] once the URI has been printed. *)
let start_server id =
  let port = 9000 + id in
  let config =
    Capnp_rpc_unix.Vat_config.create ~serve_tls:false ~secret_key
      (`TCP ("127.0.0.1", port))
  in
  let echo_id = Capnp_rpc_net.Restorer.Id.public "" in
  let restore = Capnp_rpc_net.Restorer.single echo_id (Echo.local id) in
  let+ vat = Capnp_rpc_unix.serve config ~restore in
  (* [@.] ends the line and flushes, so the URI shows up immediately. *)
  Capnp_rpc_unix.Vat.sturdy_uri vat echo_id
  |> Uri.to_string
  |> Fmt.pr "Server running. Connect to URI %S.@."
(* Entry point. Starts [nodes] echo servers inside this process, then issues
   [reqs] pings paced at [rate] requests per second, distributing them
   round-robin over the servers, and finally passes the collected timings to
   [print_results] (defined outside the code shown here).

   Servers and clients all run on the single Lwt event loop started by
   [Lwt_main.run] below. *)
let () =
  Lwt_main.run begin
    let reqs = 1000 in
    let rate = 50. in
    (* Single source of truth for the number of servers: it drives server
       start-up, connection set-up and the round-robin modulus alike. *)
    let nodes = 3 in
    (* Start the servers one after another, ids 0 .. nodes - 1. *)
    let* () = Lwt_list.iter_s start_server (List.init nodes Fun.id) in
    (* An array gives constant-time lookup for the per-request selection. *)
    let conns = Array.of_list (Echo.open_conns nodes) in
    let* results =
      Lwt_list.mapi_p
        (fun i msg ->
          (* Request [i] is released [i / rate] seconds after the start, so
             all requests are scheduled up front but fire at a steady pace. *)
          let* () = Lwt_unix.sleep (Float.of_int i /. rate) in
          Echo.ping_msg conns.(i mod nodes) msg)
        (List.init reqs (fun x -> "ping #" ^ Int.to_string x))
    in
    print_results results
  end
echo.ml
(* [local id] builds a local implementation of the Echo service. [id] is only
   used to tag the log line, so output from several servers can be told
   apart. Each [ping] prints "<id> recv: <msg>" and replies with an empty
   result. *)
let local id =
  let module Echo = Api.Service.Echo in
  let handler =
    object
      inherit Echo.service

      method ping_impl params release_param_caps =
        let text = Echo.Ping.Params.msg_get params in
        (* [@.] emits a newline and flushes stdout on every request. *)
        Fmt.pr "%d recv: %s@." id text;
        release_param_caps ();
        Service.return_empty ()
    end
  in
  Echo.local handler
(* [connect service t] tries to connect to the sturdy ref [service] and
   resolves to the live capability on success.

   On failure it reports the reason on stderr, sleeps for [t] seconds, prints
   the back-off notice and retries with the delay doubled. There is no retry
   limit: the promise stays pending until a connection attempt succeeds. *)
let rec connect service t =
  let* r = Sturdy_ref.connect service in
  match r with
  | Ok conn -> Lwt.return conn
  | Error ex ->
    (* Surface the failure reason instead of discarding it; otherwise a
       permanently unreachable server just looks like a silent hang. *)
    Fmt.epr "connect failed: %a@." Capnp_rpc.Exception.pp ex;
    let* () = Lwt_unix.sleep t in
    Fmt.pr "backed off %f...@." t;
    connect service (t *. 2.) (* binary exponential backoff *)
(* [open_conn vat id] imports, through [vat], the URI
   "capnp://insecure@127.0.0.1:<9000 + id>" and returns the result of
   [Capnp_rpc_unix.Vat.import_exn]. Callers in this file later pass that
   value to [connect], which is where the connection attempt is made. *)
let open_conn vat id =
  let port = id + 9000 in
  let address = "capnp://insecure@127.0.0.1:" ^ Int.to_string port in
  address |> Uri.of_string |> Capnp_rpc_unix.Vat.import_exn vat
(* [open_conns nodes] creates one client-only vat and uses it to import the
   servers with ids [0 .. nodes - 1], returning the imported references in
   id order. *)
let open_conns nodes =
  let vat = Capnp_rpc_unix.client_only_vat () in
  List.init nodes (open_conn vat)
(* [delta x y] is the time elapsed from timestamp [x] to timestamp [y], in
   seconds. Both timestamps are [Base.Int63.t] values counted in
   nanoseconds. *)
let delta x y =
  let nanos = Base.Int63.(to_float (y - x)) in
  nanos /. 1e9
(* [ping_msg sr msg] connects to [sr] (via [connect], with an initial
   back-off of 0.01 s), sends a single [ping] carrying [msg], and resolves to
   [(call_time, connect_time)], both in seconds:
   - [call_time] covers only [Capability.call_for_unit_exn];
   - [connect_time] covers only the [connect] step.
   The capability is handed to [Capability.with_ref] for the duration of the
   call. *)
let ping_msg sr msg =
  let now = Time_now.nanoseconds_since_unix_epoch in
  let timed_ping cap =
    let open Api.Client.Echo.Ping in
    let request, params = Capability.Request.create Params.init_pointer in
    Params.msg_set params msg;
    (* The request is fully built before the clock starts. *)
    let sent = now () in
    let* () = Capability.call_for_unit_exn cap method_id request in
    let replied = now () in
    Lwt.return (delta sent replied)
  in
  let before_connect = now () in
  let* cap = connect sr 0.01 in
  let after_connect = now () in
  let* call_time = Capability.with_ref cap timed_ping in
  Lwt.return (call_time, delta before_connect after_connect)
echo_api.capnp
interface Echo {
# Echo service with a single method.
ping @0 (msg :Text) -> ();
# Takes one text field, `msg`; the result struct has no fields.
}
The time it takes to execute `Capability.call_for_unit_exn` is normally on the order of 0.1 ms, but occasionally it is on the order of 10 ms. What could cause the call to be so slow some of the time?