Can you factorize this code more?

I managed to get what I want to compile.
However, there is a big copy-paste
if preserve then X else Y in the run function.
X and Y are almost the same, except that X uses module Ord while
Y uses the module Dis.

The code comes from parany: https://github.com/UnixJunkie/parany/blob/master/src/parany.ml

(* demux and index items *)
let idemux (demux: unit -> 'a) =
  let demux_count = ref 0 in
  function () ->
    let res = (!demux_count, demux ()) in
    incr demux_count;
    res

(* work ignoring item index *)
let iwork (work: 'a -> 'b) ((i, x): int * 'a): int * 'b =
  (i, work x)

(* mux items in the right order *)
let imux (mux: 'b -> unit) =
  let mux_count = ref 0 in
  (* weak type variable avoidance *)
  let wait_list = Ht.create 11 in
  function (i, res) ->
    if !mux_count = i then
      begin
        (* unpile as much as possible *)
        mux res;
        incr mux_count;
        if Ht.length wait_list > 0 then
          try
            while true do
              let next = Ht.find wait_list !mux_count in
              Ht.remove wait_list !mux_count;
              mux next;
              incr mux_count
            done
          with Not_found -> () (* no more or index hole *)
      end
    else
      (* put somewhere into the pile *)
      Ht.add wait_list i res

let run
    ?(preserve = false)
    ?(core_pin = false)
    ?csize:(cs = 1)
    nprocs ~demux ~work ~mux =
  if nprocs <= 1 then
    (* sequential version *)
    try
      while true do
        mux (work (demux ()))
      done
    with End_of_input -> ()
  else
    begin
      assert(cs >= 1);
      let max_cores = Cpu.numcores () in
      assert(nprocs <= max_cores);
      (* to maximize parallel efficiency, by default we don't care about the
         order in which jobs are computed. *)
      let module Dis = struct
        let demux = demux
        let work = work
        let mux = mux
      end in
      (* However, in some cases, it is necessary for the user to preserve the
         input order in the output. In this case, we still compute things
         potentially out of order (for parallelization efficiency); but we will
         order back the results in input order (for user's convenience) *)
      let module Ord = struct
        let demux = idemux demux
        let work = iwork work
        let mux = imux mux
      end in
      (* parallel version *)
      (* let pid = Unix.getpid () in
       * eprintf "father(%d) started\n%!" pid; *)
      (* create queues *)
      let jobs_in, jobs_out = Shm.init () in
      let res_in, res_out = Shm.init () in
      if preserve then
        begin
          (* start feeder *)
          (* eprintf "father(%d) starting feeder\n%!" pid; *)
          Gc.compact (); (* like parmap: reclaim memory prior to forking *)
          fork_out (fun () -> feed_them_all cs nprocs Ord.demux jobs_in);
          (* start workers *)
          for worker_rank = 0 to nprocs - 1 do
            (* eprintf "father(%d) starting a worker\n%!" pid; *)
            fork_out (fun () ->
                if core_pin then Cpu.setcore worker_rank;
                go_to_work jobs_out Ord.work res_in
              )
          done;
          (* collect results *)
          let finished = ref 0 in
          let buff = Bytes.create 80 in
          while !finished < nprocs do
            try
              while true do
                let xs = Shm.receive res_out buff in
                (* eprintf "father(%d) collecting one\n%!" pid; *)
                List.iter Ord.mux xs
              done
            with End_of_input ->
              incr finished
          done;
        end
      else
        begin
          (* start feeder *)
          (* eprintf "father(%d) starting feeder\n%!" pid; *)
          Gc.compact (); (* like parmap: reclaim memory prior to forking *)
          fork_out (fun () -> feed_them_all cs nprocs Dis.demux jobs_in);
          (* start workers *)
          for worker_rank = 0 to nprocs - 1 do
            (* eprintf "father(%d) starting a worker\n%!" pid; *)
            fork_out (fun () ->
                if core_pin then Cpu.setcore worker_rank;
                go_to_work jobs_out Dis.work res_in
              )
          done;
          (* collect results *)
          let finished = ref 0 in
          let buff = Bytes.create 80 in
          while !finished < nprocs do
            try
              while true do
                let xs = Shm.receive res_out buff in
                (* eprintf "father(%d) collecting one\n%!" pid; *)
                List.iter Dis.mux xs
              done
            with End_of_input ->
              incr finished
          done;
        end;
      (* eprintf "father(%d) finished\n%!" pid; *)
      (* free resources *)
      List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
    end

For cases like this I normally do run time function selection (or however it should be called):

let work, demux, mux =
  if preserve then
    Ord.work, Ord.demux, Ord.mux
  else
    Dis.work, Dis.demux, Dis.mux
in
...

My understanding is that this is not as fast as writing everything out, but still faster than run time selection of first class modules.

If performance is really critical, then I’d use something like cinaps to roll the code out for me, since manually done duplicate code is almost always bad.

(I meant to edit my first comment as my initial reading was wrong, but ended up deleting it.)

An alternative way of doing what @darrenldl suggested is to have this choice be done by the caller themselves.

type preserve_mode = $WORK_TYPE -> $DEMUX_TYPE ->  $MUX_TYPE -> $WORK_TYPE * $DEMUX_TYPE * $MUX_TYPE

let preserve work demux mux = iwork work, idemux demux, imux mux

let nopreserve work demux mux = work, demux, mux

let run ?(preserve = nopreserve) ~work ~demux ~mux ... =
  let (work, demux, mux) = preserve work demux mux in
  ...

The important part is that in the signature you expose it as

type preserve_mode

val preserve : preserve_mode

val nopreserve : preserve_mode

 val run : ?preserve:preserve_mode -> ...

One option is to go through first-class modules to select the module:

  let module Implementation =
     (val
        if preserve then (module struct
          let demux = idemux demux
          let work = iwork work
          let mux = imux mux
          end: S )
        else (module struct
          let demux = demux
          let work = work
          let mux = mux
        end: S)
     ) in
  ...

An important point however is that if the module type S only contains functions, it can be replaced by a record, which means that you can avoid the high syntactic cost of first-class modules.

1 Like

What do you mean by high syntactic cost?

Is it the number of lines you have to write?

One solution I was trying to make compile was very close to the one you are proposing.

I think I tried this one but it was not compiling because of a weak type variable.
This part is not a loop, so the cost of one if-then-else is OK.

Are you sure you can write an S for it?

If a record of function is sufficient, the code above can be reduced to:

let implementation =
  if preserve then
   { demux; work; mux }
 else
  { demux = idemux demux;
    work = iwork work;
    mux = imux mux
  } in

which is simpler and does not require explicit type annotations.
If you have issues with weak type variables, and if the three are functions, it is probably just a matter of eta-expanding (transforming let fn = combinator gn to let fn x = combinator gn x .

I am not sure that an S can be written, but I don’t see any obstacle after a quick at your code?

I cloned your repo and it seems that a dummy index is needed for the unordered version to match the function types.

The functions adding the dummy indices can probably be coded as static functions as iwork_dummy etc for further refactoring as well

      let demux, work, mux =
        if not preserve then
          (* to maximize parallel efficiency, by default we don't care about the
             order in which jobs are computed. *)
          (fun () -> 0, demux ()),
          (fun (i, x) -> i, work x),
          (fun (_i, x) -> mux x)
        else
          (* However, in some cases, it is necessary for the user to preserve the
             input order in the output. In this case, we still compute things
             potentially out of order (for parallelization efficiency); but we will
             order back the results in input order (for user's convenience) *)
          (idemux demux), (iwork work), (imux mux)
      in

And, I really don’t want that.
Since adding an index adds some serialization/deserialization overhead.
And, my current (slightly ugly copy-paste) doesn’t need a dummy index…

Ah…okay I see…hm…this is kinda an interesting problem then

1 Like

I’ll take any PR that compiles, reduces the code size but doesn’t change the semantic.
And doesn’t add a dummy index when the user had not chosen preserve=true.
:slight_smile:

This solution does not compile:

Error: This expression has type unit -> 'a
       but an expression was expected of type unit -> int * 'a
       The type variable 'a occurs inside int * 'a

Submitted a PR, resolving conflicts rn.

EDIT: just for reference, the updates (PR #41) can be seen here.

1 Like