open Printf

module List   = ListLabels
module StrSet = Set.Make(String)
module Unix   = UnixLabels

module M = Metrics

(** Source of candidate files: paths read from standard input, or a list of
    directories handed to [File.find]. *)
type input =
  | Stdin
  | Directories of string list

(** Destination of the report: standard output, or one file per digest
    underneath the given directory. *)
type output =
  | Stdout
  | Directory of string

(** Command-line options, as assembled by [get_opt]. *)
type opt =
  { input      : input
  ; output     : output
  ; ignore     : string -> bool  (* predicate: should this path be skipped? *)
  ; sample     : int             (* passed as [~len] to the head-sampling stage *)
  ; njobs      : int             (* values > 1 select the [Stream.bag_map] paths *)
  ; delim_null : bool            (* forwarded to [In_channel.lines] for stdin *)
  }

(** [make_input_stream input ignore ~metrics ~delim_null] builds the stream of
    candidate files and filters out empty files and files whose path satisfies
    [ignore]. Every file seen is counted via [M.file_considered]; empty and
    ignored files are additionally counted via [M.file_empty] and
    [M.file_ignored]. *)
let make_input_stream input ignore ~metrics ~delim_null =
  let input =
    match input with
    | Stdin ->
        File.lookup (In_channel.lines stdin ~delim_null)
    | Directories paths ->
        (* Round-trip through a set to drop directories given more than once. *)
        let paths = StrSet.elements (StrSet.of_list paths) in
        (* NOTE(review): assumes [Stream.concat] accepts a list of streams,
           one per directory - confirm against the Stream module. *)
        Stream.concat (List.map paths ~f:File.find)
  in
  Stream.filter input ~f:(fun {File.path; size} ->
    M.file_considered metrics ~size;
    let empty = size = 0 in
    let ignored = ignore path in
    if empty then M.file_empty metrics;
    if ignored then M.file_ignored metrics ~size;
    (not empty) && (not ignored)
  )

(** [make_output_fun output] returns the function used to report one group of
    files: it takes the group's digest, the number of files and the files.

    - [Stdout]: prints the hex digest and the count on one line, followed by
      one quoted path per line.
    - [Directory dir]: writes the quoted paths, one per line, to
      [dir/<first two hex chars of digest>/<hex digest>]. The count argument
      is not used.

    @raise Unix.Unix_error if the bucket directory cannot be created for a
      reason other than already existing.
    @raise Sys_error if the digest file cannot be opened or written. *)
let make_output_fun = function
  | Stdout ->
      fun digest n_files files ->
        printf "%s %d\n%!" (Digest.to_hex digest) n_files;
        List.iter files ~f:(fun {File.path; _} ->
          printf " %S\n%!" path
        )
  | Directory dir ->
      fun digest _ files ->
        let digest = Digest.to_hex digest in
        let dir = Filename.concat dir (String.sub digest 0 2) in
        (* Several digests can share the same two-character bucket, and the
           output directory may be reused between runs, so an already
           existing bucket directory is not an error. *)
        (try Unix.mkdir dir ~perm:0o700
         with Unix.Unix_error (Unix.EEXIST, _, _) -> ());
        let oc = open_out (Filename.concat dir digest) in
        (* Close the channel on both the success and the failure path so a
           write error does not leak the descriptor. *)
        (match
           List.iter files ~f:(fun {File.path; _} -> fprintf oc "%S\n" path)
         with
         | () -> close_out oc
         | exception e -> close_out_noerr oc; raise e)

(** Wall-clock time in seconds, from [Unix.gettimeofday]. *)
let time_wall () =
  Unix.gettimeofday ()

(** Processor time in seconds used by the program, from [Sys.time]. *)
let time_proc () =
  Sys.time ()

(** [main opt] runs the pipeline: build the input stream, drop files with a
    unique size, drop files with a unique head sample, group the remainder by
    digest, report every group with more than one member, then hand the
    collected wall/processor timings to the metrics module.

    NOTE(review): the per-stage timings bracket the construction of each
    stage's stream; if [Stream] is lazy, work may be attributed to a later
    stage - confirm. *)
