Naive implementation of a cyclic barrier

Hi

The CyclicBarrier (the code is here) is ported from Go. It is an exercise, as I am aware that concurrency primitives are notoriously difficult to get right.

But my simple test seems to work.

The await function, which is the main driver, is this:

    let await b =
      try
        Fiber.check ();
        let captured_round_result =
          Eio.Mutex.use_rw ~protect:true b.lock @@ fun () ->
          let capture_round = b.round in
          if capture_round.is_broken then raise (Barrier_breached "await\n")
          else capture_round.count <- capture_round.count + 1;
          let c = capture_round.count in
          Printf.printf "Count is %d\n" capture_round.count;
          if c > b.participants then failwith "c > participants fatal"
          else if c < b.participants then (
            match Fiber.get wait_cond with
            | Some w ->
                Eio.Condition.await w (* capture_round.wait_cond *) b.lock;
                false
            | None -> failwith "await error")
          else true
        in
        if captured_round_result = true then (
          break_barrier b (Fiber.get wait_cond) (Fiber.get breached_cond) true;
          reset b (Fiber.get wait_cond) (Fiber.get breached_cond))
      with e ->
        let msg = Printexc.to_string e
        and stack = Printexc.get_backtrace () in
        Printf.eprintf "there was an error: %s%s\n" msg stack;
        break_barrier b (Fiber.get wait_cond) (Fiber.get breached_cond) true;
        ()
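The code above reads wait_cond and breached_cond through Fiber.get, which means they are Eio fiber-local keys; their declarations are not shown. A minimal sketch of how such a key is typically created and bound, assuming a key holding an Eio.Condition.t (the declaration below is hypothetical). Fiber.get returns Some only inside a Fiber.with_binding for that key, and None outside it:

```ocaml
open Eio.Std

(* Hypothetical declaration: the post does not show how wait_cond is created. *)
let wait_cond : Eio.Condition.t Fiber.key = Fiber.create_key ()

let bound_inside, unbound_outside =
  Eio_main.run @@ fun _env ->
  let cond = Eio.Condition.create () in
  let inside =
    Fiber.with_binding wait_cond cond (fun () ->
        (* Within the callback, Fiber.get returns the bound condition. *)
        match Fiber.get wait_cond with
        | Some c -> c == cond
        | None -> false)
  in
  (* Outside the callback, the key is unbound again. *)
  let outside = Option.is_none (Fiber.get wait_cond) in
  (inside, outside)
```

This is why `Fiber.get wait_cond` returns an option and why the code needs the `None -> failwith "await error"` branch.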


I have some questions.

  1. I use fiber-local variables, but the test also runs without them, so I'm not sure whether I need them.

  2. I didn't find a way to wait on multiple Eio.Conditions and proceed as soon as one of them is broadcast. If a fiber is cancelled or finishes, or if the barrier exceeds its capacity, I would like to broadcast the corresponding condition. I wanted to select from a list of conditions, like this Lwt API:
    val nchoose_split : 'a t list -> ('a list * 'a t list) t

    Lwt.nchoose_split ps is the same as Lwt.nchoose ps, except that when multiple promises in ps are fulfilled simultaneously (and none are rejected), the result promise is fulfilled with both the list of values of the fulfilled promises, and the list of promises that are still pending.

  3. I could only find Fiber.check, which checks whether a fiber has been cancelled. How do I find out whether a fiber is done?
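A sketch of waiting for whichever of two events happens first, using Eio.Fiber.first, which runs both functions and cancels the one that loses. It deliberately swaps in Eio.Promise for Eio.Condition: a promise remembers that it was resolved, so a fiber that starts waiting late still sees it, whereas a condition broadcast only wakes fibers that are already waiting. The names await_either, Released and Broken are hypothetical:

```ocaml
open Eio.Std

type outcome = Released | Broken

(* Block until either one-shot event is resolved; the losing waiter is cancelled. *)
let await_either released broken =
  Fiber.first
    (fun () -> Promise.await released; Released)
    (fun () -> Promise.await broken; Broken)

let result =
  Eio_main.run @@ fun _env ->
  let released, _resolve_released = Promise.create () in
  let broken, resolve_broken = Promise.create () in
  let outcome, () =
    Fiber.pair
      (fun () -> await_either released broken)
      (fun () ->
        (* Another fiber "breaks the barrier" after giving the waiter a chance to run. *)
        Fiber.yield ();
        Promise.resolve resolve_broken ())
  in
  outcome
```

Here only the broken promise is resolved, so `result` is `Broken`. Note that this returns a single winner; it is not an equivalent of Lwt.nchoose_split, which also hands back the still-pending promises.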

The barrier-action effect is a dummy for now. Moreover, I don't know what kind of data race or thread race could occur. Should I use TSan to check?
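As an illustration of the kind of data race TSan looks for (this example is not taken from the barrier code): fibers on a single Eio domain are scheduled cooperatively and never touch memory simultaneously, so memory-level data races need several domains. In the sketch below, two domains increment a plain ref without synchronization, which is a data race and can lose updates; the Atomic counter is the race-free version. Built with a TSan-enabled OCaml 5 compiler, the accesses to `racy` should be reported:

```ocaml
(* Deliberately racy: non-atomic read-modify-write from two domains. *)
let racy = ref 0

(* Race-free: Atomic.incr makes each increment indivisible. *)
let safe = Atomic.make 0

let () =
  let work () =
    for _ = 1 to 100_000 do
      incr racy;
      Atomic.incr safe
    done
  in
  let d = Domain.spawn work in
  work ();
  Domain.join d
```

After both domains finish, `safe` is always 200_000, while `racy` may be smaller in any given run.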

Thanks

Your code is hard to review: there's too much dead code that's not relevant and distracts from your core implementation (e.g. the unused barrier_action effect stuff, the no-op if b.round.is_broken then b.round.is_broken <- true, etc.). Could you clean it up and re-paste it, making sure it has syntax highlighting and is properly indented (using ocamlformat if needed)?

  1. Indeed, the fiber-local variables don't seem to be required. A better design would be to keep the conditions in the cyclic_barrier data structure instead of relying on fiber-local storage to pass them implicitly.
  2. You can wait on multiple conditions (or other things) with Eio.Fiber.first, Fiber.any, Fiber.n_any, etc.
  3. I don't think your call to Fiber.check is useful, since it's followed by Eio.Mutex.use_rw, which already checks the fiber for cancellation. In general, Fiber.check is only needed during CPU-heavy computation or other non-Eio operations, to check once in a while that your fiber hasn't been cancelled. You can't check whether a fiber is “done” from within itself, since by definition the fiber is still running when that check executes :stuck_out_tongue: (but you can use an Eio.Promise to start a fiber and check on its progress/completion from the outside).
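A minimal sketch of points 1 and 3, written from scratch rather than derived from the original code: the per-round condition lives in the barrier record (no fiber-local storage), and the caller uses Fiber.fork_promise plus Promise.is_resolved to check from outside whether a fiber has finished. The names round, tripped, create and await are hypothetical, and cancellation and barrier breaking are deliberately left out. `~protect:false` keeps the wait cancellable; with `~protect:true`, a cancelled fiber would stay blocked until the round is released:

```ocaml
open Eio.Std

(* State for one round; each waiter keeps a reference to its own round. *)
type round = {
  mutable count : int;
  mutable tripped : bool;
  released : Eio.Condition.t;
}

type t = { participants : int; lock : Eio.Mutex.t; mutable round : round }

let new_round () =
  { count = 0; tripped = false; released = Eio.Condition.create () }

let create participants =
  if participants <= 0 then invalid_arg "create: participants must be > 0";
  { participants; lock = Eio.Mutex.create (); round = new_round () }

(* Returns [true] for the fiber that arrives last and trips the barrier. *)
let await b =
  Eio.Mutex.use_rw ~protect:false b.lock (fun () ->
      let r = b.round in
      r.count <- r.count + 1;
      if r.count = b.participants then begin
        (* Last arrival: start a fresh round and wake this round's waiters,
           all while still holding the lock. *)
        r.tripped <- true;
        b.round <- new_round ();
        Eio.Condition.broadcast r.released;
        true
      end
      else begin
        (* Condition.await unlocks b.lock while waiting and relocks it. *)
        while not r.tripped do
          Eio.Condition.await r.released b.lock
        done;
        false
      end)

(* Two rounds on the same barrier with 3 participants. *)
let leaders_per_round, first_was_pending =
  Eio_main.run @@ fun _env ->
  let b = create 3 in
  Switch.run @@ fun sw ->
  let run_round () =
    let p1 = Fiber.fork_promise ~sw (fun () -> await b) in
    let p2 = Fiber.fork_promise ~sw (fun () -> await b) in
    (* Checked from outside: p1 cannot finish before a third fiber arrives. *)
    let pending = not (Promise.is_resolved p1) in
    let p3 = Fiber.fork_promise ~sw (fun () -> await b) in
    let results = List.map Promise.await_exn [ p1; p2; p3 ] in
    (List.length (List.filter Fun.id results), pending)
  in
  let leaders1, pending1 = run_round () in
  let leaders2, _ = run_round () in
  ([ leaders1; leaders2 ], pending1)
```

Each round has exactly one leader (the last arrival), and the second round reuses the same barrier, which is what makes it cyclic. Because the round is swapped inside the lock by the last arrival, there is no separate reset step that could race with fibers entering the next round.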

TSan is great and highly recommended when doing multicore work, mistakes are very easy to make otherwise :slight_smile:


Sure. I still have to investigate point 2 of your reply.

I have turned on syntax highlighting in Pastebin, removed some comments and improved the indentation. I guess await should be refactored, but it is the same for now.

Update: I believe this version should proceed if any one of multiple Eio.Conditions fires?


    let await b =
      try
        Fiber.check ();
        let captured_round_result =
          Eio.Mutex.use_rw ~protect:true b.lock @@ fun () ->
          let capture_round = b.round in
          if capture_round.is_broken then raise (Barrier_breached "await\n")
          else capture_round.count <- capture_round.count + 1;
          let c = capture_round.count in
          Printf.printf "Count is %d\n" capture_round.count;
          if c > b.participants then failwith "c > participants fatal"
          else if c < b.participants then
            Fiber.any
              [
                (fun () ->
                  match Fiber.get wait_cond with
                  | Some w ->
                      Eio.Condition.await w b.lock;
                      false
                  | None -> failwith "await error");
              ]
          else Fiber.any [ (fun () -> true) ]
        in
        if captured_round_result then (
          break_barrier b (Fiber.get wait_cond) (Fiber.get breached_cond) true;
          reset b (Fiber.get wait_cond) (Fiber.get breached_cond))
      with e ->
        let msg = Printexc.to_string e
        and stack = Printexc.get_backtrace () in
        Printf.eprintf "there was an error: %s%s\n" msg stack;
        break_barrier b (Fiber.get wait_cond) (Fiber.get breached_cond) true;
        ()
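In the updated version, each Fiber.any receives a one-element list, so it simply runs that single function; Fiber.any only races functions against each other when given several. A sketch generalizing to n one-shot events, again swapping promises in for conditions (first_resolved is a hypothetical name): one waiter is started per event, the first to finish wins, and Fiber.any cancels the rest:

```ocaml
open Eio.Std

(* Return the index of whichever promise in [ps] resolves first. *)
let first_resolved (ps : unit Promise.t list) : int =
  Fiber.any (List.mapi (fun i p -> fun () -> Promise.await p; i) ps)

let winner =
  Eio_main.run @@ fun _env ->
  let events = List.init 3 (fun _ -> Promise.create ()) in
  let ps = List.map fst events in
  let _, resolve_second = List.nth events 1 in
  let i, () =
    Fiber.pair
      (fun () -> first_resolved ps)
      (fun () ->
        (* Only the event at index 1 ever fires. *)
        Fiber.yield ();
        Promise.resolve resolve_second ())
  in
  i
```

Since only the second event is resolved, `winner` is 1.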


Thanks.