I managed to get what I want to compile.
However, there is a big copy-paste
if preserve then X else Y
in the run function.
X and Y are almost the same, except that X uses module Ord while
Y uses the module Dis.
The code comes from parany: https://github.com/UnixJunkie/parany/blob/master/src/parany.ml
(* demux and index items *)
let idemux (demux: unit -> 'a) =
let demux_count = ref 0 in
function () ->
let res = (!demux_count, demux ()) in
incr demux_count;
res
(* work ignoring item index *)
let iwork (work: 'a -> 'b) ((i, x): int * 'a): int * 'b =
(i, work x)
(* mux items in the right order *)
let imux (mux: 'b -> unit) =
let mux_count = ref 0 in
(* weak type variable avoidance *)
let wait_list = Ht.create 11 in
function (i, res) ->
if !mux_count = i then
begin
(* unpile as much as possible *)
mux res;
incr mux_count;
if Ht.length wait_list > 0 then
try
while true do
let next = Ht.find wait_list !mux_count in
Ht.remove wait_list !mux_count;
mux next;
incr mux_count
done
with Not_found -> () (* no more or index hole *)
end
else
(* put somewhere into the pile *)
Ht.add wait_list i res
let run
?(preserve = false)
?(core_pin = false)
?csize:(cs = 1)
nprocs ~demux ~work ~mux =
if nprocs <= 1 then
(* sequential version *)
try
while true do
mux (work (demux ()))
done
with End_of_input -> ()
else
begin
assert(cs >= 1);
let max_cores = Cpu.numcores () in
assert(nprocs <= max_cores);
(* to maximize parallel efficiency, by default we don't care about the
order in which jobs are computed. *)
let module Dis = struct
let demux = demux
let work = work
let mux = mux
end in
(* However, in some cases, it is necessary for the user to preserve the
input order in the output. In this case, we still compute things
potentially out of order (for parallelization efficiency); but we will
order back the results in input order (for user's convenience) *)
let module Ord = struct
let demux = idemux demux
let work = iwork work
let mux = imux mux
end in
(* parallel version *)
(* let pid = Unix.getpid () in
* eprintf "father(%d) started\n%!" pid; *)
(* create queues *)
let jobs_in, jobs_out = Shm.init () in
let res_in, res_out = Shm.init () in
if preserve then
begin
(* start feeder *)
(* eprintf "father(%d) starting feeder\n%!" pid; *)
Gc.compact (); (* like parmap: reclaim memory prior to forking *)
fork_out (fun () -> feed_them_all cs nprocs Ord.demux jobs_in);
(* start workers *)
for worker_rank = 0 to nprocs - 1 do
(* eprintf "father(%d) starting a worker\n%!" pid; *)
fork_out (fun () ->
if core_pin then Cpu.setcore worker_rank;
go_to_work jobs_out Ord.work res_in
)
done;
(* collect results *)
let finished = ref 0 in
let buff = Bytes.create 80 in
while !finished < nprocs do
try
while true do
let xs = Shm.receive res_out buff in
(* eprintf "father(%d) collecting one\n%!" pid; *)
List.iter Ord.mux xs
done
with End_of_input ->
incr finished
done;
end
else
begin
(* start feeder *)
(* eprintf "father(%d) starting feeder\n%!" pid; *)
Gc.compact (); (* like parmap: reclaim memory prior to forking *)
fork_out (fun () -> feed_them_all cs nprocs Dis.demux jobs_in);
(* start workers *)
for worker_rank = 0 to nprocs - 1 do
(* eprintf "father(%d) starting a worker\n%!" pid; *)
fork_out (fun () ->
if core_pin then Cpu.setcore worker_rank;
go_to_work jobs_out Dis.work res_in
)
done;
(* collect results *)
let finished = ref 0 in
let buff = Bytes.create 80 in
while !finished < nprocs do
try
while true do
let xs = Shm.receive res_out buff in
(* eprintf "father(%d) collecting one\n%!" pid; *)
List.iter Dis.mux xs
done
with End_of_input ->
incr finished
done;
end;
(* eprintf "father(%d) finished\n%!" pid; *)
(* free resources *)
List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
end