How do you create remote workers with Rpc_parallel?



I am trying to use Jane Street’s Rpc_parallel library in OCaml to create multiple workers on different remote servers. Starting from the simple example that Jane Street provides, I wrote a program that computes the sum of the numbers from 0 to max on the remote host. The code is below:

open Core
open Async

(* A bare bones use case of the [Rpc_parallel] library. This demonstrates how to
   define a simple worker type that implements some functions. The master then spawns a
   worker of this type and calls a function to run on this worker *)

module Sum_worker = struct
  module T = struct
    (* A [Sum_worker.worker] implements a single function [sum : int -> int]. Because this
       function is parameterized on a ['worker], it can only be run on workers of the
       [Sum_worker.worker] type. *)
    type 'worker functions = { sum : ('worker, int, int) Rpc_parallel.Function.t }

    (* No initialization upon spawn *)
    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]

      type t = unit
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]

      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
         with type worker_state := Worker_state.t
          and type connection_state := Connection_state.t) =
    struct
      (* Define the implementation for the [sum] function *)
      let sum_impl ~worker_state:() ~conn_state:() arg =
        let sum = List.fold ~init:0 ~f:( + ) (List.init arg ~f:Fn.id) in
        Log.Global.info "Sum_worker.sum: %i\n" sum;
        Core.printf !"%d" sum;
        return sum

      (* Create a [Rpc_parallel.Function.t] from the above implementation *)
      let sum = C.create_rpc ~f:sum_impl ~bin_input:Int.bin_t ~bin_output:Int.bin_t ()

      (* This type must match the ['worker functions] type defined above *)
      let functions = { sum }

      let init_worker_state () = Deferred.unit

      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end

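let main max log_dir () =
  let executable_path = "/app/simple.exe" in
  let host = "root@" in
  let redirect_stdout, redirect_stderr =
    match log_dir with
    | None -> `Dev_null, `Dev_null
    | Some _ -> `File_append "sum.out", `File_append "sum.err"
  in
  (* Spawn the worker on the remote host instead of locally *)
  Sum_worker.spawn
    ~where:
      (Rpc_parallel.Executable_location.Remote
         (Rpc_parallel.Remote_executable.existing_on_host ~executable_path
            ~strict_host_key_checking:`No host))
    ~on_failure:Error.raise ?cd:log_dir ~shutdown_on:Disconnect
    ~redirect_stdout ~redirect_stderr ~connection_state_init_arg:() ()
  >>=? fun conn ->
  Sum_worker.Connection.run conn ~f:Sum_worker.functions.sum ~arg:max
  >>=? fun res ->
  Core.Printf.printf "sum_worker: %d\n%!" res;
  Deferred.Or_error.ok_unit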

let command =
  (* Make sure to always use [Command.async] *)
  Command.async_spec_or_error
    ~summary:"Simple use of Async Rpc_parallel V2"
    Command.Spec.(
      empty
      +> flag "max" (required int) ~doc:""
      +> flag "log-dir" (optional string) ~doc:" Folder to write worker logs to")
    main

(* This call to [Rpc_parallel.start_app] must be top level *)
let () = Rpc_parallel.start_app command

let () = never_returns (Scheduler.go ())

The master node tries to spawn a worker node on the remote host. When I run simple.exe, it asks me for permission to ssh into the remote host, and the ssh connection succeeds. However, I then get the following error:

[WORKER  STDERR]: (((pid 283) (thread_id 0)) "2018-07-27 22:17:22.611147668Z"
[WORKER  STDERR]:  "unhandled exception in Async scheduler"
[WORKER  STDERR]:  ("unhandled exception"
[WORKER  STDERR]:     ("Worker failed to register"
[WORKER  STDERR]:      (
[WORKER  STDERR]:       (src/ ubuntu "host not found")
[WORKER  STDERR]:       ("Raised at file \"src/\" (inlined), line 351, characters 22-32"
[WORKER  STDERR]:        "Called from file \"src/\", line 168, characters 17-26"
[WORKER  STDERR]:        "Called from file \"src/\", line 20, characters 40-45"
[WORKER  STDERR]:        "Called from file \"src/\", line 159, characters 6-47"))
[WORKER  STDERR]:      src/
[WORKER  STDERR]:     ("Raised at file \"src/\" (inlined), line 351, characters 22-32"
[WORKER  STDERR]:      "Called from file \"src/\" (inlined), line 9, characters 14-30"
[WORKER  STDERR]:      "Called from file \"src/\" (inlined), line 5, characters 2-50"
[WORKER  STDERR]:      "Called from file \"src/\", line 1365, characters 19-78"
[WORKER  STDERR]:      "Called from file \"src/\", line 20, characters 40-45"
[WORKER  STDERR]:      "Called from file \"src/\", line 159, characters 6-47"
[WORKER  STDERR]:      "Caught by monitor main"))
[WORKER  STDERR]:    ((pid 283) (thread_id 2)))))

It says that the host is not found, which I find strange because I ssh’d into the host without any problem. Does anybody have a clue why this is happening?


As part of the initial Rpc_parallel worker handshake, the worker needs to connect back to the master to register itself. This is the step that is failing: the worker is trying to connect to “ubuntu” (the hostname of the master server), and that name fails to resolve on the worker machine.


I greatly appreciate your response, @tlubin. I suspected this as well. How can the worker find my master server? Would it be possible to resolve this issue by specifying the host of the master server?


The master server calls Unix.gethostname and passes the result to the worker. You are going to have to make some changes so that the master’s hostname (i.e. “ubuntu”) resolves properly from the worker machine. A quick fix would be to add an entry for it to /etc/hosts on the worker machine.
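To check this from either machine, here is a minimal diagnostic sketch. It is not part of Rpc_parallel: it uses only the standard OCaml [unix] library (not Core), and the helper names `resolve` and `report` are made up for illustration. Run it on the master to see the name it reports, then run it on the worker with that name as the first argument to see whether the name resolves there.

```ocaml
(* Diagnostic sketch, not part of Rpc_parallel. Link with the [unix] library. *)

(* [resolve name] returns the addresses that [name] resolves to on this
   machine, or [None] if the lookup fails. *)
let resolve name =
  match Unix.gethostbyname name with
  | entry ->
    Some
      (Array.to_list
         (Array.map Unix.string_of_inet_addr entry.Unix.h_addr_list))
  | exception Not_found -> None

(* Print the outcome of the lookup in a human-readable form. *)
let report name =
  match resolve name with
  | Some addrs ->
    Printf.printf "%s resolves to: %s\n" name (String.concat ", " addrs)
  | None -> Printf.printf "%s does not resolve from this machine\n" name

let () =
  (* On the master: this is the kind of name that gets handed to workers. *)
  let self = Unix.gethostname () in
  Printf.printf "this machine reports its hostname as %s\n" self;
  (* On the worker: pass the master's hostname as the first argument. *)
  let target = if Array.length Sys.argv > 1 then Sys.argv.(1) else self in
  report target
```

If the lookup fails on the worker, an /etc/hosts line there of the form `192.0.2.10 ubuntu` should make the name resolve. The address here is only a placeholder; substitute the master's real address.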


I got the two “remote” processes to connect to each other successfully by modifying /etc/hosts. Why is Rpc_parallel designed to ssh into another machine and run the executable there? Doesn’t ssh’ing into another machine open up a huge security hole?


You can run local workers; in fact, local workers are the default. Remote workers give the library a lot more flexibility.

In terms of the “huge security hole”, I’m not sure exactly what you are suggesting. The user can specify where to sync the executable remotely. There is an md5 check to ensure that the remote executable matches the local one. Yes, there is a small race here, but if somebody is on the remote machine and has suitable permissions to change the executable, they could always gdb the running process and do whatever they want. It is up to the user to choose a file system path that has suitable permissions.
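The idea behind such a checksum comparison can be sketched with the standard library’s Digest module, which computes MD5. This is only an illustration of the technique, not Rpc_parallel’s actual implementation. The helper `same_md5` and the temporary files are hypothetical, and both files are local here, whereas the library compares against a copy on the remote machine.

```ocaml
(* Illustrative only: not Rpc_parallel's actual implementation. *)

(* [same_md5 a b] is true when the two files have the same MD5 digest. *)
let same_md5 path_a path_b =
  Digest.equal (Digest.file path_a) (Digest.file path_b)

let () =
  let write path contents =
    let oc = open_out_bin path in
    output_string oc contents;
    close_out oc
  in
  (* Two temporary files stand in for the local and remote executables. *)
  let a = Filename.temp_file "exe_a" ".bin" in
  let b = Filename.temp_file "exe_b" ".bin" in
  write a "same bytes";
  write b "same bytes";
  Printf.printf "identical copies match: %b\n" (same_md5 a b);
  (* Changing one copy makes the digests differ. *)
  write b "tampered bytes";
  Printf.printf "modified copy matches: %b\n" (same_md5 a b);
  Sys.remove a;
  Sys.remove b
```

The race mentioned above comes from the gap between a check like this and the moment the executable is actually run: the file could be replaced in between, which is why the permissions on the chosen path matter.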