@UnixJunkie, it seems that there’s some sort of corruption happening. To wit, the following one-line patch fixes your bug:
diff --git a/src/pardi.ml b/src/pardi.ml
index 618e9f3..ea808e7 100644
--- a/src/pardi.ml
+++ b/src/pardi.ml
@@ -109,6 +109,7 @@ let process_some job_dir cmd (count, tmp_in_fn) =
let out_queue = Sorted_queue.create ()
let gather_some mux_count mux_mode (count, tmp_out_fn) =
+ let tmp_out_fn = Bytes.(tmp_out_fn |> of_string |> copy |> to_string) in
begin
match mux_mode with
| Mux.Null -> () (* tmp_out_fn is not removed? *)
It seems that something is modifying the value of tmp_out_fn between the moment it’s inserted into out_queue and the moment it is popped out. Obviously we should discount the possibility of a real memory error, so I’m supposing that there’s code in netmulticore that unmarshals values and is somehow reusing their memory.
Looking at parany.ml, I see
(* Netmcore_queue.pop_p avoids data copy out of the shared heap *)
My bet is that this is what’s at fault. Here’s some more substantiation. With the following (somewhat more extensive) debugging patch, we get a clear indication that there’s a memory error going on:
modified src/pardi.ml
@@ -35,9 +35,16 @@ let rec read_one_block
let read_buff = ref (Bytes.create 0)
let read_fd = ref (Unix.descr_of_in_channel stdin)
+let format_filename job_dir is_input n =
+ let direction = if is_input then "in" else "out" in
+ sprintf "%s/pardi_%s_%09d" job_dir direction n
+
let read_some job_dir buff count csize input demux () =
let read = ref 0 in
+ let tmp_fn = format_filename job_dir true !count in
+(*
let tmp_fn = sprintf "%s/pardi_in_%09d" job_dir !count in
+*)
Utls.with_out_file tmp_fn (fun out ->
match demux with
| Demux.Bytes n ->
@@ -98,30 +105,59 @@ let output_fn_tag = Str.regexp "%OUT"
let process_some job_dir cmd (count, tmp_in_fn) =
assert(Utls.regexp_in_string input_fn_tag cmd);
let cmd' = Str.replace_first input_fn_tag tmp_in_fn cmd in
+ let tmp_out_fn = format_filename job_dir false count in
+(*
let tmp_out_fn = sprintf "%s/pardi_out_%09d" job_dir count in
+*)
+ assert (Sys.file_exists tmp_in_fn) ;
+ assert (not (Sys.file_exists tmp_out_fn)) ;
assert(Utls.regexp_in_string output_fn_tag cmd');
let cmd'' = Str.replace_first output_fn_tag tmp_out_fn cmd' in
let cmd''' = sprintf "%s; rm -f %s" cmd'' tmp_in_fn in
Utls.run_command !Flags.debug cmd''';
+ assert (Sys.file_exists tmp_out_fn) ;
(count, tmp_out_fn)
(* in case we need to preserve input order *)
let out_queue = Sorted_queue.create ()
+let seen_count = (Hashtbl.create 23 : (int, unit) Hashtbl.t)
+let seen = (Hashtbl.create 23 : (string, unit) Hashtbl.t)
-let gather_some mux_count mux_mode (count, tmp_out_fn) =
+let gather_some job_dir mux_count mux_mode (count, tmp_out_fn) =
+(*
+ let tmp_out_fn = Bytes.(tmp_out_fn |> of_string |> copy |> to_string) in
+*)
+ printf "GATHER (%d, %s)\n" count tmp_out_fn ;
+ assert (format_filename job_dir false count = tmp_out_fn) ;
+ assert (not (Hashtbl.mem seen_count count)) ;
+ Hashtbl.add seen_count count () ;
+ assert (not (Hashtbl.mem seen tmp_out_fn)) ;
+ Hashtbl.add seen tmp_out_fn () ;
+ if not (Sys.file_exists tmp_out_fn) then begin
+ eprintf "ERROR: file %s does not exist\n" tmp_out_fn ;
+ assert (Sys.file_exists tmp_out_fn) ;
+ end;
+(*
+ assert (Sys.file_exists tmp_out_fn) ;
+*)
begin
match mux_mode with
| Mux.Null -> () (* tmp_out_fn is not removed? *)
| Mux.Sort_cat_into dst_fn ->
begin
- Squeue.insert (count, tmp_out_fn) out_queue;
+ Squeue.insert (count, (count,tmp_out_fn)) out_queue;
let popped = Squeue.pop_all out_queue in
- List.iter (fun out_fn ->
+ List.iter (fun (count, out_fn) ->
+ printf "MUX (%d, %s)\n" count out_fn ;
+ assert (format_filename job_dir false count = out_fn) ;
let cmd =
if !mux_count = 0 then
sprintf "mv %s %s" out_fn dst_fn
else
sprintf "cat %s >> %s; rm -f %s" out_fn dst_fn out_fn in
+(*
+ sprintf "cat %s >> %s" out_fn dst_fn in
+*)
Utls.run_command !Flags.debug cmd;
incr mux_count
) popped
@@ -184,10 +220,12 @@ let main () =
Parany.run ~verbose:false ~csize:1 ~nprocs
~demux:(read_some job_dir (Buffer.create 1024) (ref 0) csize in_chan demux)
~work:(process_some job_dir cmd)
- ~mux:(gather_some (ref 0) mux);
+ ~mux:(gather_some job_dir (ref 0) mux);
printf "\n";
+(*
if not !Flags.debug then
Utls.run_command !Flags.debug (sprintf "rm -rf %s" job_dir);
+*)
close_in in_chan
let () = main ()
With this patch, the bug occurs, and if we egrep for “(GATHER|MUX)” we get:
GATHER (1, /tmp/pardi_xIhP/pardi_out_000000001)
GATHER (0, /tmp/pardi_xIhP/pardi_out_000000000)
MUX (0, /tmp/pardi_xIhP/pardi_out_000000000)
MUX (1, /tmp/pardi_xIhP/pardi_out_000000001)
GATHER (2, /tmp/pardi_xIhP/pardi_out_000000002)
MUX (2, /tmp/pardi_xIhP/pardi_out_000000002)
GATHER (6, /tmp/pardi_xIhP/pardi_out_000000006)
GATHER (3, /tmp/pardi_xIhP/pardi_out_000000003)
MUX (3, /tmp/pardi_xIhP/pardi_out_000000003)
GATHER (4, /tmp/pardi_xIhP/pardi_out_000000004)
MUX (4, /tmp/pardi_xIhP/pardi_out_000000004)
GATHER (5, /tmp/pardi_xIhP/pardi_out_000000005)
MUX (5, /tmp/pardi_xIhP/pardi_out_000000005)
MUX (6, /tmp/pardi_xIhP/pardi_out_000000025)
Notice that MUX(6) has a filename ending in “25”. No “GATHER” has appeared with that filename. Note also that GATHER(6) has a filename ending in “06”. So something probably scribbled on the memory of that filename between the GATHER and the MUX. The comment in parany.ml quoted above tells us that this is expected behaviour, I guess.
I think this is your problem. Just copy the arguments to gather_some and things should work as expected.