let main {input; output; ignore; sample = sample_len; njobs; delim_null} =
  let wt0_all = time_wall () in
  let pt0_all = time_proc () in
  let metrics = M.init () in
  let output = make_output_fun output in
  let input = make_input_stream input ignore ~metrics ~delim_null in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   * from input           to files_by_size
   * from files_by_size   to files_by_sample
   * from files_by_sample to files_by_digest
   * from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)
  let files = input in

  (* Stage 1: group by size. *)
  let wt0_group_by_size = time_wall () in
  let pt0_group_by_size = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let files = File.filter_out_unique_sizes files ~metrics in
  let pt1_group_by_size = time_proc () in
  let wt1_group_by_size = time_wall () in

  (* Stage 2: group by head sample. Starts where stage 1 stopped. *)
  let wt0_group_by_sample = wt1_group_by_size in
  let pt0_group_by_sample = pt1_group_by_size in
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let files =
    if njobs > 1 then begin
      (* Parallel path: the code treats [Stream.bag_map] results as
         (file, sample) pairs - they are grouped on [snd] and the files are
         recovered with [fst]. Surviving files are collected into a queue
         which then backs the next stage's stream. *)
      let q = Queue.create () in
      files
      |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
      |> Stream.group_by ~f:snd
      (* NOTE(review): assumes [Stream.map] is the per-element transform of
         the Stream module - confirm. *)
      |> Stream.map ~f:(fun (d, n, pairs) -> (d, n, List.map pairs ~f:fst))
      |> Stream.filter ~f:(fun (_, n, _) -> n > 1)
      |> Stream.iter ~f:(fun (_, _, fs) ->
           List.iter fs ~f:(fun f -> Queue.add f q)
         )
      ;
      Stream.of_queue q
    end else
      File.filter_out_unique_heads files ~len:sample_len ~metrics
  in
  let pt1_group_by_sample = time_proc () in
  let wt1_group_by_sample = time_wall () in

  (* Stage 3: group by digest of the file contents. *)
  let wt0_group_by_digest = wt1_group_by_sample in
  let pt0_group_by_digest = pt1_group_by_sample in
  eprintf "[debug] hashing\n%!";
  let groups =
    if njobs > 1 then begin
      let with_digests =
        Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
      in
      Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
        fun (digest, n, file_digest_pairs) ->
          (* Metrics are recorded here, after the parallel map, rather than
             inside the function given to [bag_map]. *)
          let files =
            List.map file_digest_pairs ~f:(fun (file, _) ->
              M.file_hashed metrics ~size:file.File.size;
              file
            )
          in
          (digest, n, files)
      )
    end else
      Stream.group_by files ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path
      )
  in
  let pt1_group_by_digest = time_proc () in
  let wt1_group_by_digest = time_wall () in

  eprintf "[debug] reporting\n%!";
  Stream.iter groups ~f:(fun (d, n, files) ->
    M.digest metrics;
    if n > 1 then begin
      (* NOTE(review): this counts all [n] copies, not [n - 1], as redundant;
         it also presumes [files] is non-empty whenever [n > 1] - confirm. *)
      M.redundant_data metrics ~size:(n * (List.hd files).File.size);
      output d n files
    end
  );

  let pt1_all = time_proc () in
  let wt1_all = time_wall () in

  (* NOTE(review): assumes the Metrics module exposes [report] taking these
     labelled timings - confirm. *)
  M.report metrics
    ~wall_time_all:            (wt1_all             -. wt0_all)
    ~wall_time_group_by_size:  (wt1_group_by_size   -. wt0_group_by_size)
    ~wall_time_group_by_head:  (wt1_group_by_sample -. wt0_group_by_sample)
    ~wall_time_group_by_digest:(wt1_group_by_digest -. wt0_group_by_digest)
    ~proc_time_all:            (pt1_all             -. pt0_all)
    ~proc_time_group_by_size:  (pt1_group_by_size   -. pt0_group_by_size)
    ~proc_time_group_by_head:  (pt1_group_by_sample -. pt0_group_by_sample)
    ~proc_time_group_by_digest:(pt1_group_by_digest -. pt0_group_by_digest)

(** [get_opt ()] parses the command line into an [opt].

    Anonymous arguments are input directories; with none given, candidate
    paths are read from stdin. Any validation failure prints a message to
    stderr and exits with status 1. *)
let get_opt () : opt =
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  let assert_file_exists path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
  in
  (* Callers check existence first: [Sys.is_directory] raises [Sys_error] on
     a missing path instead of returning [false]. *)
  let assert_file_is_dir path =
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs = ref 6 in
  let input_delim_null = ref false in
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          assert_file_exists path;
          assert_file_is_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          (* [Str.string_match ... 0] anchors the match at the start of the
             path. *)
          ignore := fun string -> Str.string_match regexp string 0)
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ; ( "-0"
      , Arg.Set input_delim_null
      , ( sprintf
            ( " Delimit input paths by null character instead of a newline."
            ^^" Meaningful only when reading candidate paths from stdin."
            ^^" Default: %B"
            )
            !input_delim_null
        )
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      assert_file_exists path;
      assert_file_is_dir path;
      (* The first directory switches the input away from stdin; later ones
         are prepended to the list. *)
      match !input with
      | Stdin ->
          input := Directories [path]
      | Directories paths ->
          input := Directories (path :: paths)
    )
    "";
  (* The check rejects zero as well as negative values, so say "positive". *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive: %d" !sample);
  { input      = !input
  ; output     = !output
  ; ignore     = !ignore
  ; sample     = !sample
  ; njobs      = !njobs
  ; delim_null = !input_delim_null
  }

let () =
  main (get_opt ())