module StrSet = Set.Make(String)
module Unix = UnixLabels
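+(* [Metrics] gathers mutable counters describing one run: how many files
+   (and bytes) were considered, ignored, skipped at each pipeline stage,
+   sampled, and hashed. [report] prints a summary table to stderr. *)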
+module Metrics : sig
+ type t
+
+ val init
+ : unit -> t
+ val report
+ : t
+ -> time_all:float
+ -> time_group_by_size:float
+ -> time_group_by_head:float
+ -> time_group_by_digest:float
+ -> unit
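+  (** Print the summary table to stderr; the [time_*] arguments are
+      per-stage durations measured by the caller. *)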
+
+ val file_considered
+ : t -> size:int -> unit
+ val file_ignored
+ : t -> size:int -> unit
+ val file_empty
+ : t -> unit
+ val file_sampled
+ : t -> unit
+ val chunk_read
+ : t -> size:int -> unit
+ val file_unique_size
+ : t -> size:int -> unit
+ val file_unique_sample
+ : t -> size:int -> unit
+ val file_hashed
+ : t -> size:int -> unit
+ val digest
+ : t -> unit
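+  (** Count one distinct digest, i.e. one content-equality group. *)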
+end = struct
+ type t =
+ { considered_files : int ref
+ ; considered_bytes : int ref
+ ; empty : int ref
+ ; ignored_files : int ref
+ ; ignored_bytes : int ref
+ ; unique_size_files : int ref
+ ; unique_size_bytes : int ref
+ ; unique_sample_files : int ref
+ ; unique_sample_bytes : int ref
+ ; sampled_files : int ref
+ ; sampled_bytes : int ref
+ ; hashed_files : int ref
+ ; hashed_bytes : int ref
+ ; digests : int ref
+ }
+
+ let init () =
+ { considered_files = ref 0
+ ; considered_bytes = ref 0
+ ; empty = ref 0
+ ; ignored_files = ref 0
+ ; ignored_bytes = ref 0
+ ; unique_size_files = ref 0
+ ; unique_size_bytes = ref 0
+ ; sampled_files = ref 0
+ ; sampled_bytes = ref 0
+ ; hashed_files = ref 0
+ ; hashed_bytes = ref 0
+ ; unique_sample_files = ref 0
+ ; unique_sample_bytes = ref 0
+ ; digests = ref 0
+ }
+
+ let add sum addend =
+ sum := !sum + addend
+
+ let file_considered t ~size =
+ incr t.considered_files;
+ add t.considered_bytes size
+
+ let file_ignored {ignored_files; ignored_bytes; _} ~size =
+ incr ignored_files;
+ add ignored_bytes size
+
+ let file_empty t =
+ incr t.empty
+
+ let chunk_read t ~size =
+ add t.sampled_bytes size
+
+ let file_sampled t =
+ incr t.sampled_files
+
+ let file_unique_size t ~size =
+ incr t.unique_size_files;
+ add t.unique_size_bytes size
+
+ let file_unique_sample t ~size =
+ incr t.unique_sample_files;
+ add t.unique_sample_bytes size
+
+ let file_hashed t ~size =
+ incr t.hashed_files;
+ add t.hashed_bytes size
+
+ let digest t =
+ incr t.digests
+
+  let report
+      t
+      ~time_all
+      ~time_group_by_size
+      ~time_group_by_head
+      ~time_group_by_digest
+    =
+    let b_to_mib b = (float_of_int b) /. 1024. /. 1024. in
+    let b_to_gib b = (b_to_mib b) /. 1024. in
+    eprintf "Time                         : %8.2f seconds\n%!"
+      time_all;
+    eprintf "Considered                   : %8d files %6.2f GiB\n%!"
+      !(t.considered_files)
+      (b_to_gib !(t.considered_bytes));
+    eprintf "Sampled                      : %8d files %6.2f GiB\n%!"
+      !(t.sampled_files)
+      (b_to_gib !(t.sampled_bytes));
+    eprintf "Hashed                       : %8d files %6.2f GiB %6.2f seconds\n%!"
+      !(t.hashed_files)
+      (b_to_gib !(t.hashed_bytes))
+      time_group_by_digest;
+    eprintf "Digests                      : %8d\n%!"
+      !(t.digests);
+    eprintf "Duplicates (Hashed - Digests): %8d\n%!"
+      (!(t.hashed_files) - !(t.digests));
+    eprintf "Skipped due to 0 size        : %8d files\n%!" !(t.empty);
+    eprintf "Skipped due to unique size   : %8d files %6.2f GiB %6.2f seconds\n%!"
+      !(t.unique_size_files)
+      (b_to_gib !(t.unique_size_bytes))
+      time_group_by_size;
+    eprintf "Skipped due to unique sample : %8d files %6.2f GiB %6.2f seconds\n%!"
+      !(t.unique_sample_files)
+      (b_to_gib !(t.unique_sample_bytes))
+      time_group_by_head;
+    eprintf "Ignored due to regex match   : %8d files %6.2f GiB\n%!"
+      !(t.ignored_files)
+      (b_to_gib !(t.ignored_bytes))
+end
+
+module M = Metrics
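+
+(* A minimal usage sketch of [Metrics] (hypothetical sizes and times;
+ * [main] below drives it with real measurements):
+ *
+ *   let m = M.init () in
+ *   M.file_considered m ~size:1024;
+ *   M.file_hashed m ~size:1024;
+ *   M.digest m;
+ *   M.report m ~time_all:0.1 ~time_group_by_size:0.02
+ *     ~time_group_by_head:0.03 ~time_group_by_digest:0.05
+ *)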
+
module Stream : sig
type 'a t
val create : (unit -> 'a option) -> 'a t
+ val of_queue : 'a Queue.t -> 'a t
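+  (** [of_queue q] is a stream that destructively takes elements from [q]
+      until it is empty. *)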
+
val iter : 'a t -> f:('a -> unit) -> unit
val map : 'a t -> f:('a -> 'b) -> 'b t
val filter : 'a t -> f:('a -> bool) -> 'a t
val concat : ('a t) list -> 'a t
+
+ val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t
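+  (** [group_by t ~f] eagerly drains [t], buckets its elements by the key
+      [f x], and returns a stream of [(key, count, members)] triples in
+      unspecified order. *)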
end = struct
module S = Stream
let create f =
{streams = [S.from (fun _ -> f ())]}
+ let of_queue q =
+ create (fun () ->
+ match Queue.take q with
+ | exception Queue.Empty ->
+ None
+ | x ->
+ Some x
+ )
+
let rec next t =
match t.streams with
| [] ->
let concat ts =
{streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}
+
+ let group_by t ~f =
+ let groups_tbl = Hashtbl.create 1_000_000 in
+ let group_update x =
+ let group = f x in
+ let members =
+ match Hashtbl.find_opt groups_tbl group with
+ | None ->
+ (1, [x])
+ | Some (n, xs) ->
+ (succ n, x :: xs)
+ in
+ Hashtbl.replace groups_tbl group members
+ in
+ iter t ~f:group_update;
+ let groups = Queue.create () in
+ Hashtbl.iter
+ (fun name (length, members) -> Queue.add (name, length, members) groups)
+ groups_tbl;
+ of_queue groups
end
module File : sig
val lookup : string Stream.t -> t Stream.t
(** Lookup file info for given paths *)
- module Set : sig include Set.S with type elt := t end
+ val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t
+ val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t
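+  (* Both filters are eager: each drains its input stream, keeps only the
+     files whose size (resp. leading [len] bytes) is shared with at least
+     one other file, and records the skipped files in [metrics]. *)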
end = struct
type t =
{ path : string
; size : int
}
- let compare {path=p1; _} {path=p2; _} =
- Stdlib.compare p1 p2
-
let lookup paths =
Stream.map paths ~f:(fun path ->
let {Unix.st_size = size; _} = Unix.lstat path in
in
Stream.create next
- module Set = Set.Make(struct
- type elt = t
- type t = elt
- let compare = compare
- end)
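+  (* Group files and keep only groups with more than one member;
+     singleton groups are passed to [handle_singleton] for metrics
+     accounting. *)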
+ let filter_out_singletons files ~group ~handle_singleton =
+ let q = Queue.create () in
+ Stream.iter (Stream.group_by files ~f:group) ~f:(fun group ->
+ let (_, n, members) = group in
+ if n > 1 then
+ List.iter members ~f:(fun m -> Queue.add m q)
+ else
+ handle_singleton group
+ );
+ Stream.of_queue q
+
+ let filter_out_unique_sizes files ~metrics =
+ filter_out_singletons
+ files
+ ~group:(fun {size; _} -> size)
+ ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size)
+
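+  (* Read the first [len] bytes of the file at [path]. A file shorter than
+     [len] leaves the rest of the buffer as space padding; any spurious
+     head collisions this causes are resolved by the later digest stage. *)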
+ let head path ~len ~metrics =
+ let buf = Bytes.make len ' ' in
+ let ic = open_in_bin path in
+ let rec read pos len =
+ assert (len >= 0);
+ if len = 0 then
+ ()
+ else begin
+ let chunk_size = input ic buf pos len in
+ M.chunk_read metrics ~size:chunk_size;
+ if chunk_size = 0 then (* EOF *)
+ ()
+ else
+ read (pos + chunk_size) (len - chunk_size)
+ end
+ in
+ read 0 len;
+ close_in ic;
+ Bytes.to_string buf
+
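+  (* Group by the first [len] bytes and drop files whose head no other
+     file shares; a singleton group has exactly one member, whose size is
+     charged to the unique-sample counters. *)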
+ let filter_out_unique_heads files ~len ~metrics =
+ filter_out_singletons
+ files
+ ~group:(fun {path; _} ->
+ M.file_sampled metrics;
+ head path ~len ~metrics
+ )
+ ~handle_singleton:(fun (_, _, files) ->
+ let {size; _} = List.hd files in (* Guaranteed non-empty *)
+ M.file_unique_sample metrics ~size
+ )
end
type input =
; sample : int
}
-type count =
- { considered_files : int ref
- ; considered_bytes : int ref
- ; empty : int ref
- ; ignored_files : int ref
- ; ignored_bytes : int ref
- ; unique_size_files : int ref
- ; unique_size_bytes : int ref
- ; unique_sample_files : int ref
- ; unique_sample_bytes : int ref
- ; sampled_files : int ref
- ; sampled_bytes : int ref
- ; hashed_files : int ref
- ; hashed_bytes : int ref
- ; digests : int ref
- }
-
-let add sum addend =
- sum := !sum + addend
-
-let make_input_stream input ignore count =
+let make_input_stream input ignore ~metrics =
let input =
match input with
| Stdin ->
Stream.concat (List.map paths ~f:File.find)
in
Stream.filter input ~f:(fun {File.path; size} ->
- incr count.considered_files;
- add count.considered_bytes size;
+ M.file_considered metrics ~size;
let empty = size = 0 in
+ if empty then M.file_empty metrics;
let ignored =
match ignore with
| Some regexp when (Str.string_match regexp path 0) ->
- incr count.ignored_files;
- add count.ignored_bytes size;
+ M.file_ignored metrics ~size;
true
| Some _ | None ->
false
in
- if empty then incr count.empty;
(not empty) && (not ignored)
)
| Stdout ->
fun digest n_files files ->
printf "%s %d\n%!" (Digest.to_hex digest) n_files;
- List.iter (File.Set.elements files) ~f:(fun {File.path; _} ->
+ List.iter files ~f:(fun {File.path; _} ->
printf " %S\n%!" path
)
| Directory dir ->
let dir = Filename.concat dir (String.sub digest 0 2) in
Unix.mkdir dir ~perm:0o700;
let oc = open_out (Filename.concat dir digest) in
- List.iter (File.Set.elements files) ~f:(fun {File.path; _} ->
+ List.iter files ~f:(fun {File.path; _} ->
output_string oc (sprintf "%S\n%!" path)
);
close_out oc
-let sample path ~len ~count =
- let buf = Bytes.make len ' ' in
- let ic = open_in_bin path in
- let rec read pos len =
- assert (len >= 0);
- if len = 0 then
- ()
- else begin
- let chunk_size = input ic buf pos len in
- add count.sampled_bytes chunk_size;
- if chunk_size = 0 then (* EOF *)
- ()
- else
- read (pos + chunk_size) (len - chunk_size)
- end
- in
- read 0 len;
- close_in ic;
- Bytes.to_string buf
-
let main {input; output; ignore; sample = sample_len} =
- let t0 = Sys.time () in
- let count =
- { considered_files = ref 0
- ; considered_bytes = ref 0
- ; empty = ref 0
- ; ignored_files = ref 0
- ; ignored_bytes = ref 0
- ; unique_size_files = ref 0
- ; unique_size_bytes = ref 0
- ; sampled_files = ref 0
- ; sampled_bytes = ref 0
- ; hashed_files = ref 0
- ; hashed_bytes = ref 0
- ; unique_sample_files = ref 0
- ; unique_sample_bytes = ref 0
- ; digests = ref 0
- }
- in
+ let t0_all = Sys.time () in
+ let metrics = M.init () in
let output = make_output_fun output in
- let input = make_input_stream input ignore count in
- let files_by_size = Hashtbl.create 1_000_000 in
- let files_by_sample = Hashtbl.create 1_000_000 in
- let files_by_digest = Hashtbl.create 1_000_000 in
- let process tbl ~group ~file =
- let count, files =
- match Hashtbl.find_opt tbl group with
- | None ->
- (0, File.Set.empty)
- | Some (n, files) ->
- (n, files)
- in
- Hashtbl.replace tbl group (count + 1, File.Set.add file files)
- in
+ let input = make_input_stream input ignore ~metrics in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   * from input to files_by_size
   * from files_by_size to files_by_sample
   * from files_by_sample to files_by_digest
   * from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)
+
+ let files = input in
+
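+  (* Each stage below is eager: it fully drains its input before the next
+     timestamp is taken, so each t0/t1 pair brackets that stage's work.
+     The lazy [input] stream defers directory traversal, so its cost lands
+     in the size-grouping time. *)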
let t0_group_by_size = Sys.time () in
- Stream.iter input ~f:(fun ({File.size; _} as file) ->
- process files_by_size ~group:size ~file
- );
+ let files = File.filter_out_unique_sizes files ~metrics in
let t1_group_by_size = Sys.time () in
- let t0_group_by_sample = Sys.time () in
- Hashtbl.iter
- (fun _ (n, files) ->
- (* Skip files with unique sizes *)
- if n > 1 then
- File.Set.iter
- (fun ({File.path; _} as file) ->
- incr count.sampled_files;
- process
- files_by_sample
- ~group:(sample path ~len:sample_len ~count)
- ~file
- )
- files
- else
- File.Set.iter
- (fun {File.size; _} ->
- incr count.unique_size_files;
- add count.unique_size_bytes size
- )
- files
- )
- files_by_size;
+
+ let t0_group_by_sample = t1_group_by_size in
+ let files = File.filter_out_unique_heads files ~len:sample_len ~metrics in
let t1_group_by_sample = Sys.time () in
- let t0_group_by_digest = Sys.time () in
- Hashtbl.iter
- (fun _ (n, files) ->
- (* Skip files with unique samples *)
- if n > 1 then
- File.Set.iter
- (fun ({File.path; size} as file) ->
- incr count.hashed_files;
- add count.hashed_bytes size;
- process files_by_digest ~group:(Digest.file path) ~file
- )
- files
- else
- File.Set.iter
- (fun {File.size; _} ->
- incr count.unique_sample_files;
- add count.unique_sample_bytes size;
- )
- files
+
+ let t0_group_by_digest = t1_group_by_sample in
+ let groups =
+ Stream.group_by files ~f:(fun {File.path; size} ->
+ M.file_hashed metrics ~size;
+ Digest.file path
)
- files_by_sample;
+ in
let t1_group_by_digest = Sys.time () in
- Hashtbl.iter
- (fun d (n, files) ->
- incr count.digests;
- if n > 1 then
- output d n files
- )
- files_by_digest;
- let t1 = Sys.time () in
- let b_to_mb b = (float_of_int b) /. 1024. /. 1024. in
- let b_to_gb b = (b_to_mb b) /. 1024. in
- eprintf "Time : %8.2f seconds\n%!" (t1 -. t0);
- eprintf "Considered : %8d files %6.2f Gb\n%!"
- !(count.considered_files)
- (b_to_gb !(count.considered_bytes));
- eprintf "Sampled : %8d files %6.2f Gb\n%!"
- !(count.sampled_files)
- (b_to_gb !(count.sampled_bytes));
- eprintf "Hashed : %8d files %6.2f Gb %6.2f seconds\n%!"
- !(count.hashed_files)
- (b_to_gb !(count.hashed_bytes))
- (t1_group_by_digest -. t0_group_by_digest);
- eprintf "Digests : %8d\n%!"
- !(count.digests);
- eprintf "Duplicates (Hashed - Digests): %8d\n%!"
- (!(count.hashed_files) - !(count.digests));
- eprintf "Skipped due to 0 size : %8d files\n%!" !(count.empty);
- eprintf "Skipped due to unique size : %8d files %6.2f Gb %6.2f seconds\n%!"
- !(count.unique_size_files)
- (b_to_gb !(count.unique_size_bytes))
- (t1_group_by_size -. t0_group_by_size);
- eprintf "Skipped due to unique sample : %8d files %6.2f Gb %6.2f seconds\n%!"
- !(count.unique_sample_files)
- (b_to_gb !(count.unique_sample_bytes))
- (t1_group_by_sample -. t0_group_by_sample);
- eprintf "Ignored due to regex match : %8d files %6.2f Gb\n%!"
- !(count.ignored_files)
- (b_to_gb !(count.ignored_bytes))
+
+ Stream.iter groups ~f:(fun (d, n, files) ->
+ M.digest metrics;
+ if n > 1 then output d n files
+ );
+
+ let t1_all = Sys.time () in
+
+ M.report metrics
+ ~time_all: (t1_all -. t0_all)
+ ~time_group_by_size: (t1_group_by_size -. t0_group_by_size)
+ ~time_group_by_head: (t1_group_by_sample -. t0_group_by_sample)
+ ~time_group_by_digest:(t1_group_by_digest -. t0_group_by_digest)
let get_opt () : opt =
let assert_ test x msg =