Move modules into dedicated files multi-samples
authorSiraaj Khandkar <siraaj@khandkar.net>
Tue, 14 May 2019 16:54:07 +0000 (12:54 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Tue, 14 May 2019 16:55:22 +0000 (12:55 -0400)
Makefile
dups.ml
lib/file.ml [new file with mode: 0644]
lib/file.mli [new file with mode: 0644]
lib/in_channel.ml [new file with mode: 0644]
lib/in_channel.mli [new file with mode: 0644]
lib/input_delim.mll [moved from input_delim.mll with 100% similarity]
lib/metrics.ml [new file with mode: 0644]
lib/metrics.mli [new file with mode: 0644]
lib/stream.ml [new file with mode: 0644]
lib/stream.mli [new file with mode: 0644]

index 2807b9b..549cdef 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -8,7 +8,7 @@ all:
        @$(MAKE) -s build
 
 build:
-       @ocamlbuild -cflags '-w A' -pkgs 'str,unix' $(EXE_NAME).$(EXE_TYPE)
+       @ocamlbuild -cflags '-w A' -pkgs 'str,unix' -I lib $(EXE_NAME).$(EXE_TYPE)
        @cp _build/$(EXE_NAME).$(EXE_TYPE) $(EXE_NAME)
        @rm -f $(EXE_NAME).$(EXE_TYPE)
 
diff --git a/dups.ml b/dups.ml
index 2521b97..1483754 100644 (file)
--- a/dups.ml
+++ b/dups.ml
 open Printf
 
-module Array = ArrayLabels
-module List  = ListLabels
+module List   = ListLabels
 module StrSet = Set.Make(String)
-module Unix  = UnixLabels
-
-module Metrics : sig
-  type t
-
-  val init
-    : unit -> t
-  val report
-    : t
-    -> wall_time_all:float
-    -> wall_time_group_by_size:float
-    -> wall_time_group_by_head:float
-    -> wall_time_group_by_digest:float
-    -> proc_time_all:float
-    -> proc_time_group_by_size:float
-    -> proc_time_group_by_head:float
-    -> proc_time_group_by_digest:float
-    -> unit
-
-  val file_considered
-    : t -> size:int -> unit
-  val file_ignored
-    : t -> size:int -> unit
-  val file_empty
-    : t -> unit
-  val file_sampled
-    : t -> unit
-  val chunk_read
-    : t -> size:int -> unit
-  val file_unique_size
-    : t -> size:int -> unit
-  val file_unique_sample
-    : t -> size:int -> unit
-  val file_hashed
-    : t -> size:int -> unit
-  val digest
-    : t -> unit
-  val redundant_data
-    : t -> size:int -> unit
-end = struct
-  type t =
-    { considered_files    : int ref
-    ; considered_bytes    : int ref
-    ; empty               : int ref
-    ; ignored_files       : int ref
-    ; ignored_bytes       : int ref
-    ; unique_size_files   : int ref
-    ; unique_size_bytes   : int ref
-    ; unique_sample_files : int ref
-    ; unique_sample_bytes : int ref
-    ; sampled_files       : int ref
-    ; sampled_bytes       : int ref
-    ; hashed_files        : int ref
-    ; hashed_bytes        : int ref
-    ; digests             : int ref
-    ; redundant_data      : int ref
-    }
-
-  let init () =
-    { considered_files    = ref 0
-    ; considered_bytes    = ref 0
-    ; empty               = ref 0
-    ; ignored_files       = ref 0
-    ; ignored_bytes       = ref 0
-    ; unique_size_files   = ref 0
-    ; unique_size_bytes   = ref 0
-    ; sampled_files       = ref 0
-    ; sampled_bytes       = ref 0
-    ; hashed_files        = ref 0
-    ; hashed_bytes        = ref 0
-    ; unique_sample_files = ref 0
-    ; unique_sample_bytes = ref 0
-    ; digests             = ref 0
-    ; redundant_data      = ref 0
-    }
-
-  let add sum addend =
-    sum := !sum + addend
-
-  let file_considered t ~size =
-    incr t.considered_files;
-    add t.considered_bytes size
-
-  let file_ignored {ignored_files; ignored_bytes; _} ~size =
-    incr ignored_files;
-    add ignored_bytes size
-
-  let file_empty t =
-    incr t.empty
-
-  let chunk_read t ~size =
-    add t.sampled_bytes size
-
-  let file_sampled t =
-    incr t.sampled_files
-
-  let file_unique_size t ~size =
-    incr t.unique_size_files;
-    add t.unique_size_bytes size
-
-  let file_unique_sample t ~size =
-    incr t.unique_sample_files;
-    add t.unique_sample_bytes size
-
-  let file_hashed t ~size =
-    incr t.hashed_files;
-    add t.hashed_bytes size
-
-  let digest t =
-    incr t.digests
-
-  let redundant_data t ~size =
-    add t.redundant_data size
-
-  let report
-    t
-    ~wall_time_all
-    ~wall_time_group_by_size
-    ~wall_time_group_by_head
-    ~wall_time_group_by_digest
-    ~proc_time_all
-    ~proc_time_group_by_size
-    ~proc_time_group_by_head
-    ~proc_time_group_by_digest
-  =
-    let b_to_mb b = (float_of_int b) /. 1024. /. 1024. in
-    let b_to_gb b = (b_to_mb b) /. 1024. in
-    eprintf "Total time                   : %.2f wall sec  %.2f proc sec\n%!"
-      wall_time_all
-      proc_time_all;
-    eprintf "Considered                   : %8d files  %6.2f Gb\n%!"
-      !(t.considered_files)
-      (b_to_gb !(t.considered_bytes));
-    eprintf "Sampled                      : %8d files  %6.2f Gb\n%!"
-      !(t.sampled_files)
-      (b_to_gb !(t.sampled_bytes));
-    eprintf "Hashed                       : %8d files  %6.2f Gb  %6.2f wall sec  %6.2f proc sec\n%!"
-      !(t.hashed_files)
-      (b_to_gb !(t.hashed_bytes))
-      wall_time_group_by_digest
-      proc_time_group_by_digest;
-    eprintf "Digests                      : %8d\n%!"
-      !(t.digests);
-    eprintf "Duplicates (Hashed - Digests): %8d files  %6.2f Gb\n%!"
-      (!(t.hashed_files) - !(t.digests))
-      (b_to_gb !(t.redundant_data));
-    eprintf "Skipped due to 0      size   : %8d files\n%!" !(t.empty);
-    eprintf "Skipped due to unique size   : %8d files  %6.2f Gb  %6.2f wall sec  %6.2f proc sec\n%!"
-      !(t.unique_size_files)
-      (b_to_gb !(t.unique_size_bytes))
-      wall_time_group_by_size
-      proc_time_group_by_size;
-    eprintf "Skipped due to unique sample : %8d files  %6.2f Gb  %6.2f wall sec  %6.2f proc sec\n%!"
-      !(t.unique_sample_files)
-      (b_to_gb !(t.unique_sample_bytes))
-      wall_time_group_by_head
-      proc_time_group_by_head;
-    eprintf "Ignored due to regex match   : %8d files  %6.2f Gb\n%!"
-      !(t.ignored_files)
-      (b_to_gb !(t.ignored_bytes))
-end
+module Unix   = UnixLabels
 
 module M = Metrics
 
-module Stream : sig
-  type 'a t
-
-  val create : (unit -> 'a option) -> 'a t
-
-  val of_queue : 'a Queue.t -> 'a t
-
-  val iter : 'a t -> f:('a -> unit) -> unit
-
-  val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t
-  (** Parallel map with arbitrarily-reordered elements. *)
-
-  val map : 'a t -> f:('a -> 'b) -> 'b t
-
-  val filter : 'a t -> f:('a -> bool) -> 'a t
-
-  val concat : ('a t) list -> 'a t
-
-  val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t
-end = struct
-  module S = Stream
-
-  type 'a t =
-    {mutable streams : ('a S.t) list}
-
-  type ('input, 'output) msg_from_vassal =
-    | Ready of int
-    | Result of (int * ('input * 'output))
-    | Exiting of int
-
-  type 'input msg_from_lord =
-    | Job of 'input option
-
-  let create f =
-    {streams = [S.from (fun _ -> f ())]}
-
-  let of_queue q =
-    create (fun () ->
-      match Queue.take q with
-      | exception Queue.Empty ->
-          None
-      | x ->
-          Some x
-    )
-
-  let rec next t =
-    match t.streams with
-    | [] ->
-        None
-    | s :: streams ->
-        (match S.next s with
-        | exception Stream.Failure ->
-            t.streams <- streams;
-            next t
-        | x ->
-            Some x
-        )
-
-  let map t ~f =
-    create (fun () ->
-      match next t with
-      | None   -> None
-      | Some x -> Some (f x)
-    )
-
-  let filter t ~f =
-    let rec filter () =
-      match next t with
-      | None ->
-          None
-      | Some x when f x ->
-          Some x
-      | Some _ ->
-          filter ()
-    in
-    create filter
-
-  let iter t ~f =
-    List.iter t.streams ~f:(S.iter f)
-
-  let concat ts =
-    {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}
-
-  let group_by t ~f =
-    let groups_tbl = Hashtbl.create 1_000_000 in
-    let group_update x =
-      let group = f x in
-      let members =
-        match Hashtbl.find_opt groups_tbl group with
-        | None ->
-            (1, [x])
-        | Some (n, xs) ->
-            (succ n, x :: xs)
-      in
-      Hashtbl.replace groups_tbl group members
-    in
-    iter t ~f:group_update;
-    let groups = Queue.create () in
-    Hashtbl.iter
-      (fun name (length, members) -> Queue.add (name, length, members) groups)
-      groups_tbl;
-    of_queue groups
-
-  module Ipc : sig
-    val send : out_channel -> 'a -> unit
-    val recv : in_channel -> 'a
-  end = struct
-    let send oc msg =
-      Marshal.to_channel oc msg [];
-      flush oc
-
-    let recv ic =
-      Marshal.from_channel ic
-  end
-
-  let lord t ~njobs ~vassals ~ic ~ocs =
-    let active_vassals = ref njobs in
-    let results = Queue.create () in
-    let rec loop () =
-      match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
-      | Exiting i ->
-          close_out ocs.(i);
-          decr active_vassals;
-          if !active_vassals = 0 then () else loop ()
-      | Ready i ->
-          Ipc.send ocs.(i) (Job (next t));
-          loop ()
-      | Result (i, result) ->
-          Queue.add result results;
-          Ipc.send ocs.(i) (Job (next t));
-          loop ()
-    in
-    let rec wait = function
-      | [] ->
-          ()
-      | vassals ->
-          let pid, _process_status = Unix.wait () in
-          (* TODO: handle process_status *)
-          wait (List.filter vassals ~f:(fun p -> p <> pid))
-    in
-    loop ();
-    close_in ic;
-    wait vassals;
-    of_queue results
-
-  let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
-    let ic = Unix.in_channel_of_descr vassal_pipe_r in
-    let oc = Unix.out_channel_of_descr lord_pipe_w in
-    let rec loop () =
-      match (Ipc.recv ic : 'input msg_from_lord) with
-      | Job (Some x) ->
-          Ipc.send oc (Result (i, (x, f x)));
-          loop ()
-      | Job None ->
-          Ipc.send oc (Exiting i)
-    in
-    Ipc.send oc (Ready i);
-    loop ();
-    close_in ic;
-    close_out oc;
-    exit 0
-
-  let bag_map t ~njobs ~f =
-    let lord_pipe_r, lord_pipe_w = Unix.pipe () in
-    let vassal_pipes   = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
-    let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
-    let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
-    let vassals = ref [] in
-    for i=0 to (njobs - 1) do
-      begin match Unix.fork () with
-      | 0 ->
-          Unix.close lord_pipe_r;
-          vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
-      | pid ->
-          vassals := pid :: !vassals
-      end
-    done;
-    Unix.close lord_pipe_w;
-    lord
-      t
-      ~njobs
-      ~vassals:!vassals
-      ~ic:(Unix.in_channel_of_descr lord_pipe_r)
-      ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
-end
-
-module In_channel : sig
-  val lines : ?delim_null:bool -> in_channel -> string Stream.t
-end = struct
-  let read_until_newline ic () =
-    match input_line ic with
-    | exception End_of_file ->
-        None
-    | line ->
-        Some line
-
-  let read_until_null ic =
-    let lexbuf = Lexing.from_channel ic in
-    fun () -> Input_delim.by_null lexbuf
-
-  let lines ?(delim_null=false) ic =
-    let reader =
-      if delim_null then
-        read_until_null ic
-      else
-        read_until_newline ic
-    in
-    Stream.create reader
-end
-
-module File : sig
-  type t =
-    { path : string
-    ; size : int
-    }
-
-  val find : string -> t Stream.t
-  (** Find all files in the directory tree, starting from the given root path *)
-
-  val lookup : string Stream.t -> t Stream.t
-  (** Lookup file info for given paths *)
-
-  val head : t -> len:int -> metrics:M.t -> string
-
-  val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t
-  val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t
-end = struct
-  type t =
-    { path : string
-    ; size : int
-    }
-
-  let lookup paths =
-    Stream.map paths ~f:(fun path ->
-      let {Unix.st_size = size; _} = Unix.lstat path in
-      {path; size}
-    )
-
-  let find root =
-    let dirs  = Queue.create () in
-    let files = Queue.create () in
-    let explore parent =
-      Array.iter (Sys.readdir parent) ~f:(fun child ->
-        let path = Filename.concat parent child in
-        let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in
-        match file_kind with
-        | Unix.S_REG ->
-            let file = {path; size = st_size} in
-            Queue.add file files
-        | Unix.S_DIR ->
-            Queue.add path dirs
-        | Unix.S_CHR
-        | Unix.S_BLK
-        | Unix.S_LNK
-        | Unix.S_FIFO
-        | Unix.S_SOCK ->
-            ()
-      )
-    in
-    explore root;
-    let rec next () =
-      match Queue.is_empty files, Queue.is_empty dirs with
-      | false, _     -> Some (Queue.take files)
-      | true , true  -> None
-      | true , false ->
-          explore (Queue.take dirs);
-          next ()
-    in
-    Stream.create next
-
-  let filter_out_singletons files ~group ~handle_singleton =
-    let q = Queue.create () in
-    Stream.iter (Stream.group_by files ~f:group) ~f:(fun group ->
-      let (_, n, members) = group in
-      if n > 1 then
-        List.iter members ~f:(fun m -> Queue.add m q)
-      else
-        handle_singleton group
-    );
-    Stream.of_queue q
-
-  let filter_out_unique_sizes files ~metrics =
-    filter_out_singletons
-      files
-      ~group:(fun {size; _} -> size)
-      ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size)
-
-  let head {path; _} ~len ~metrics =
-    M.file_sampled metrics;
-    let buf = Bytes.make len ' ' in
-    let ic = open_in_bin path in
-    let rec read pos len =
-      assert (len >= 0);
-      if len = 0 then
-        ()
-      else begin
-        let chunk_size = input ic buf pos len in
-        M.chunk_read metrics ~size:chunk_size;
-        if chunk_size = 0 then (* EOF *)
-          ()
-        else
-          read (pos + chunk_size) (len - chunk_size)
-      end
-    in
-    read 0 len;
-    close_in ic;
-    Bytes.to_string buf
-
-  let filter_out_unique_heads files ~len ~metrics =
-    filter_out_singletons
-      files
-      ~group:(head ~len ~metrics)
-      ~handle_singleton:(fun (_, _, files) ->
-        let {size; _} = List.hd files in  (* Guaranteed non-empty *)
-        M.file_unique_sample metrics ~size
-      )
-end
-
 type input =
   | Stdin
   | Directories of string list
diff --git a/lib/file.ml b/lib/file.ml
new file mode 100644 (file)
index 0000000..df7e608
--- /dev/null
@@ -0,0 +1,95 @@
+module Array = ArrayLabels
+module List  = ListLabels
+module Unix  = UnixLabels
+
+module M = Metrics
+
+type t =
+  { path : string
+  ; size : int
+  }
+
+let lookup paths =
+  Stream.map paths ~f:(fun path ->
+    let {Unix.st_size = size; _} = Unix.lstat path in
+    {path; size}
+  )
+
+let find root =
+  let dirs  = Queue.create () in
+  let files = Queue.create () in
+  let explore parent =
+    Array.iter (Sys.readdir parent) ~f:(fun child ->
+      let path = Filename.concat parent child in
+      let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in
+      match file_kind with
+      | Unix.S_REG ->
+          let file = {path; size = st_size} in
+          Queue.add file files
+      | Unix.S_DIR ->
+          Queue.add path dirs
+      | Unix.S_CHR
+      | Unix.S_BLK
+      | Unix.S_LNK
+      | Unix.S_FIFO
+      | Unix.S_SOCK ->
+          ()
+    )
+  in
+  explore root;
+  let rec next () =
+    match Queue.is_empty files, Queue.is_empty dirs with
+    | false, _     -> Some (Queue.take files)
+    | true , true  -> None
+    | true , false ->
+        explore (Queue.take dirs);
+        next ()
+  in
+  Stream.create next
+
+let filter_out_singletons files ~group ~handle_singleton =
+  let q = Queue.create () in
+  Stream.iter (Stream.group_by files ~f:group) ~f:(fun group ->
+    let (_, n, members) = group in
+    if n > 1 then
+      List.iter members ~f:(fun m -> Queue.add m q)
+    else
+      handle_singleton group
+  );
+  Stream.of_queue q
+
+let filter_out_unique_sizes files ~metrics =
+  filter_out_singletons
+    files
+    ~group:(fun {size; _} -> size)
+    ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size)
+
+let head {path; _} ~len ~metrics =
+  M.file_sampled metrics;
+  let buf = Bytes.make len ' ' in
+  let ic = open_in_bin path in
+  let rec read pos len =
+    assert (len >= 0);
+    if len = 0 then
+      ()
+    else begin
+      let chunk_size = input ic buf pos len in
+      M.chunk_read metrics ~size:chunk_size;
+      if chunk_size = 0 then (* EOF *)
+        ()
+      else
+        read (pos + chunk_size) (len - chunk_size)
+    end
+  in
+  read 0 len;
+  close_in ic;
+  Bytes.to_string buf
+
+let filter_out_unique_heads files ~len ~metrics =
+  filter_out_singletons
+    files
+    ~group:(head ~len ~metrics)
+    ~handle_singleton:(fun (_, _, files) ->
+      let {size; _} = List.hd files in  (* Guaranteed non-empty *)
+      M.file_unique_sample metrics ~size
+    )
diff --git a/lib/file.mli b/lib/file.mli
new file mode 100644 (file)
index 0000000..d6e6fa0
--- /dev/null
@@ -0,0 +1,17 @@
+module M = Metrics
+
+type t =
+  { path : string
+  ; size : int
+  }
+
+val find : string -> t Stream.t
+(** Find all files in the directory tree, starting from the given root path *)
+
+val lookup : string Stream.t -> t Stream.t
+(** Lookup file info for given paths *)
+
+val head : t -> len:int -> metrics:M.t -> string
+
+val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t
+val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t
diff --git a/lib/in_channel.ml b/lib/in_channel.ml
new file mode 100644 (file)
index 0000000..3581d0e
--- /dev/null
@@ -0,0 +1,19 @@
+let read_until_newline ic () =
+  match input_line ic with
+  | exception End_of_file ->
+      None
+  | line ->
+      Some line
+
+let read_until_null ic =
+  let lexbuf = Lexing.from_channel ic in
+  fun () -> Input_delim.by_null lexbuf
+
+let lines ?(delim_null=false) ic =
+  let reader =
+    if delim_null then
+      read_until_null ic
+    else
+      read_until_newline ic
+  in
+  Stream.create reader
diff --git a/lib/in_channel.mli b/lib/in_channel.mli
new file mode 100644 (file)
index 0000000..5b9cad5
--- /dev/null
@@ -0,0 +1 @@
+val lines : ?delim_null:bool -> in_channel -> string Stream.t
similarity index 100%
rename from input_delim.mll
rename to lib/input_delim.mll
diff --git a/lib/metrics.ml b/lib/metrics.ml
new file mode 100644 (file)
index 0000000..8775874
--- /dev/null
@@ -0,0 +1,122 @@
+open Printf
+
+type t =
+  { considered_files    : int ref
+  ; considered_bytes    : int ref
+  ; empty               : int ref
+  ; ignored_files       : int ref
+  ; ignored_bytes       : int ref
+  ; unique_size_files   : int ref
+  ; unique_size_bytes   : int ref
+  ; unique_sample_files : int ref
+  ; unique_sample_bytes : int ref
+  ; sampled_files       : int ref
+  ; sampled_bytes       : int ref
+  ; hashed_files        : int ref
+  ; hashed_bytes        : int ref
+  ; digests             : int ref
+  ; redundant_data      : int ref
+  }
+
+let init () =
+  { considered_files    = ref 0
+  ; considered_bytes    = ref 0
+  ; empty               = ref 0
+  ; ignored_files       = ref 0
+  ; ignored_bytes       = ref 0
+  ; unique_size_files   = ref 0
+  ; unique_size_bytes   = ref 0
+  ; sampled_files       = ref 0
+  ; sampled_bytes       = ref 0
+  ; hashed_files        = ref 0
+  ; hashed_bytes        = ref 0
+  ; unique_sample_files = ref 0
+  ; unique_sample_bytes = ref 0
+  ; digests             = ref 0
+  ; redundant_data      = ref 0
+  }
+
+let add sum addend =
+  sum := !sum + addend
+
+let file_considered t ~size =
+  incr t.considered_files;
+  add t.considered_bytes size
+
+let file_ignored {ignored_files; ignored_bytes; _} ~size =
+  incr ignored_files;
+  add ignored_bytes size
+
+let file_empty t =
+  incr t.empty
+
+let chunk_read t ~size =
+  add t.sampled_bytes size
+
+let file_sampled t =
+  incr t.sampled_files
+
+let file_unique_size t ~size =
+  incr t.unique_size_files;
+  add t.unique_size_bytes size
+
+let file_unique_sample t ~size =
+  incr t.unique_sample_files;
+  add t.unique_sample_bytes size
+
+let file_hashed t ~size =
+  incr t.hashed_files;
+  add t.hashed_bytes size
+
+let digest t =
+  incr t.digests
+
+let redundant_data t ~size =
+  add t.redundant_data size
+
+let report
+  t
+  ~wall_time_all
+  ~wall_time_group_by_size
+  ~wall_time_group_by_head
+  ~wall_time_group_by_digest
+  ~proc_time_all
+  ~proc_time_group_by_size
+  ~proc_time_group_by_head
+  ~proc_time_group_by_digest
+=
+  let b_to_mb b = (float_of_int b) /. 1024. /. 1024. in
+  let b_to_gb b = (b_to_mb b) /. 1024. in
+  eprintf "Total time                   : %.2f wall sec  %.2f proc sec\n%!"
+    wall_time_all
+    proc_time_all;
+  eprintf "Considered                   : %8d files  %6.2f Gb\n%!"
+    !(t.considered_files)
+    (b_to_gb !(t.considered_bytes));
+  eprintf "Sampled                      : %8d files  %6.2f Gb\n%!"
+    !(t.sampled_files)
+    (b_to_gb !(t.sampled_bytes));
+  eprintf "Hashed                       : %8d files  %6.2f Gb  %6.2f wall sec  %6.2f proc sec\n%!"
+    !(t.hashed_files)
+    (b_to_gb !(t.hashed_bytes))
+    wall_time_group_by_digest
+    proc_time_group_by_digest;
+  eprintf "Digests                      : %8d\n%!"
+    !(t.digests);
+  eprintf "Duplicates (Hashed - Digests): %8d files  %6.2f Gb\n%!"
+    (!(t.hashed_files) - !(t.digests))
+    (b_to_gb !(t.redundant_data));
+  eprintf "Skipped due to 0      size   : %8d files\n%!" !(t.empty);
+  eprintf "Skipped due to unique size   : %8d files  %6.2f Gb  %6.2f wall sec  %6.2f proc sec\n%!"
+    !(t.unique_size_files)
+    (b_to_gb !(t.unique_size_bytes))
+    wall_time_group_by_size
+    proc_time_group_by_size;
+  eprintf "Skipped due to unique sample : %8d files  %6.2f Gb  %6.2f wall sec  %6.2f proc sec\n%!"
+    !(t.unique_sample_files)
+    (b_to_gb !(t.unique_sample_bytes))
+    wall_time_group_by_head
+    proc_time_group_by_head;
+  eprintf "Ignored due to regex match   : %8d files  %6.2f Gb\n%!"
+    !(t.ignored_files)
+    (b_to_gb !(t.ignored_bytes))
diff --git a/lib/metrics.mli b/lib/metrics.mli
new file mode 100644 (file)
index 0000000..27adc8c
--- /dev/null
@@ -0,0 +1,36 @@
+type t
+
+val init
+  : unit -> t
+val report
+  : t
+  -> wall_time_all:float
+  -> wall_time_group_by_size:float
+  -> wall_time_group_by_head:float
+  -> wall_time_group_by_digest:float
+  -> proc_time_all:float
+  -> proc_time_group_by_size:float
+  -> proc_time_group_by_head:float
+  -> proc_time_group_by_digest:float
+  -> unit
+
+val file_considered
+  : t -> size:int -> unit
+val file_ignored
+  : t -> size:int -> unit
+val file_empty
+  : t -> unit
+val file_sampled
+  : t -> unit
+val chunk_read
+  : t -> size:int -> unit
+val file_unique_size
+  : t -> size:int -> unit
+val file_unique_sample
+  : t -> size:int -> unit
+val file_hashed
+  : t -> size:int -> unit
+val digest
+  : t -> unit
+val redundant_data
+  : t -> size:int -> unit
diff --git a/lib/stream.ml b/lib/stream.ml
new file mode 100644 (file)
index 0000000..4d60816
--- /dev/null
@@ -0,0 +1,166 @@
+module Array = ArrayLabels
+module List  = ListLabels
+module S     = Stdlib.Stream
+
+type 'a t =
+  {mutable streams : ('a S.t) list}
+
+type ('input, 'output) msg_from_vassal =
+  | Ready of int
+  | Result of (int * ('input * 'output))
+  | Exiting of int
+
+type 'input msg_from_lord =
+  | Job of 'input option
+
+let create f =
+  {streams = [S.from (fun _ -> f ())]}
+
+let of_queue q =
+  create (fun () ->
+    match Queue.take q with
+    | exception Queue.Empty ->
+        None
+    | x ->
+        Some x
+  )
+
+let rec next t =
+  match t.streams with
+  | [] ->
+      None
+  | s :: streams ->
+      (match S.next s with
+      | exception S.Failure ->
+          t.streams <- streams;
+          next t
+      | x ->
+          Some x
+      )
+
+let map t ~f =
+  create (fun () ->
+    match next t with
+    | None   -> None
+    | Some x -> Some (f x)
+  )
+
+let filter t ~f =
+  let rec filter () =
+    match next t with
+    | None ->
+        None
+    | Some x when f x ->
+        Some x
+    | Some _ ->
+        filter ()
+  in
+  create filter
+
+let iter t ~f =
+  List.iter t.streams ~f:(S.iter f)
+
+let concat ts =
+  {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}
+
+let group_by t ~f =
+  let groups_tbl = Hashtbl.create 1_000_000 in
+  let group_update x =
+    let group = f x in
+    let members =
+      match Hashtbl.find_opt groups_tbl group with
+      | None ->
+          (1, [x])
+      | Some (n, xs) ->
+          (succ n, x :: xs)
+    in
+    Hashtbl.replace groups_tbl group members
+  in
+  iter t ~f:group_update;
+  let groups = Queue.create () in
+  Hashtbl.iter
+    (fun name (length, members) -> Queue.add (name, length, members) groups)
+    groups_tbl;
+  of_queue groups
+
+module Ipc : sig
+  val send : out_channel -> 'a -> unit
+  val recv : in_channel -> 'a
+end = struct
+  let send oc msg =
+    Marshal.to_channel oc msg [];
+    flush oc
+
+  let recv ic =
+    Marshal.from_channel ic
+end
+
+let lord t ~njobs ~vassals ~ic ~ocs =
+  let active_vassals = ref njobs in
+  let results = Queue.create () in
+  let rec loop () =
+    match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
+    | Exiting i ->
+        close_out ocs.(i);
+        decr active_vassals;
+        if !active_vassals = 0 then () else loop ()
+    | Ready i ->
+        Ipc.send ocs.(i) (Job (next t));
+        loop ()
+    | Result (i, result) ->
+        Queue.add result results;
+        Ipc.send ocs.(i) (Job (next t));
+        loop ()
+  in
+  let rec wait = function
+    | [] ->
+        ()
+    | vassals ->
+        let pid, _process_status = Unix.wait () in
+        (* TODO: handle process_status *)
+        wait (List.filter vassals ~f:(fun p -> p <> pid))
+  in
+  loop ();
+  close_in ic;
+  wait vassals;
+  of_queue results
+
+let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
+  let ic = Unix.in_channel_of_descr vassal_pipe_r in
+  let oc = Unix.out_channel_of_descr lord_pipe_w in
+  let rec loop () =
+    match (Ipc.recv ic : 'input msg_from_lord) with
+    | Job (Some x) ->
+        Ipc.send oc (Result (i, (x, f x)));
+        loop ()
+    | Job None ->
+        Ipc.send oc (Exiting i)
+  in
+  Ipc.send oc (Ready i);
+  loop ();
+  close_in ic;
+  close_out oc;
+  exit 0
+
+let bag_map t ~njobs ~f =
+  let lord_pipe_r, lord_pipe_w = Unix.pipe () in
+  let vassal_pipes   = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
+  let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
+  let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
+  let vassals = ref [] in
+  for i=0 to (njobs - 1) do
+    begin match Unix.fork () with
+    | 0 ->
+        Unix.close lord_pipe_r;
+        vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
+    | pid ->
+        vassals := pid :: !vassals
+    end
+  done;
+  Unix.close lord_pipe_w;
+  lord
+    t
+    ~njobs
+    ~vassals:!vassals
+    ~ic:(Unix.in_channel_of_descr lord_pipe_r)
+    ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
diff --git a/lib/stream.mli b/lib/stream.mli
new file mode 100644 (file)
index 0000000..6cb5b11
--- /dev/null
@@ -0,0 +1,18 @@
+type 'a t
+
+val create : (unit -> 'a option) -> 'a t
+
+val of_queue : 'a Queue.t -> 'a t
+
+val iter : 'a t -> f:('a -> unit) -> unit
+
+val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t
+(** Parallel map with arbitrarily-reordered elements. *)
+
+val map : 'a t -> f:('a -> 'b) -> 'b t
+
+val filter : 'a t -> f:('a -> bool) -> 'a t
+
+val concat : ('a t) list -> 'a t
+
+val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t
This page took 0.087314 seconds and 4 git commands to generate.