Parallelize file hashing
[dups.git] / dups.ml
diff --git a/dups.ml b/dups.ml
index 7c03773..44acdf3 100644 (file)
--- a/dups.ml
+++ b/dups.ml
@@ -164,6 +164,9 @@ module Stream : sig
 
   val iter : 'a t -> f:('a -> unit) -> unit
 
+  val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t
+  (** Parallel map with arbitrarily-reordered elements. *)
+
   val map : 'a t -> f:('a -> 'b) -> 'b t
 
   val filter : 'a t -> f:('a -> bool) -> 'a t
@@ -177,6 +180,14 @@ end = struct
   type 'a t =
     {mutable streams : ('a S.t) list}
 
+  type ('input, 'output) msg_from_vassal =
+    | Ready of int
+    | Result of (int * ('input * 'output))
+    | Exiting of int
+
+  type 'input msg_from_lord =
+    | Job of 'input option
+
   let create f =
     {streams = [S.from (fun _ -> f ())]}
 
@@ -246,6 +257,91 @@ end = struct
       (fun name (length, members) -> Queue.add (name, length, members) groups)
       groups_tbl;
     of_queue groups
+
+  module Ipc : sig
+    val send : out_channel -> 'a -> unit
+    val recv : in_channel -> 'a
+  end = struct
+    let send oc msg =
+      Marshal.to_channel oc msg [];
+      flush oc
+
+    let recv ic =
+      Marshal.from_channel ic
+  end
+
+  let lord t ~njobs ~vassals ~ic ~ocs =
+    eprintf "[debug] [lord] started\n%!";
+    let active_vassals = ref njobs in
+    let results = Queue.create () in
+    let rec dispatch () =
+      match Ipc.recv ic with
+      | ((Exiting i) : ('input, 'output) msg_from_vassal) ->
+          close_out ocs.(i);
+          decr active_vassals;
+          if !active_vassals = 0 then
+            ()
+          else
+            dispatch ()
+      | ((Ready i) : ('input, 'output) msg_from_vassal) ->
+          Ipc.send ocs.(i) (Job (next t));
+          dispatch ()
+      | ((Result (i, result)) : ('input, 'output) msg_from_vassal) ->
+          Queue.add result results;
+          Ipc.send ocs.(i) (Job (next t));
+          dispatch ()
+    in
+    let rec wait = function
+      | [] -> ()
+      | vassals ->
+          let pid, _process_status = Unix.wait () in
+          (* TODO: handle process_status *)
+          wait (List.filter vassals ~f:(fun p -> p <> pid))
+    in
+    dispatch ();
+    close_in ic;
+    wait vassals;
+    of_queue results
+
+  let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
+    eprintf "[debug] [vassal %d] started\n%!" i;
+    let ic = Unix.in_channel_of_descr vassal_pipe_r in
+    let oc = Unix.out_channel_of_descr lord_pipe_w in
+    let rec work msg =
+      Ipc.send oc msg;
+      match Ipc.recv ic with
+      | (Job (Some x) : 'input msg_from_lord) ->
+          work (Result (i, (x, f x)))
+      | (Job None : 'input msg_from_lord) ->
+          Ipc.send oc (Exiting i)
+    in
+    work (Ready i);
+    close_in ic;
+    close_out oc;
+    exit 0
+
+  let bag_map t ~njobs ~f =
+    let lord_pipe_r, lord_pipe_w = Unix.pipe () in
+    let vassal_pipes   = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
+    let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
+    let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
+    let vassals = ref [] in
+    for i=0 to (njobs - 1) do
+      begin match Unix.fork () with
+      | 0 ->
+          Unix.close lord_pipe_r;
+          vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
+      | pid ->
+          vassals := pid :: !vassals
+      end
+    done;
+    Unix.close lord_pipe_w;
+    lord
+      t
+      ~njobs
+      ~vassals:!vassals
+      ~ic:(Unix.in_channel_of_descr lord_pipe_r)
+      ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
 end
 
 module In_channel : sig
@@ -382,6 +478,7 @@ type opt =
   ; output : output
   ; ignore : string -> bool
   ; sample : int
+  ; njobs  : int
   }
 
 let make_input_stream input ignore ~metrics =
@@ -420,8 +517,11 @@ let make_output_fun = function
         );
         close_out oc
 
-let main {input; output; ignore; sample = sample_len} =
-  let t0_all = Sys.time () in
+let time () =
+  Unix.gettimeofday ()
+
+let main {input; output; ignore; sample = sample_len; njobs} =
+  let t0_all = time () in
   let metrics = M.init () in
   let output = make_output_fun  output in
   let input  = make_input_stream input ignore ~metrics in
@@ -437,23 +537,42 @@ let main {input; output; ignore; sample = sample_len} =
 
   let files = input in
 
-  let t0_group_by_size = Sys.time () in
+  let t0_group_by_size = time () in
+  eprintf "[debug] filtering-out files with unique size\n%!";
   let files = File.filter_out_unique_sizes files ~metrics in
-  let t1_group_by_size = Sys.time () in
+  let t1_group_by_size = time () in
 
   let t0_group_by_sample = t1_group_by_size in
+  eprintf "[debug] filtering-out files with unique heads\n%!";
   let files = File.filter_out_unique_heads files ~len:sample_len ~metrics in
-  let t1_group_by_sample = Sys.time () in
+  let t1_group_by_sample = time () in
 
   let t0_group_by_digest = t1_group_by_sample in
+  eprintf "[debug] hashing\n%!";
   let groups =
-    Stream.group_by files ~f:(fun {File.path; size} ->
-      M.file_hashed metrics ~size;
-      Digest.file path
-    )
+    if njobs > 1 then
+      let digests =
+        Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
+      in
+      Stream.map (Stream.group_by digests ~f:(fun (_, d) -> d)) ~f:(
+        fun (digest, n, file_digest_pairs) ->
+          let files =
+            List.map file_digest_pairs ~f:(fun (file, _) ->
+              M.file_hashed metrics ~size:file.File.size;
+              file
+            )
+          in
+          (digest, n, files)
+      )
+    else
+      Stream.group_by files ~f:(fun {File.path; size} ->
+        M.file_hashed metrics ~size;
+        Digest.file path
+      )
   in
-  let t1_group_by_digest = Sys.time () in
+  let t1_group_by_digest = time () in
 
+  eprintf "[debug] reporting\n%!";
   Stream.iter groups ~f:(fun (d, n, files) ->
     M.digest metrics;
     if n > 1 then
@@ -461,7 +580,7 @@ let main {input; output; ignore; sample = sample_len} =
       output d n files
   );
 
-  let t1_all = Sys.time () in
+  let t1_all = time () in
 
   M.report metrics
     ~time_all:            (t1_all             -. t0_all)
@@ -485,7 +604,8 @@ let get_opt () : opt =
   let input  = ref Stdin in
   let output = ref Stdout in
   let ignore = ref (fun _ -> false) in
-  let sample = ref 256 in
+  let sample = ref 512 in
+  let njobs  = ref 8 in
   let spec =
     [ ( "-out"
       , Arg.String (fun path ->
@@ -505,6 +625,10 @@ let get_opt () : opt =
       , Arg.Set_int sample
       , (sprintf " Byte size of file samples to use. Default: %d" !sample)
       )
+    ; ( "-j"
+      , Arg.Set_int njobs
+      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
+      )
     ]
   in
   Arg.parse
@@ -527,6 +651,7 @@ let get_opt () : opt =
   ; output = !output
   ; ignore = !ignore
   ; sample = !sample
+  ; njobs  = !njobs
   }
 
 let () =
This page took 0.031581 seconds and 4 git commands to generate.