open Printf
-module Array = ArrayLabels
-module List = ListLabels
-
-module Stream : sig
- type 'a t
-
- val create : (unit -> 'a option) -> 'a t
-
- val iter : 'a t -> f:('a -> unit) -> unit
-end = struct
- module S = Stream
-
- type 'a t =
- 'a S.t
-
- let create f =
- S.from (fun _ -> f ())
-
- let iter t ~f =
- S.iter f t
-end
-
-module In_channel : sig
- val lines : in_channel -> string Stream.t
-end = struct
- let lines ic =
- Stream.create (fun () ->
- match input_line ic with
- | exception End_of_file ->
- None
- | line ->
- Some line
- )
-end
-
-module Directory : sig
- val find_files : string -> string Stream.t
-end = struct
- let find_files root =
- let dirs = Queue.create () in
- let files = Queue.create () in
- let explore parent =
- Array.iter (Sys.readdir parent) ~f:(fun child ->
- let path = Filename.concat parent child in
- let {Unix.st_kind = file_kind; _} = Unix.lstat path in
- match file_kind with
- | Unix.S_REG ->
- Queue.add path files
- | Unix.S_DIR ->
- Queue.add path dirs
- | Unix.S_CHR
- | Unix.S_BLK
- | Unix.S_LNK
- | Unix.S_FIFO
- | Unix.S_SOCK ->
- ()
- )
- in
- explore root;
- let rec next () =
- match Queue.is_empty files, Queue.is_empty dirs with
- | false, _ -> Some (Queue.take files)
- | true , true -> None
- | true , false ->
- explore (Queue.take dirs);
- next ()
- in
- Stream.create next
-end
+module List = ListLabels
+module StrSet = Set.Make(String)
+module Unix = UnixLabels
+
+module M = Metrics
+(* Where the candidate files come from. *)
type input =
- | Root_path of string
- | Paths_on_stdin
+ | Stdin (* candidate paths are read from standard input *)
+ | Directories of string list (* each directory is handed to [File.find] *)
+
+(* Where groups of duplicate files are reported. *)
+type output =
+ | Stdout (* groups are printed with [printf] *)
+ | Directory of string (* one file per digest is written under this directory *)
+
+(* Run-time options, as assembled by [get_opt]. *)
+type opt =
+ { input : input
+ ; output : output
+ ; ignore : string -> bool (* files whose path satisfies this are dropped *)
+ ; sample : int (* passed as [~len] to the [File] head-sampling functions *)
+ ; njobs : int (* values above 1 select the [Stream.bag_map] code paths *)
+ ; delim_null : bool (* forwarded to [In_channel.lines] when reading stdin *)
+ }
-let main input =
- let paths =
+(* [make_input_stream input ignore ~metrics ~delim_null] builds the stream of
+ * candidate files described by [input] and filters out files of size 0 and
+ * files whose path satisfies [ignore]. Every file seen is recorded in
+ * [metrics], including the ones that are then dropped. *)
+let make_input_stream input ignore ~metrics ~delim_null =
+ let input =
match input with
- | Paths_on_stdin -> In_channel.lines stdin
- | Root_path root -> Directory.find_files root
+ | Stdin ->
+ File.lookup (In_channel.lines stdin ~delim_null)
+ | Directories paths ->
+ (* Going through a set removes duplicate directory arguments. *)
+ let paths = StrSet.elements (StrSet.of_list paths) in
+ Stream.concat (List.map paths ~f:File.find)
+ in
+ Stream.filter input ~f:(fun {File.path; size} ->
+ M.file_considered metrics ~size;
+ let empty = size = 0 in
+ let ignored = ignore path in
+ (* A file that is both empty and ignored is counted under both metrics. *)
+ if empty then M.file_empty metrics;
+ if ignored then M.file_ignored metrics ~size;
+ (not empty) && (not ignored)
+ )
+
+(* [make_output_fun output] returns the function used to report one group of
+   duplicate files. That function receives the group's digest, the number of
+   files in the group and the files themselves.
+
+   - [Stdout]: prints the hex digest and the count, then each path (quoted
+     with [%S]) on its own line.
+   - [Directory dir]: writes the quoted paths, one per line, to the file
+     [dir/<first 2 hex chars of the digest>/<hex digest>].
+
+   Raises [Unix.Unix_error] if the sub-directory cannot be created for a
+   reason other than already existing, and [Sys_error] if the file cannot be
+   opened or written. *)
+let make_output_fun = function
+  | Stdout ->
+    fun digest n_files files ->
+      printf "%s %d\n%!" (Digest.to_hex digest) n_files;
+      List.iter files ~f:(fun {File.path; _} ->
+        printf " %S\n%!" path
+      )
+  | Directory dir ->
+    fun digest _ files ->
+      let digest = Digest.to_hex digest in
+      let dir = Filename.concat dir (String.sub digest 0 2) in
+      (* Many digests share the same two-character prefix, so the
+         sub-directory may already exist from an earlier group (or run). *)
+      (match Unix.mkdir dir ~perm:0o700 with
+       | () -> ()
+       | exception Unix.Unix_error (Unix.EEXIST, _, _) -> ());
+      let oc = open_out (Filename.concat dir digest) in
+      (* Do not leak the channel if a write fails part-way through. *)
+      (try
+         List.iter files ~f:(fun {File.path; _} ->
+           fprintf oc "%S\n" path
+         )
+       with e ->
+         close_out_noerr oc;
+         raise e);
+      close_out oc
+
+(* Wall-clock timestamp, taken from [Unix.gettimeofday]. *)
+let time_wall : unit -> float =
+  Unix.gettimeofday
+
+(* Processor-time timestamp, taken from [Sys.time]. *)
+let time_proc : unit -> float =
+  Sys.time
+
+(* [main opt] runs the whole duplicate search:
+ *
+ * 1. builds the input stream with [make_input_stream];
+ * 2. passes it through [File.filter_out_unique_sizes];
+ * 3. narrows it by a sample of [sample_len] bytes per file ([File.head] /
+ * [File.filter_out_unique_heads]);
+ * 4. groups the remaining files by [Digest.file];
+ * 5. hands every group with more than one file to the output function;
+ * 6. reports the collected metrics and per-stage timings with [M.report].
+ *
+ * When [njobs > 1], steps 3 and 4 go through [Stream.bag_map ~njobs].
+ *
+ * NOTE(review): the per-stage timings only measure the work done between
+ * the two timestamps; if the [Stream] stages are lazy, part of the work may
+ * be attributed to a later stage -- confirm against [Stream]. *)
+let main {input; output; ignore; sample = sample_len; njobs; delim_null} =
+ let wt0_all = time_wall () in
+ let pt0_all = time_proc () in
+ let metrics = M.init () in
+ let output = make_output_fun output in
+ let input = make_input_stream input ignore ~metrics ~delim_null in
+ (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
+ *
+ * from input to files_by_size
+ * from files_by_size to files_by_sample
+ * from files_by_sample to files_by_digest
+ * from files_by_digest to output
+ *
+ * input |> files_by_size |> files_by_sample |> files_by_digest |> output
+ *)
+
+ let files = input in
+
+ let wt0_group_by_size = time_wall () in
+ let pt0_group_by_size = time_proc () in
+ eprintf "[debug] filtering-out files with unique size\n%!";
+ let files = File.filter_out_unique_sizes files ~metrics in
+ let pt1_group_by_size = time_proc () in
+ let wt1_group_by_size = time_wall () in
+
+ (* Each stage starts its clocks where the previous stage stopped them. *)
+ let wt0_group_by_sample = wt1_group_by_size in
+ let pt0_group_by_sample = pt1_group_by_size in
+ eprintf "[debug] filtering-out files with unique heads\n%!";
+ let files =
+ if njobs > 1 then begin
+ (* Parallel path: the pipeline is fully drained into [q] here, keeping
+ * only files whose sample is shared by at least one other file. *)
+ let q = Queue.create () in
+ files
+ |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
+ |> Stream.group_by ~f:snd
+ |> Stream.map ~f:(fun (d, n, pairs) -> (d, n, List.map pairs ~f:fst))
+ |> Stream.filter ~f:(fun (_, n, _) -> n > 1)
+ |> Stream.iter ~f:(fun (_, _, fs) -> List.iter fs ~f:(fun f -> Queue.add f q))
+ ;
+ Stream.of_queue q
+ end else
+ File.filter_out_unique_heads files ~len:sample_len ~metrics
in
- let paths_by_digest = Hashtbl.create 1_000_000 in
- let path_count = ref 0 in
- let t0 = Sys.time () in
- Stream.iter paths ~f:(fun path ->
- incr path_count;
- try
- let digest = Digest.file path in
- let paths =
- match Hashtbl.find_opt paths_by_digest digest with
- | None ->
- []
- | Some paths ->
- paths
+ let pt1_group_by_sample = time_proc () in
+ let wt1_group_by_sample = time_wall () in
+
+ let wt0_group_by_digest = wt1_group_by_sample in
+ let pt0_group_by_digest = pt1_group_by_sample in
+ eprintf "[debug] hashing\n%!";
+ let groups =
+ if njobs > 1 then
+ (* Parallel path: digests are computed by [Stream.bag_map]; the
+ * [file_hashed] metric is recorded afterwards, once per grouped file. *)
+ let with_digests =
+ Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
in
- Hashtbl.replace paths_by_digest digest (path :: paths)
- with Sys_error e ->
- eprintf "WARNING: Failed to process %S: %S\n%!" path e
+ Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
+ fun (digest, n, file_digest_pairs) ->
+ let files =
+ List.map file_digest_pairs ~f:(fun (file, _) ->
+ M.file_hashed metrics ~size:file.File.size;
+ file
+ )
+ in
+ (digest, n, files)
+ )
+ else
+ (* Sequential path: the metric is recorded as each digest is computed. *)
+ Stream.group_by files ~f:(fun {File.path; size} ->
+ M.file_hashed metrics ~size;
+ Digest.file path
+ )
+ in
+ let pt1_group_by_digest = time_proc () in
+ let wt1_group_by_digest = time_wall () in
+
+ eprintf "[debug] reporting\n%!";
+ (* Every digest is counted, but only groups of 2+ files are reported. *)
+ Stream.iter groups ~f:(fun (d, n, files) ->
+ M.digest metrics;
+ if n > 1 then begin
+ M.redundant_data metrics ~size:(n * (List.hd files).File.size);
+ output d n files
+ end
);
- Hashtbl.iter
- (fun digest paths ->
- let n_paths = List.length paths in
- if n_paths > 1 then begin
- printf "%s %d\n%!" (Digest.to_hex digest) n_paths;
- List.iter paths ~f:(fun path -> printf " %s\n%!" path)
- end
- )
- paths_by_digest;
- let t1 = Sys.time () in
- eprintf "Processed %d files in %f seconds.\n%!" !path_count (t1 -. t0)
-let () =
- let input = ref Paths_on_stdin in
- Arg.parse [] (fun path ->
- if Sys.file_exists path then
- input := Root_path path
- else begin
- eprintf "File does not exist: %S\n%!" path;
+ let pt1_all = time_proc () in
+ let wt1_all = time_wall () in
+
+ M.report metrics
+ ~wall_time_all: (wt1_all -. wt0_all)
+ ~wall_time_group_by_size: (wt1_group_by_size -. wt0_group_by_size)
+ ~wall_time_group_by_head: (wt1_group_by_sample -. wt0_group_by_sample)
+ ~wall_time_group_by_digest:(wt1_group_by_digest -. wt0_group_by_digest)
+ ~proc_time_all: (pt1_all -. pt0_all)
+ ~proc_time_group_by_size: (pt1_group_by_size -. pt0_group_by_size)
+ ~proc_time_group_by_head: (pt1_group_by_sample -. pt0_group_by_sample)
+ ~proc_time_group_by_digest:(pt1_group_by_digest -. pt0_group_by_digest)
+
+(* [get_opt ()] parses the command line into an [opt] record.
+ *
+ * Anonymous arguments are directories to search; each must exist and be a
+ * directory. With none given, input stays [Stdin]. Defaults: output to
+ * stdout, nothing ignored, sample of 512 bytes, 6 jobs, newline-delimited
+ * stdin.
+ *
+ * Any failed validation prints a message on stderr and exits with status 1.
+ * The sample size must be strictly positive. *)
+let get_opt () : opt =
+ let assert_ test x msg =
+ if not (test x) then begin
+ eprintf "%s\n%!" msg;
exit 1
end
- ) "";
- main !input
+ in
+ let assert_file_exists path =
+ assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
+ in
+ let assert_file_is_dir path =
+ assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
+ in
+ let input = ref Stdin in
+ let output = ref Stdout in
+ let ignore = ref (fun _ -> false) in
+ let sample = ref 512 in
+ let njobs = ref 6 in
+ let input_delim_null = ref false in
+ let spec =
+ [ ( "-out"
+ , Arg.String (fun path ->
+ assert_file_exists path;
+ assert_file_is_dir path;
+ output := Directory path
+ )
+ , " Output to this directory instead of stdout."
+ )
+ ; ( "-ignore"
+ , Arg.String (fun regexp ->
+ let regexp = Str.regexp regexp in
+ ignore := fun string -> Str.string_match regexp string 0)
+ , " Ignore file paths which match this regexp pattern (see Str module)."
+ )
+ ; ( "-sample"
+ , Arg.Set_int sample
+ , (sprintf " Byte size of file samples to use. Default: %d" !sample)
+ )
+ ; ( "-j"
+ , Arg.Set_int njobs
+ , (sprintf " Number of parallel jobs. Default: %d" !njobs)
+ )
+ ; ( "-0"
+ , Arg.Set input_delim_null
+ , ( sprintf
+ ( " Delimit input paths by null character instead of a newline."
+ ^^" Meaningful only when reading candidate paths from stdin."
+ ^^" Default: %B"
+ )
+ !input_delim_null
+ )
+ )
+ ]
+ in
+ Arg.parse
+ (Arg.align spec)
+ (fun path ->
+ assert_file_exists path;
+ assert_file_is_dir path;
+ match !input with
+ | Stdin ->
+ input := Directories [path]
+ | Directories paths ->
+ input := Directories (path :: paths)
+ )
+ "";
+ (* Zero is rejected as well as negative values, so say "positive". *)
+ assert_
+ (fun x -> x > 0)
+ !sample
+ (sprintf "Sample size must be positive: %d" !sample);
+ { input = !input
+ ; output = !output
+ ; ignore = !ignore
+ ; sample = !sample
+ ; njobs = !njobs
+ ; delim_null = !input_delim_null
+ }
+
+(* Program entry point: parse the command line, then run the search. *)
+let () =
+  let opt = get_opt () in
+  main opt