How to start parallel tasks and pick the first one that finishes?

I’m trying to take advantage of multicore capabilities to speed up a time-consuming lookup.

Suppose I have val f: start:int -> finish:int -> int, a function which computes “something”. Internally, it tries values from start to finish, and when a suitable solution is found, it stops iterating and returns an int.
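To make the setup concrete, here is a minimal sketch of a function with that shape. The predicate `is_suitable` is a hypothetical stand-in for the real test, the range is assumed to be inclusive, and raising `Not_found` when nothing is found is an assumption made here to keep the `int` return type.

```ocaml
(* Hypothetical stand-in for the real "is this a suitable solution?" test. *)
let is_suitable x = x * x > 1_000_000

(* Scan start..finish (inclusive) and return the first suitable value.
   Raises Not_found if the range holds no solution. *)
let f ~start ~finish =
  let rec loop i =
    if i > finish then raise Not_found
    else if is_suitable i then i
    else loop (i + 1)
  in
  loop start
```

With this predicate, `f ~start:0 ~finish:2000` stops at 1001, the first value whose square exceeds 1,000,000.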

How could I start, say, 4 tasks in parallel (with 4 start/finish pairs chosen to cover the whole search domain), and stop as soon as the first task returns?
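Choosing the start/finish pairs is plain arithmetic. A minimal sketch, assuming inclusive bounds; `split_ranges` is a hypothetical helper name, not part of any library:

```ocaml
(* Split the inclusive range [lo, hi] into n contiguous (start, finish)
   pairs that together cover every value exactly once. Chunk sizes differ
   by at most one when the range does not divide evenly. *)
let split_ranges ~lo ~hi ~n =
  let total = hi - lo + 1 in
  List.init n (fun k ->
      let start = lo + (k * total / n) in
      let finish = lo + ((k + 1) * total / n) - 1 in
      (start, finish))
```

For example, `split_ranges ~lo:0 ~hi:99 ~n:4` yields `[(0, 24); (25, 49); (50, 74); (75, 99)]`, one pair per task.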

Is there a library that allows this without my having to manage the communication between tasks myself, so that all of them are interrupted as soon as the first one returns?

Best regards

I believe you can use Eio for that. Slightly adapting the example from the README:

open Eio.Std

let run_task ~start ~finish ~domain_mgr =
  traceln "Running f on range %d..%d = %d" start finish
    (Eio.Domain_manager.run domain_mgr
      (fun () -> f ~start ~finish))

let main ~domain_mgr =
  Fiber.first
    (fun () -> run_task ~start:0 ~finish:50000 ~domain_mgr)
    (fun () -> run_task ~start:50000 ~finish:100000 ~domain_mgr)

let () =
  Eio_main.run @@ fun env ->
    main ~domain_mgr:(Eio.Stdenv.domain_mgr env)

Here Fiber.first runs the two fibers concurrently and returns the result of whichever finishes first, cancelling the other. The actual parallelism comes from Eio.Domain_manager.run, which executes each call to f in its own domain.

(there’s Fiber.any for more than 2 parallel tasks, and there’s of course domainslib’s Task.parallel_{for, scan, for_reduce})

True. I mentioned Eio because I’m not sure there is a comparably simple way of doing cancellation in Domainslib.

Thanks for the links.

Before trying cancellation, I’m looking at parallelism (with the values provided, my test function never exits the loop early).

For some reason, Eio doesn’t seem to speed up the computation. Am I not using it the right way? Or am I paying the price for the computations being cancellable?

  match test with

  (**** ≈ 14 s / 1 core used ****)
  | 1 -> ff 0 ()

  (**** ≈ 26 s / 1 core used ****)
  | 2 -> ff 0 (); ff 1 ()

  (**** ≈ 40 s / 1 core used ****)
  | 3 -> ff 0 (); ff 1 (); ff 2 ()

  (**** ≈ 58 s / 1 core used ****)
  | 4 -> ff 0 (); ff 1 (); ff 2 (); ff 3 ()

  (**** On a 4-core machine    ****)
  (**** ≈ 40 s / 4 cores used ****)
  | 5 ->
  let pool = Domainslib.Task.setup_pool ~num_domains:3 () in
  Domainslib.Task.run pool
    (fun () -> Domainslib.Task.parallel_for pool
        ~chunk_size:1 ~start:0 ~finish:3
        ~body:(fun i -> ff i ()));
  Domainslib.Task.teardown_pool pool

  (**** On a 4-core machine    ****)
  (**** ≈ 61 s / 4 cores used ****)
  | 6 ->
  Eio_main.run (fun env ->
      let domain_mgr = Eio.Stdenv.domain_mgr env in
      Eio.Fiber.all
        [
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 0));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 1));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 2));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 3));
      ]
    )

  | 7 ->
  (**** On a 4-core machine    ****)
  (**** ≈ 450 s / 4 cores used ****)
  Eio_main.run (fun env ->
      let domain_mgr = Eio.Stdenv.domain_mgr env in
      Eio.Fiber.all
        [
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 0));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 1));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 2));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 3));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 0));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 1));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 2));
          (fun () -> Eio.Domain_manager.run domain_mgr (ff 3));
        ]
    )

  | 8 -> 
  (**** On a 4-core machine    ****)
  (**** ≈  42 s / 2 cores used ****)
  Eio_main.run (fun env ->
      let domain_mgr = Eio.Stdenv.domain_mgr env in
      Eio.Fiber.all
        [
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 0 (); ff 1 ()));
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 2 (); ff 3 ()));
        ]
    )

  | 9 ->
  (**** On a 4-core machine    ****)
  (**** ≈  127 s /  4 cores used ****)
  Eio_main.run (fun env ->
      let domain_mgr = Eio.Stdenv.domain_mgr env in
      Eio.Fiber.all
        [
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 0 (); ff 1 ()));
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 2 (); ff 3 ()));
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 0 (); ff 1 ()));
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 2 (); ff 3 ()))
        ]
    )

  | 10 ->
  (**** On a 4-core machine                                        ****)
  (**** ≈ 42 s / 1 core used for the first 15 s, then 4 cores used ****)
  Eio_main.run (fun env ->
      let domain_mgr = Eio.Stdenv.domain_mgr env in
      Eio.Fiber.all
        [
          (fun () -> ff 0 ());
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 1 ()));
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 2 ()));
          (fun () -> Eio.Domain_manager.run domain_mgr (fun () -> ff 3 ()))
        ]
    )

  | _ -> assert false

My test functions may not really be thread-safe. Checking…

OK. My function was not to blame.

It’s just that I tried this on my 10-year-old computer, which doesn’t have much to offer in terms of parallelism…

With a brand-new one, it’s really efficient, and easy to set up with Domainslib.

As for cancellation, I adapted my test function f so that it tests one value at a time and raises (Found result) when it succeeds: the Domainslib parallel_for loop is then interrupted as soon as the solution is found. Great.
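A minimal sketch of that one-value-at-a-time shape, assuming a hypothetical `is_suitable` predicate. The `body` function has the `int -> unit` type that `~body` expects in `Domainslib.Task.parallel_for`; the sequential `search` driver is only for illustration. In a parallel run, several workers may hit a solution at nearly the same time, so the `Found` that propagates may not be the smallest suitable value.

```ocaml
(* The exception that carries the solution out of the loop. *)
exception Found of int

(* Hypothetical stand-in for the real test of a single value. *)
let is_suitable x = x * x > 1_000_000

(* Tests exactly one value; raises Found on success, otherwise returns (). *)
let body i = if is_suitable i then raise (Found i)

(* Sequential driver: the first Found escapes the for loop. *)
let search ~start ~finish =
  try
    for i = start to finish do
      body i
    done;
    None
  with Found result -> Some result
```

Here `search ~start:0 ~finish:2000` returns `Some 1001`, and a range with no solution returns `None`.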

…
  let pool = Domainslib.Task.setup_pool ~num_domains:19 () in
  (try
     Domainslib.Task.run pool (fun () ->
         Domainslib.Task.parallel_for pool ~start:0 ~finish:4_000_000
           ~body:(f ~interrupt:true))
   with Found result -> Fmt.pr "My result: %d@," result);
  (* Tear the pool down whether or not a solution was found. *)
  Domainslib.Task.teardown_pool pool

Yes. However, if the fiber isn’t doing I/O at the time, the cancellation only takes effect the next time it does. If the work is purely CPU-bound, you’ll need to call Fiber.check () from time to time for cancellation to work. It might be easier to do the check yourself (e.g. if Atomic.get answer <> None then raise Exit).
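The "check it yourself" suggestion can be sketched with only the OCaml 5 standard library (`Domain` and `Atomic`), without Eio or Domainslib. This is a swapped-in illustration, not the Eio mechanism: each range runs in its own domain, the first worker to find a solution records it with `Atomic.compare_and_set`, and the other workers stop at their next poll of the shared slot. The names `race` and `is_suitable` are hypothetical.

```ocaml
(* Needs OCaml 5: Domain and Atomic are in the standard library. *)

(* Search each (start, finish) range in its own domain and return the
   first suitable value any worker records, or None if there is none.
   [is_suitable] is a hypothetical stand-in for the real test. *)
let race ~is_suitable ranges =
  (* Shared slot: None until some worker records an answer. *)
  let answer = Atomic.make None in
  let worker (start, finish) () =
    let i = ref start in
    (* Stop when the range is exhausted or another worker already answered. *)
    while !i <= finish && Atomic.get answer = None do
      if is_suitable !i then
        (* Only the first writer wins; later compare_and_set calls fail. *)
        ignore (Atomic.compare_and_set answer None (Some !i) : bool);
      incr i
    done
  in
  let domains = List.map (fun r -> Domain.spawn (worker r)) ranges in
  List.iter Domain.join domains;
  Atomic.get answer

let () =
  match race ~is_suitable:(fun x -> x = 750_000) [ (0, 499_999); (500_000, 999_999) ] with
  | Some x -> Printf.printf "found %d\n" x
  | None -> print_endline "no solution"
```

Polling an atomic on every iteration is cheap but not free; checking every few thousand iterations is a common trade-off when the per-value test is very fast.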

Note: the following Domainslib PR is relevant and would probably work even better than parallel_for for your purposes:
