Help on the design of a high-throughput Batch-Parallel Data Structure Library

Hi everyone, thanks to the new multicore support, I am investigating implementing a library of high-throughput “Batch-parallel Data structures”.

The general idea behind why these data structures perform well is that they receive operations in batches and only need to handle a single batch at any point in time. The benefit of this invariant is that better optimisations and parallelism strategies can be derived, because we have prior knowledge of the operations that are going to be executed in parallel. This is unlike traditional parallel data structures, which need to be designed more conservatively so that they can handle arbitrary concurrent operations. For more information, you can refer to this paper: https://www.cse.wustl.edu/~kunal/resources/Papers/batcher.pdf

In order to implement this in OCaml, I’m currently considering the following design:

  1. The interface of the batch-parallel data structures will be
module type DS = sig
  type t
  
  (* ADT of operations and input parameters *)
  type op
  (* ADT of the corresponding operation result *)
  type res
  
  (* [bop ops ~num] expects a left-adjusted array of operations and the
     number of operations in the batch. All results of the operations are
     filled in when the function returns. *)
  val bop : (op * res Types.promise) array -> num:int -> unit
end
  2. To additionally provide library users with their usual atomic-operation APIs, the paper also describes a custom scheduler which performs implicit batching of operations. My general sense here is to tweak domainslib’s Task module into a functor that takes in the DS module signature, and to adjust the scheduler to perform implicit batching and then call bop (a rough sketch of the interface I have in mind follows below).
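As a very rough sketch (all names below are hypothetical, not an existing domainslib API), the functor’s interface could look something like this, as it would appear in an .mli:

module Make_batched (DS : DS) : sig
  type t

  (* Create a batched wrapper tied to a scheduler pool. *)
  val create : Task.pool -> t

  (* Submit a single operation; the scheduler transparently groups
     concurrent submissions into a batch and hands them to [DS.bop]. *)
  val run : t -> DS.op -> DS.res
end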

However, this design runs into the problem that the DS implementation cannot use the parallel primitives async and await. I’ve thought about designing the DS and Scheduler modules as recursive modules, but this exposes too many of the scheduler’s implementation details to the DS module. Ideally, I would like to keep the implementations of the Scheduler and the DS as disjoint and abstract as possible while making their linking user-friendly. Any thoughts on how to do this?

1 Like

Hello! Thanks for sharing the paper, it’s great! (I still have a bit of reading/thinking to do on their alternating-steal policy)

For this kind of work, I find it easier to start by implementing a specific data structure to better understand what is needed, to benchmark and confirm the benefits, and only then try to generalize to support more data structures. For example, their batched-counter example performs a parallel sum, so your bop operation would actually require a domainslib pool argument, etc. There’s a lot to learn before deciding what the common interface should be.
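To make that concrete, here is a hypothetical sketch (not the paper’s interface, and reusing the op/res/promise names from your first post) of what a batched counter could expose once bop receives a pool:

module BatchedCounter : sig
  type t
  type op = Incr | Decr | Get
  type res = Unit | Int of int

  val create : unit -> t

  (* Handle the whole batch at once, e.g. with a parallel sum of the
     pending Incr/Decr operations, using the pool for parallelism. *)
  val bop : Task.pool -> t -> (op * res Types.promise) array -> num:int -> unit
end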

Regarding your architecture, I don’t think you want to specialize the scheduler such that only one data structure is usable! Rather, the scheduler should provide the primitives that make it possible to later implement and use multiple batched data structures. This domainslib PR would probably help, for example, to “wait” for a batch to complete while making the current Domain available to work on the batch (and this other PR is unrelated but perhaps useful, as it includes a change that would make the implicit batch size larger)

(I’ve a bit more thoughts on how you could prototype this stuff with domainslib by using the first PR, but don’t want to spoil too much in case you would prefer discovering by yourself :slight_smile: )

3 Likes

++ for the pragmatic, bottom-up approach

Hey @art-w, thanks so much for taking the time to share your ideas! Your PR really helped me work through a lot of the difficulties I was having writing the implicit batching logic. I also took your advice and benchmarked the batched counter against a parallel lock-free counter. At first glance, the standard lock-free counter is still much faster. I suspected that the overhead of the extra batching logic was killing the speedup of a batched operation, so I sneakily added some pauses to each atomic operation to exaggerate the effects of the batching. These are my results:

Before simulating longer operations (1_000_000 increments):

    num_domains:      1        2        3        4        5        6        7   
 BatchedCounter:    492.99   432.87   380.49   327.12   305.04   311.38   308.61
LockfreeCounter:     16.08    17.81    17.37    16.33    16.07    16.83    16.66
    LockCounter:    121.15    82.55    98.30   126.35   135.61   135.05   161.81

After adding pauses to exaggerate the effects of batching (10_000 increments):

    num_domains:      1        2        3        4        5        6        7   
 BatchedCounter:    684.41   342.89   341.84   173.83   174.18   177.92   184.54
LockfreeCounter:    775.44   535.06   389.67   315.86   265.85   240.26   205.04
    LockCounter:   1562.00  1563.44  1560.73  1561.68  1563.11  1562.66  1560.19

So it does look like there are some practical gains to be made, but only for data structures with more expensive atomic operations that can be optimized by batching. I was also interested in seeing how large the generated batches were, and my profiling showed that for 8 domains (including domain 0), the batch sizes are:

batch_size -> bop performed
1          -> 15626
9          -> 1
62         -> 10
63         -> 15615

If I’m not wrong, there is some upper limit on the number of awaiting tasks in the pool, which is determined by the number of domains initialized in the pool. Will dig a bit more to try to find where this can be modified to maximize the batch size.

The one problem that I’m having with this implementation is that the scheduler is unable to differentiate tasks that are part of a batched operation from tasks that belong to the main program (those not involved in batched operations). This can be problematic because if the main program spawns a lot of tasks, I foresee that the batched tasks can get starved. I tried to fiddle with the internals of domainslib to add a separate queue for batched operations, but that turned out to be quite tricky and not as clean as the current implementation that doesn’t differentiate them. Perhaps you have suggestions for this?

(I’ve a bit more thoughts on how you could prototype this stuff with domainslib by using the first PR, but don’t want to spoil too much in case you would prefer discovering by yourself :slight_smile: )

Yes, please do share! I’m curious to hear what you had in mind for the prototype design, especially if it was different from mine!
I’ve put my code here for reference: https://github.com/koonwen/domainslib/tree/promise

2 Likes

Wow, this is really cool, and I’m glad the PR was useful; your code turned out very similar to what I had in mind! Indeed the fetch_and_add counters are too fast to beat given their hardware support, but I think there’s a lot of potential for batches on data structures where contention is a killer (at least I’m super hyped about batches for priority queues!.. or for the resizing of hashtables). To illustrate contention, the following counter degenerates as the number of cores grows, while your code scales better:

let rec increment t =
  let v = Atomic.get t.counter in
  if Atomic.compare_and_set t.counter v (v + 1)
  then ()
  else increment t

(There’s always some upfront cost with parallel-friendly datastructures, perhaps the cost of batching requires a higher number of cores to be profitable?)

Anyway, the batched counter is a great test case to focus on the core logic and see if any optimization is beneficial or not!

Some random comments:

  • domainslib/counters.ml at promise · koonwen/domainslib · GitHub: there’s a tiny window on this line where other Domains can push to the queue but will not be able to launch the next batch (it’s only an issue if it happens at the end of the program, such that no one is ever able to handle the last batch)
  • The above is not too obvious to fix and the creation of the batch array from the queue creates a sequential choke point. Perhaps a simple solution might be to maintain a batch_size counter that tells you how many elements can be safely popped from the queue:
let rec try_launch pool t =
  if Atomic.get t.batch_size > 0
  && Atomic.compare_and_set t.running false true
  then
    let len = Atomic.exchange t.batch_size 0 in
    assert (len > 0);
    let add_n =
      T.parallel_for_reduce pool ~start:0 ~finish:(len-1)
          ~body:(fun _i ->
              match Q.pop t.q with (* no [batch] or [t.container] array *)
              | Incr (_, set) -> set (); 1
    (* ... *)
    Atomic.set t.running false ;
    try_launch pool t

let increment pool t =
  (* ... *) Q.push t.q ... ; Atomic.incr t.batch_size ; try_launch pool t (* ... *)

As an alternative to the batch_size counter, you could also try to swap the current batch out for a new queue when the bop starts. It would require adding a close operation to the queue, to make sure that other domains can’t push to it anymore, so that you don’t miss/skip an element from the current batch (some similar function in lockfree/mpsc_queue.mli at e1396b0a038cdc32cf0c9087548a49d3e9c9c934 · ocaml-multicore/lockfree · GitHub )
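Very roughly, assuming t.q becomes an Atomic.t holding the current queue, a queue with a close operation whose push raises after closing (as in lockfree’s Mpsc_queue), and a hypothetical process_batch helper that performs the actual bop, the swap could look like this (just a sketch, not tested):

let try_launch pool t =
  if Atomic.compare_and_set t.running false true
  then begin
    (* Swap in a fresh queue for the next batch... *)
    let batch = Atomic.exchange t.q (Q.create ()) in
    (* ...and close the old one so that no other domain can still push to
       it: a late pusher retries on the new queue and no element is missed. *)
    Q.close batch;
    process_batch pool t batch;  (* hypothetical: pops [batch] and runs the bop *)
    Atomic.set t.running false
  end

let rec push_op t op =
  (* Push to whichever queue is current; if it was just closed by the
     swap, retry on the replacement. *)
  try Q.push (Atomic.get t.q) op
  with Q.Closed -> push_op t op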

Then there’s also a lot of potential to customize the batch queue to fit your precise needs:

  • Reaching directly into the elements of the queue array without having to call Q.pop? (with a Q.nth t.q index function that skips the contended atomic operations of pop?)
  • Using a bag rather than a queue as ordering doesn’t matter? (see Lock-free bag · Issue #29 · ocaml-multicore/lockfree · GitHub , similar to what domainslib does with its per-domain queues, also @lambda_foo might have some code/advice on this?)

(Maybe Domain Local Store can help to have a queue-per-domain?)

  • If the batch size is often 1, perhaps there’s an opportunity to optimize for it? It sounds like the fast-path-slow-path of wait-free datastructures could work: start by doing it fast as if you were alone, but if you run into contention then push the operation onto the batch.
let increment pool t =
  if (* fast path *)
     let v = Atomic.get t.counter in
     Atomic.compare_and_set t.counter v (v + 1)
  then ()
  else begin (* slow path *)
    let pr, set = T.promise () in
    Q.push t.q (Incr (t, set));
    try_launch pool t;
    T.await pool pr
  end

I’m seeing some crazy performance from this (faster than fetch_and_add), but it’s cheating and probably not applicable everywhere, as par_prefix_sums now needs to be aware of concurrent updates:

-    Atomic.set t.counter (start + add_n)
+    let _ : int = Atomic.fetch_and_add t.counter add_n in ()
  • Perhaps not all operations have to end up in a batch: the counter get could just read the atomic straight away, but then there’s some cost because we can’t call the promises’ set () before having updated t.counter (I’m mentioning this because I feel it has some potential to scale purely functional datastructures to multicore: fast unbatched (pure) queries, while handling the update operations in a batch; see the one-liner below)
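For the counter, the unbatched read is a one-liner (assuming t.counter is an int Atomic.t, as in the snippets above):

(* Queries bypass the batch entirely; only updates go through the bop. *)
let get t = Atomic.get t.counter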

  • I still need to read about the scheduler fairness from the paper; I’m guessing your batch size upper limit is an accidental side-effect of domainslib’s implementation of parallel_for

Sorry I went in so many directions, please ignore most of my random comments! You are off to a great start and I’m super excited to follow your next steps :slight_smile:

2 Likes

Thanks for the detailed comments @art-w! I’ve incorporated most of your suggestions and am now looking into implementing a lock-free bag as an alternative batching option! In other news, here’s an update along with some open questions.

BATCHER design

The implicit batching support that I’m using relies on promises to mimic the batching logic. This is quite different from the design described in the paper, at least in terms of fairness to data structure operations. With the original batcher design, if a worker (domain) encounters a data structure operation, it commits to working only on batch-related operations. This also means that it can no longer take part in the main program execution that calls data structure operations, so the batch size is limited to the number of domains running the program. On the other hand, in the promise-based batcher, the batch size is theoretically limited only by the number of asynchronous tasks that are spawned. In my opinion, the promise-based one seems to be the better choice, but I am wondering if there’s any value in providing the original batcher as well. I’ve thought about the pros and cons of each design and here’s what I have come up with.

Original batcher:

Pros:

  • DS related tasks are biased to run to completion first.

Cons:

  • Whilst the throughput of the DS is high, it comes at the expense of the progress made by other workers executing non-DS-related work.

  • No customizability of the batch sizes, so less flexibility in performance tuning of the program. E.g. if operations are cheap, committing 1 domain per operation could slow down the progress of the entire program.

Promise-based batcher:

Pros:

  • All tasks are run equally according to the Work-stealing FIFO policy. The program makes progress more fairly and generally runs to completion faster.

  • Batch sizes become customizable by the user’s program. Performance gains can be made by adjusting the batch sizes according to the cost of operations. (However, this could also be a bad thing, since users now have to control the number of DS tasks that are running.)

Cons:

  • Collecting a larger batch increases the time between when an operation is launched and when it is completed.

A gross simplification of the design trade-off: how quickly you want your data structure operations to be fulfilled versus how quickly you want the whole program to complete. The original design favours the former and the promise-based design the latter.

These are just my hypotheses; I would still need to implement the batcher true to its specification to validate them. Any thoughts about other things to consider regarding the design limitations?

Performance tuning with variable batch sizes on batched skip-list

I tried to reproduce the experimental results of the paper by implementing a skip-list with “set-like” properties. I can’t quite get the throughput that the paper reported, but the trends seem reasonably similar.

Initialized: 1 Million elements
Inserts: 100,000 elements

num_domains:      2        3        4        5        6        7        8 
    Seq_ins:     299      284      299      301      298      284      297  ops/ms
Batched_ins:     346      432      465      563      585      644      627  ops/ms

------------------------------------------------------------------------------------
Initialized: 10 Million elements
Inserts: 100,000 elements

num_domains:       2        3        4        5        6        7        8
    Seq_ins:      137      142      153      153      149      153      149  ops/ms
Batched_ins:      156      240      260      409      432      423      522  ops/ms

Running on 8 cores, the implicitly batched insertions perform about 2X better with 1 million pre-initialized elements and about 3X better with 10 million.

As mentioned in the pros of the promise-based design, we can play around with the batch sizes and see how they affect performance. In the results above, I used the default chunk_sizes generated by parallel_for. Below are some performance metrics when we vary them.

Initialized: 10 Million elements
Inserts: 100,000 elements

batch limit = 1
 num_domains:      1        2        3        4        5        6        7        8   
ImpBatch_ins:     148      147      145      130      134      138      138      133  ops/ms

batch limit = 64
ImpBatch_ins:     149      234      236      382      412      456      487      476  ops/ms

batch limit = 127
ImpBatch_ins:     246      332      525      571      644      663      646      700  ops/ms

batch limit = 512
ImpBatch_ins:     144      215      281      350      410      428      483      469  ops/ms

batch limit = 4096
ImpBatch_ins:     141      225      250      267      285      430      457      475  ops/ms

batch limit = 100,000
 num_domains:      1        2        3        4        5        6        7        8   
ImpBatch_ins:     136      103      357      183      494      462      506      571  ops/ms

It seems like a batch limit of 127 operations is the sweet spot for this skip-list workload. However, this might not be the case for different workloads and different data structures; the batched counter’s best performance comes from a batch limit of 4096 operations, for example.

I suppose for now we could leave it up to the default chunk_sizes to pick an appropriate batch size, but I think there should be a better way to reason about selecting batch sizes, or some facility to automatically pick a good one. Something like a recommended_batch_size function, but I don’t have a good idea of how to come up with it at the moment. Any suggestions?

Another question is how to limit the batch size. Currently the batched DS carries around a large array of configurable size to store the batch. This has a performance cost, and a problem also arises when the number of operations exceeds the size of the array: there isn’t any way of handling it, and the program will crash with an assert failure. Ideally I would like the batcher to suspend the current task when the batch is full, so that the user can set up the batch size once and not have to worry about accidentally creating too many tasks that make data structure calls. I’m thinking that an implicit yield could be helpful in this regard, but I’m also open to other ideas (a sketch of one possible shape is below).
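For illustration only (the names here are made up, and whether the scheduler can expose a suitable suspension point is exactly the open question), one possible shape is to make the submission path help out and retry instead of hitting the assert:

(* Hypothetical sketch: [try_add_to_batch] is a bounded enqueue that
   returns [false] when the batch array is full. *)
let rec submit pool t op =
  if not (try_add_to_batch t op) then begin
    (* Batch is full: help drain the current batch (or yield back to the
       scheduler), then retry instead of crashing on an assert. *)
    try_launch pool t;
    submit pool t op
  end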

1 Like

Nice, this is great work and I really enjoy your in-depth presentations :heart: I need to do a deeper dive into this, you did a lot!

  • I don’t think the skip-list implementation is really safe with all the ref and array values being mutated from different domains :stuck_out_tongue: I’m guessing you at least need to switch the mutable/ref fields to Atomic.t, and 'a array to 'a Atomic.t array (see the node sketch below), otherwise there is a risk of domains seeing “old” values when reading from them… ocaml-tsan can help you confirm whether you have data races, and/or call out my bullshit because I didn’t actually test your code with it :-° (but I wrote some high-level explanation of the risks and propaganda for tsan in case it helps motivate the problem)
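To illustrate the kind of change I mean (a made-up node type, not your actual code):

type 'a node = {
  value : 'a;
  (* was: next : 'a node option array, mutated from several domains *)
  next : 'a node option Atomic.t array;
  (* each forward link is now an Atomic.t, so reads are properly
     synchronized and updates can use compare_and_set *)
}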

  • Regarding the batch size, have you tried bounding the collection loop, i.e. replacing while match Q.pop t.q with ... by while !i < Array.length t.container && match Q.pop ..? (as any left-over will be taken care of in the next batch; see the sketch below) Perhaps there are some resizing tricks to do with the container batch array if you see that the batches are always full (it would probably mean that the channel is getting filled faster than the batches are being popped)
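Roughly, assuming the i counter and t.container array from your current collection loop, and a Q.pop that returns an option (adjust to whatever your queue actually exposes):

(* Stop collecting once the batch array is full; whatever is left in the
   queue will simply be picked up by the next batch. *)
let i = ref 0 in
while
  !i < Array.length t.container
  && (match Q.pop t.q with
      | Some op -> t.container.(!i) <- op; incr i; true
      | None -> false)
do () done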

Thanks again for sharing your progress!

1 Like