+
+ let files = input in
+
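+ (* Pass 1: discard files whose size is unique; only files sharing a size can be duplicates. *)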
+ let wt0_group_by_size = time_wall () in
+ let pt0_group_by_size = time_proc () in
+ eprintf "[debug] filtering-out files with unique size\n%!";
+ let files = File.filter_out_unique_sizes files ~metrics in
+ let pt1_group_by_size = time_proc () in
+ let wt1_group_by_size = time_wall () in
+
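+ (* Pass 2: discard files whose leading sample is unique. With njobs > 1 the heads are
+    computed by a parallel bag-map, regrouped by head sample, and the surviving files are
+    drained into a queue to rebuild a flat file stream. *)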
+ let wt0_group_by_sample = wt1_group_by_size in
+ let pt0_group_by_sample = pt1_group_by_size in
+ eprintf "[debug] filtering-out files with unique heads\n%!";
+ let files =
+   if njobs > 1 then begin
+     let q = Queue.create () in
+     files
+     |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
+     |> Stream.group_by ~f:snd
+     |> Stream.map ~f:(fun (d, n, pairs) -> (d, n, List.map pairs ~f:fst))
+     |> Stream.filter ~f:(fun (_, n, _) -> n > 1)
+     |> Stream.iter ~f:(fun (_, _, fs) -> List.iter fs ~f:(fun f -> Queue.add f q));
+     Stream.of_queue q
+   end else
+     File.filter_out_unique_heads files ~len:sample_len ~metrics
+ in
+ let pt1_group_by_sample = time_proc () in
+ let wt1_group_by_sample = time_wall () in
+
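+ (* Pass 3: hash the remaining files in full and group them by digest. *)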
+ let wt0_group_by_digest = wt1_group_by_sample in
+ let pt0_group_by_digest = pt1_group_by_sample in
+ eprintf "[debug] hashing\n%!";
+ let groups =
+   if njobs > 1 then
+     let with_digests =
+       Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
+     in
+     Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
+       fun (digest, n, file_digest_pairs) ->
+         let files =
+           List.map file_digest_pairs ~f:(fun (file, _) ->
+             M.file_hashed metrics ~size:file.File.size;
+             file
+           )
+         in
+         (digest, n, files)
+     )
+   else
+     Stream.group_by files ~f:(fun {File.path; size} ->
+       M.file_hashed metrics ~size;
+       Digest.file path
+     )
+ in
+ let pt1_group_by_digest = time_proc () in
+ let wt1_group_by_digest = time_wall () in
+
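+ (* Report: print each group that still has more than one member and record the redundant bytes. *)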
+ eprintf "[debug] reporting\n%!";
+ Stream.iter groups ~f:(fun (d, n, files) ->
+   M.digest metrics;
+   if n > 1 then begin
+     M.redundant_data metrics ~size:(n * (List.hd files).File.size);
+     output d n files
+   end