Dear OCaml aficionados,
Moonpool 0.3 was just released on opam. Moonpool is a new concurrency library for OCaml >= 4.08, with support for OCaml 5 from the get-go. It started out with a thread pool (possibly distributed on multiple domains to be able to use multiple cores) along with a future/promise module.
This release comes with a set of new features on top of pool+futures:
- a small `'a Lock.t` abstraction to protect a resource with a lock, RAII-style
- a type of unbounded channels (fairly naive in implementation)
- improvements to `Pool`, such as `Pool.run_wait_block : Pool.t -> (unit -> 'a) -> 'a`, which runs a whole computation on the pool and waits for its result (or re-raises its exception)
- `Fut.await` (only on OCaml 5)
- support for domain-local-await, if installed
- a `Fork_join` module for, well, fork-join parallelism, including a parallel `for` loop and parallel `List.map`/`Array.map`. These computations can be nested and "feel" like writing code in a direct style. This relies on effects and is only available on OCaml 5.
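To make the first two bullets concrete, here is a stdlib-only sketch of the idea behind each: a value paired with the mutex that guards it, accessed through a scoped function that always releases the mutex, and a naive unbounded channel built from a queue, a mutex and a condition variable. The names `lock_create`, `with_lock`, `lock_update`, `lock_get`, `chan_create`, `chan_push` and `chan_pop` are hypothetical and are not Moonpool's API; Moonpool's actual implementations may differ. The sketch assumes OCaml 5, where `Mutex` and `Condition` are part of the standard library.

```ocaml
(* Sketch only, assuming OCaml 5 (Mutex/Condition in Stdlib).
   Hypothetical names, not Moonpool's API. *)

(* A resource paired with the mutex that protects it. *)
type 'a lock = { mutex : Mutex.t; mutable value : 'a }

let lock_create (v : 'a) : 'a lock = { mutex = Mutex.create (); value = v }

(* Run [f] on the protected value while holding the mutex. The mutex is
   released even if [f] raises (RAII-style, via Fun.protect). *)
let with_lock (l : 'a lock) (f : 'a -> 'b) : 'b =
  Mutex.lock l.mutex;
  Fun.protect ~finally:(fun () -> Mutex.unlock l.mutex) (fun () -> f l.value)

(* Replace the protected value with [f value] under the mutex. *)
let lock_update (l : 'a lock) (f : 'a -> 'a) : unit =
  Mutex.lock l.mutex;
  Fun.protect
    ~finally:(fun () -> Mutex.unlock l.mutex)
    (fun () -> l.value <- f l.value)

let lock_get (l : 'a lock) : 'a = with_lock l (fun v -> v)

(* A naive unbounded channel: a queue guarded by a mutex, plus a condition
   variable so that [chan_pop] blocks until an element is available. *)
type 'a chan = { q : 'a Queue.t; m : Mutex.t; not_empty : Condition.t }

let chan_create () : 'a chan =
  { q = Queue.create (); m = Mutex.create (); not_empty = Condition.create () }

let chan_push (c : 'a chan) (x : 'a) : unit =
  Mutex.lock c.m;
  Queue.push x c.q;
  (* wake up one blocked reader, if any *)
  Condition.signal c.not_empty;
  Mutex.unlock c.m

let chan_pop (c : 'a chan) : 'a =
  Mutex.lock c.m;
  (* loop to guard against spurious wakeups *)
  while Queue.is_empty c.q do
    Condition.wait c.not_empty c.m
  done;
  let x = Queue.pop c.q in
  Mutex.unlock c.m;
  x
```

"Unbounded" here simply means `chan_push` never blocks: the queue grows as needed, and only readers wait.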
Examples for fork-join
The (too) classic parallel fibonacci function:
```ocaml
open Moonpool

let ( let@ ) = ( @@ )

let rec fib_direct x =
  if x <= 1 then
    1
  else
    fib_direct (x - 1) + fib_direct (x - 2)

let rec fib x : int =
  (* some cutoff for sequential computation *)
  if x <= 18 then
    fib_direct x
  else (
    let n1, n2 =
      Fork_join.both
        (fun () -> fib (x - 1))
        (fun () -> fib (x - 2))
    in
    n1 + n2
  )

let fib_40 : int =
  let@ pool = Pool.with_ ~min:8 () in
  Pool.run_wait_block pool (fun () -> fib 40)
```
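`Fork_join.both` runs its two closures in parallel and returns both results as a pair. To illustrate that contract without Moonpool, here is a deliberately naive stdlib-only stand-in (the name `naive_both` and the helper `fib_top` are hypothetical) that spawns a fresh OCaml 5 domain for the second closure. This is not how Moonpool does it: spawning a domain per call is expensive and the runtime caps the number of live domains, so the sketch only splits once, at the top level, instead of recursively.

```ocaml
(* OCaml 5 only. Naive stand-in for a fork-join [both]: run [g] on a new
   domain while the current domain runs [f], then join. Not Moonpool's
   implementation, which schedules tasks on a pool instead. *)
let naive_both (f : unit -> 'a) (g : unit -> 'b) : 'a * 'b =
  (* [g] starts running on a new domain right away *)
  let d = Domain.spawn g in
  let a =
    match f () with
    | a -> a
    | exception e ->
        (* do not leak the spawned domain: join it before re-raising *)
        (try ignore (Domain.join d) with _ -> ());
        raise e
  in
  (a, Domain.join d)

let rec fib_direct x =
  if x <= 1 then 1 else fib_direct (x - 1) + fib_direct (x - 2)

(* Split only once, so exactly one extra domain is spawned. *)
let fib_top x =
  if x <= 1 then 1
  else
    let n1, n2 =
      naive_both (fun () -> fib_direct (x - 1)) (fun () -> fib_direct (x - 2))
    in
    n1 + n2
```

The pool-based version above can afford to nest `both` recursively down to the cutoff precisely because its tasks are much cheaper than domains.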
A parallel sum, from a test case:
```ocaml
(* assumes [open Moonpool] and [let@] from the previous example *)
let () =
  let@ pool = Pool.with_ ~min:8 () in
  let total_sum = Atomic.make 0 in
  Pool.run_wait_block pool (fun () ->
      Fork_join.for_ ~chunk_size:5 100 (fun low high ->
          (* iterate on the range sequentially. The range should have 5 items or less. *)
          let local_sum = ref 0 in
          for i = low to high do
            local_sum := !local_sum + i
          done;
          ignore (Atomic.fetch_and_add total_sum !local_sum : int)));
  (* 0 + 1 + ... + 99 = 4950 *)
  assert (Atomic.get total_sum = 4950)
```
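The test relies on the indices `0 .. 99` being split into consecutive ranges of at most `chunk_size` elements, each summed locally before a single atomic update. The sequential sketch below shows that splitting under the assumption that chunks are consecutive inclusive ranges (consistent with the "5 items or less" comment, but Moonpool's exact splitting strategy may differ). `chunks` and `chunked_sum` are hypothetical helpers.

```ocaml
(* Hypothetical helper: split [0, n - 1] into consecutive inclusive
   ranges (low, high) of at most [chunk_size] elements each. *)
let chunks ~chunk_size n : (int * int) list =
  assert (chunk_size > 0);
  let rec go low acc =
    if low >= n then List.rev acc
    else
      let high = min (n - 1) (low + chunk_size - 1) in
      go (high + 1) ((low, high) :: acc)
  in
  go 0 []

(* Sequential analogue of the parallel sum: one local sum per range,
   then the local sums are added to the total, once per range. *)
let chunked_sum ~chunk_size n =
  List.fold_left
    (fun total (low, high) ->
      let local = ref 0 in
      for i = low to high do
        local := !local + i
      done;
      total + !local)
    0
    (chunks ~chunk_size n)
```

With `~chunk_size:5` and `n = 100` this yields 20 ranges, `(0, 4)` through `(95, 99)`, so the shared counter is touched 20 times rather than 100.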
Note that `Fork_join.for_` gives its functional argument a range to process (`low` and `high`, both inclusive), whose size is controlled by the optional `chunk_size` argument. This makes it possible to pass large values to `for_` without starting as many tasks, as demonstrated below:
Computing digits of π:
```ocaml
(* assumes [open Moonpool] and [let@] from the first example *)
let my_pi : float =
  let@ pool = Pool.with_ ~min:8 () in
  let num_steps = 100_000_000 in
  let num_tasks = Pool.size pool in
  let step = 1. /. float num_steps in
  let global_sum = Lock.create 0. in
  Pool.run_wait_block pool (fun () ->
      Fork_join.for_
        (* roughly one chunk per worker *)
        ~chunk_size:(3 + (num_steps / num_tasks))
        num_steps
        (fun low high ->
          (* midpoint-rule partial sum over [low, high] *)
          let sum = ref 0. in
          for i = low to high do
            let x = (float i +. 0.5) *. step in
            sum := !sum +. (4. /. (1. +. (x *. x)))
          done;
          let sum = !sum in
          (* a single locked update per chunk *)
          Lock.update global_sum (fun n -> n +. sum)));
  let pi = step *. Lock.get global_sum in
  pi
```
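The loop body is the midpoint rule applied to a classic integral for π. Writing $N$ for `num_steps` and $h$ for `step`, each index $i$ contributes the term at $x_i$, and the final multiplication by `step` gives:

```latex
\pi = \int_0^1 \frac{4}{1 + x^2}\,dx
    \approx h \sum_{i=0}^{N-1} \frac{4}{1 + x_i^2},
\qquad x_i = \left(i + \tfrac{1}{2}\right) h,
\qquad h = \frac{1}{N}
```

The integral equals $4 \arctan 1 = \pi$, and since the sum is just a sum of independent terms, splitting it into chunks and adding the partial sums gives the same result up to floating-point rounding.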
Here the `Lock` is not a performance issue, because only `num_tasks` chunks (i.e. roughly the number of cores on your CPU) are processed in the `for_`. That means only about 8 updates at the end, rather than 100_000_000 updates, which would create a lot of contention.
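The arithmetic behind "only about 8 updates": with `num_steps = 100_000_000` and `num_tasks = 8`, `chunk_size = 3 + 12_500_000 = 12_500_003`, and ⌈100_000_000 / 12_500_003⌉ = 8 chunks. Since `chunk_size` is always strictly larger than `num_steps / num_tasks`, there are never more than `num_tasks` chunks. The stdlib-only sketch below checks this by counting the locked updates. It swaps Moonpool's pool for one OCaml 5 domain per chunk and uses a plain `Mutex` instead of `Lock.t`; `approx_pi` is a hypothetical name, and the test uses a smaller `num_steps` to run quickly.

```ocaml
(* OCaml 5 only: Domain, Mutex and Atomic from the standard library.
   One domain per chunk stands in for Moonpool's pool. *)
let approx_pi ~num_steps ~num_tasks : float * int =
  let step = 1. /. float num_steps in
  let chunk_size = 3 + (num_steps / num_tasks) in
  let mutex = Mutex.create () in
  let global_sum = ref 0. in
  let updates = Atomic.make 0 in
  (* midpoint-rule partial sum over the inclusive range [low, high] *)
  let work low high () =
    let sum = ref 0. in
    for i = low to high do
      let x = (float i +. 0.5) *. step in
      sum := !sum +. (4. /. (1. +. (x *. x)))
    done;
    (* one short critical section per chunk, not per step *)
    Mutex.lock mutex;
    global_sum := !global_sum +. !sum;
    Mutex.unlock mutex;
    Atomic.incr updates
  in
  (* spawn one domain per chunk; there are at most num_tasks chunks *)
  let rec spawn low acc =
    if low >= num_steps then acc
    else
      let high = min (num_steps - 1) (low + chunk_size - 1) in
      spawn (high + 1) (Domain.spawn (work low high) :: acc)
  in
  List.iter Domain.join (spawn 0 []);
  (step *. !global_sum, Atomic.get updates)
```

For example, `approx_pi ~num_steps:1_000_000 ~num_tasks:8` uses `chunk_size = 125_003` and performs exactly 8 locked updates, however large `num_steps` gets.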