Domainslib blocking on Chan.recv

Hej :slight_smile:

While playing around with domainslib I have encountered an interesting case when using channels. I have created the following minimal example to demonstrate it:

module T = Domainslib.Task 
module C = Domainslib.Chan 
let num_domains = Sys.argv.(1) |> int_of_string 

let () = 
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in 
  let ping = C.make_bounded 1 in 
  let pong = C.make_bounded 1 in 
  T.run pool (fun _ -> 
    let _ = T.async pool (fun _ -> 
      C.recv ping; 
      print_endline "Hello, World"; 
      C.send pong () 
    ) in  

    C.send ping (); 
    C.recv pong 
  ); 
  T.teardown_pool pool 

If I call the program above with argument 1 (i.e. no additional domains), I get no output, whereas if I call it with anything bigger than 1 (i.e. more than one domain in total), I get the expected "Hello, World" printed out.

The following is from Chan.mli:

val recv : 'a t -> 'a
(** [recv c] returns a value [v] received over the channel. If the channel
    buffer is empty then the domain blocks until a message is sent on the
    channel. *)

Somehow I would have expected that to mean that, if one thread is blocked on an empty channel, other threads in the pool might run, but that does not seem to be the behaviour. Have I misunderstood how channels work? And if so, is there something similar that gives me the expected behaviour? I assume I could use the promise mechanism offered by Eio, but if possible I’d like to stick to domainslib.

Thanks in advance for any hints :slight_smile:
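
For comparison, here is a minimal sketch of the same handshake expressed with domainslib's own promises instead of channels. It relies on the documented behaviour of Task.await, which may run other queued tasks on the calling domain while it waits, so it should produce output even with an empty pool (num_domains:0). The helper name hello_via_promise is made up for illustration.

```ocaml
module T = Domainslib.Task

(* Hypothetical helper: runs one task and waits for its result via a promise. *)
let hello_via_promise ~num_domains =
  let pool = T.setup_pool ~num_domains () in
  let msg =
    T.run pool (fun () ->
      (* async only enqueues the task; await lets the scheduler run it. *)
      let p = T.async pool (fun () -> "Hello, World") in
      T.await pool p)
  in
  T.teardown_pool pool;
  msg

let () = print_endline (hello_via_promise ~num_domains:0)
```

The difference from the channel version is that waiting goes through the scheduler rather than blocking the whole domain.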

I think the crucial bit of documentation is this:

When num_domains is 0, the new pool will be empty, and when an empty pool is in use, every function in this module will run effectively sequentially, using the calling domain as the only available domain.

That appears to be a “just turn everything off” mode. Which… does seem to make channels less than useful in single-threaded usage.

Thinking about it a bit further, I suppose that makes sense for a library strictly focused on system-level threads. It does mean that behaviour in the event of num_domains=0 and blocking is completely different from that with num_domains=1.

In the interests of simplicity, I’d probably just set 1 as the lower limit. A second system-thread isn’t going to be that much less efficient than just having one. Particularly since (if I’m reading the docs/code correctly) the main system thread isn’t going to be running any user-level code at the top level.

I see, that does make sense! Requiring at least 2 domains in the pool is also not particularly far-fetched :slight_smile:

So I have experimented a bit more, and have come up with the following adapted example:

module T = Domainslib.Task 
module C = Domainslib.Chan 
let num_domains = Sys.argv.(1) |> int_of_string 

let print_mutex = Mutex.create () 

let print s = 
  Mutex.lock print_mutex; 
  print_endline s; 
  Mutex.unlock print_mutex

let ping = C.make_bounded 1 
let pong = C.make_bounded 1 
let pang = C.make_unbounded ()  


let run_async name p () = 
  let rec f () : unit = 
    C.recv p; 
    print name;
    C.send pang (); 
    f () 
  in 
  f () 

let () = 
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in  
  T.run pool (fun _ -> 
    let _ = T.async pool (run_async "A" ping) in 
    let _ = T.async pool (run_async "B" pong) in 


    while true do 
      C.send ping (); 
      C.send pong (); 
      C.recv pang
    done
  ); 
  T.teardown_pool pool 

I again tried running it with different inputs. As expected, with 1 (no additional domains) it blocks immediately. However, with 2 I get two consecutive "A"s printed out, and then it blocks as well. If I provide more than 2 domains, the program runs indefinitely, as expected. It seems that, when reading from an empty channel, the call to C.recv blocks the domain and effectively removes it from the pool until there is something to read from the channel, even if there are other threads that could be run in the meantime.

Is that the expected behaviour?

Duplication notice: I have also asked the same question on the issues page of the domainslib GitHub repository

I have never used Domainslib (so please caveat everything I say with that): but if Domainslib works in the way that other thread libraries I have used work, then the canonical way of using a channel is to have one or more threads sending to any one channel and a different thread or threads reading from it. In your test case of a single domain pool, everything (including your while loop) will run in the same domain so you end up with a deadlock. Your code should work if your while loop runs in the program’s starting domain instead of running in the thread pool domain (not tested).

Having said that I cannot see how you end up with two consecutive "A"s printed.

Thanks for the input! I have to admit that I have never before worked with any concurrency/parallel libraries in OCaml.

So I just tried what happens if I move the while loop out of Task.run:

let () = 
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in  
  T.run pool (fun _ -> 
    let _ = T.async pool (run_async "A" ping) in 
    let _ = T.async pool (run_async "B" pong) in 
    ()
  ); 
  while true do 
    C.send ping (); 
    C.send pong (); 
    C.recv pang
  done;
  T.teardown_pool pool

It shows exactly the same behaviour as the previous version. I looked at the documentation of domainslib again:

val setup_pool : ?name:string -> num_domains:int -> unit -> pool

Sets up a task execution pool with num_domains new domains. If name is provided, the pool is mapped to name which can be looked up later with lookup_pool name.

When num_domains is 0, the new pool will be empty, and when an empty pool is in use, every function in this module will run effectively sequentially, using the calling domain as the only available domain.

Raises Invalid_argument when num_domains is less than 0.

To me that reads as if the calling domain automatically becomes part of the pool when I call setup_pool, right?

val async : pool -> 'a task -> 'a promise

async p t runs the task t asynchronously in the pool p. The function returns a promise r in which the result of the task t will be stored.

So I thought that running the two calls to run_async within Task.async should mean that they can run independently in the provided pool, and also independently of the calling thread.

The consecutive "A"s are indeed a bit mysterious, but I would assume that the OS scheduler decides to run the run_async "A" ping twice, and then ends up in a situation where one domain blocks on pang and one blocks on ping. If the call to Chan.recv really does remove the current domain from the pool that leaves no domain left for running run_async "B" pong, even though that one would be ready to run. That would explain the output in case of 2 domains, and why it works as soon as I have more than 2.

I was also thinking that maybe Task.async only runs a given function in a pool in case that there is an unused domain available, and otherwise blocks until a domain becomes available. However, it seems that the way it is implemented (at least as far as I can tell from the code), is that the closure provided as an argument to Task.async gets added to an unbounded queue, so it should always return immediately.
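
A toy model of that reading, using only the standard library, may make the consequence clearer. It is not domainslib's implementation; the names queue, async and run_pending are hypothetical. Submitting a task just pushes a closure onto a queue and returns a promise straight away, and nothing runs until some worker pops the closure.

```ocaml
(* Toy model only: a promise is a mutable slot filled when the task runs. *)
type 'a promise = 'a option ref

(* Unbounded queue of pending closures, shared by all submitters. *)
let queue : (unit -> unit) Queue.t = Queue.create ()

(* Enqueue the task and return immediately; the task has not run yet. *)
let async (f : unit -> 'a) : 'a promise =
  let p = ref None in
  Queue.push (fun () -> p := Some (f ())) queue;
  p

(* What a free worker would do: pop and run tasks until the queue is empty. *)
let run_pending () =
  while not (Queue.is_empty queue) do
    (Queue.pop queue) ()
  done

let () =
  let p = async (fun () -> 1 + 1) in
  assert (!p = None);      (* submitted, but still waiting in the queue *)
  run_pending ();
  assert (!p = Some 2)     (* resolved only once a worker got to it *)
```

In this model, a worker that blocks inside a task never returns to run_pending, so anything still in the queue stays there, which would match the behaviour seen with 2 domains.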

As another experiment I tried what happens if I use something else besides Domainslib.Chan. So I adapted the code to use an Mvar.t provided by the Kcas_data library:

module T = Domainslib.Task 
module C = Domainslib.Chan 
let num_domains = Sys.argv.(1) |> int_of_string 
let print_mutex = Mutex.create () 

let print s = 
  Mutex.lock print_mutex; 
  print_endline s; 
  Mutex.unlock print_mutex

let ping = Kcas_data.Mvar.create None  
let pong = Kcas_data.Mvar.create None 
let pang = Kcas_data.Mvar.create None 


let run_async name p () = 
  let rec f () : unit = 
    Kcas_data.Mvar.take p; 
    print name;
    Kcas_data.Mvar.try_put pang () |> ignore; 
    f () 
  in 
  f () 

let () = 
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in  
  T.run pool (fun _ -> 
    let _ = T.async pool (run_async "A" ping) in 
    let _ = T.async pool (run_async "B" pong) in 
    ()
  ); 
  while true do 
    Kcas_data.Mvar.put ping (); 
    Kcas_data.Mvar.put pong (); 
    Kcas_data.Mvar.take pang
  done;
  T.teardown_pool pool 

With this new code in place, the program still blocks if run with only one domain in the pool, but now it outputs "A"s and "B"s indefinitely with as few as 2 domains.

So that is a bit contrary to what the domainslib documentation says about Chan.make_bounded:

val make_bounded : int -> 'a t

make_bounded n makes a bounded channel with a buffer of size n. Raises Invalid_argument "Chan.make_bounded" if the buffer size is less than 0.

With a buffer size of 0, the send operation becomes synchronous. With a buffer size of 1, you get the familiar MVar structure. The channel may be shared between many sending and receiving domains.

I don’t think it has anything to do with the OS scheduler. Function run_async "A" gets assigned the only available domain from the pool. Then the main code fills both ping and pong. Function “A” empties ping, displays A, and fills pang. The main code empties pang and fills ping, but then it blocks since pong is already filled. Again, function “A” empties ping, displays A, and fills pang, but then it blocks since ping is empty. At this point, everybody is blocked, including “B”, since no domain was ever made available.
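
The trace above can be replayed as a small single-threaded simulation. This is only a sketch of the reasoning, with no real domains or channels involved: ping and pong are modelled as one-slot buffers, pang as a counter (it is unbounded), and "B" is never stepped because no domain is ever free for it. All names here are made up for illustration.

```ocaml
(* Simulation only: models the channel states, not domainslib itself. *)
type state = {
  mutable ping : bool;     (* true when the one-slot buffer is full *)
  mutable pong : bool;
  mutable pang : int;      (* unbounded channel: number of queued messages *)
  mutable main_pc : int;   (* 0: send ping, 1: send pong, 2: recv pang *)
  mutable a_pc : int;      (* 0: recv ping, 1: print, 2: send pang *)
  mutable log : string list;
}

(* Advance the main loop by one operation; false means it would block. *)
let step_main s =
  match s.main_pc with
  | 0 when not s.ping -> s.ping <- true; s.main_pc <- 1; true
  | 1 when not s.pong -> s.pong <- true; s.main_pc <- 2; true
  | 2 when s.pang > 0 -> s.pang <- s.pang - 1; s.main_pc <- 0; true
  | _ -> false

(* Advance task "A" by one operation; false means it would block. *)
let step_a s =
  match s.a_pc with
  | 0 when s.ping -> s.ping <- false; s.a_pc <- 1; true
  | 1 -> s.log <- "A" :: s.log; s.a_pc <- 2; true
  | 2 -> s.pang <- s.pang + 1; s.a_pc <- 0; true
  | _ -> false

let simulate () =
  let s =
    { ping = false; pong = false; pang = 0; main_pc = 0; a_pc = 0; log = [] }
  in
  (* Step both actors until neither can progress, i.e. until deadlock. *)
  let rec loop () =
    let m = step_main s in
    let a = step_a s in
    if m || a then loop ()
  in
  loop ();
  List.rev s.log

let () = List.iter print_endline (simulate ())
```

The simulation stops with the main loop stuck on its second send to pong and "A" stuck on its third receive from ping, having logged "A" exactly twice.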

The problem with this implementation is that the documentation for Task.run indicates that Task.run runs synchronously and therefore presumably blocks until the task is complete. Edit: maybe that is not a problem, as the task run by Task.run only calls asynchronous functions; I don’t know how Domainslib implements its asynchronous functions in relation to thread pools.

That is a point of detail. Assuming Domainslib works like other thread implementations, the overarching point is that (i) the program starting domain and the thread pool domain(s) run independently of each other (the program starting domain does not run in the thread pool), and (ii) the same domain should not both send to and receive from the same channel. If you allow the latter to happen you may deadlock.

Try this:

module T = Domainslib.Task
module C = Domainslib.Chan

let num_domains = Sys.argv.(1) |> int_of_string
let print_mutex = Mutex.create ()

let print s =
  Mutex.lock print_mutex;
  print_endline s;
  Mutex.unlock print_mutex

let ping = Kcas_data.Mvar.create None
let pong = Kcas_data.Mvar.create None
let pang = Kcas_data.Mvar.create None

let run_async name p () =
  let rec f () : unit =
    Kcas_data.Mvar.take p;
    print name;
    Kcas_data.Mvar.try_put pang () |> ignore;
    f ()
  in
  f ()

let () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  T.run pool (fun _ ->
      let _ = T.async pool (run_async "A" ping) in
      let _ = T.async pool (run_async "B" pong) in
      while true do
        Kcas_data.Mvar.put ping ();
        Kcas_data.Mvar.put pong ();
        Kcas_data.Mvar.take pang
      done);
  T.teardown_pool pool

When you use Kcas from inside a domainslib fiber, it will use a blocking method that allows domainslib to run other fibers. When you use Kcas from outside of a scheduler (like domainslib), Kcas uses a default blocking mechanism based on the Stdlib Mutex and Condition and that will block the systhread or domain (and can only be unblocked from another systhread or domain).
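
To illustrate what blocking on a Stdlib Mutex and Condition looks like, here is a rough sketch of a one-slot MVar built directly on those primitives (OCaml 5 standard library only). It is not Kcas's actual code; the type mvar and the functions create, put, take and round_trip are hypothetical. A caller of take on an empty slot parks its whole domain in Condition.wait and can only be woken by a put from a different domain.

```ocaml
(* Hypothetical one-slot MVar; blocks the calling domain while waiting. *)
type 'a mvar = {
  mutex : Mutex.t;
  changed : Condition.t;
  mutable slot : 'a option;
}

let create () =
  { mutex = Mutex.create (); changed = Condition.create (); slot = None }

(* Wait until the slot is empty, then fill it and wake any waiters. *)
let put mv v =
  Mutex.lock mv.mutex;
  while Option.is_some mv.slot do
    Condition.wait mv.changed mv.mutex
  done;
  mv.slot <- Some v;
  Condition.broadcast mv.changed;
  Mutex.unlock mv.mutex

(* Wait until the slot is full, then empty it and wake any waiters. *)
let take mv =
  Mutex.lock mv.mutex;
  while Option.is_none mv.slot do
    Condition.wait mv.changed mv.mutex
  done;
  let v = Option.get mv.slot in
  mv.slot <- None;
  Condition.broadcast mv.changed;
  Mutex.unlock mv.mutex;
  v

(* The sender and the receiver of each MVar live on different domains. *)
let round_trip () =
  let request = create () and reply = create () in
  let worker = Domain.spawn (fun () -> put reply (take request + 1)) in
  put request 41;
  let answer = take reply in
  Domain.join worker;
  answer

let () = Printf.printf "%d\n" (round_trip ())
```

If the take and the matching put were both issued from the same domain in sequence, the take would wait forever, which is the single-domain deadlock discussed earlier in the thread.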