- let input = make_input_stream input in
- let paths_by_digest = Hashtbl.create 1_000_000 in
- let path_count = ref 0 in
- let t0 = Sys.time () in
- let process path =
- try
- let digest = Digest.file path in
- let count, paths =
- match Hashtbl.find_opt paths_by_digest digest with
- | None ->
- (0, StrSet.empty)
- | Some (n, paths) ->
- (n, paths)
+ let input = make_input_stream input ignore ~metrics ~delim_null in
+ (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
+ *
+ * from input to files_by_size
+ * from files_by_size to files_by_sample
+ * from files_by_sample to files_by_digest
+ * from files_by_digest to output
+ *
+ * input |> files_by_size |> files_by_sample |> files_by_digest |> output
+ *)
+
+ let files = input in
+
+ let wt0_group_by_size = time_wall () in
+ let pt0_group_by_size = time_proc () in
+ eprintf "[debug] filtering-out files with unique size\n%!";
+ let files = File.filter_out_unique_sizes files ~metrics in
+ let pt1_group_by_size = time_proc () in
+ let wt1_group_by_size = time_wall () in
+
+ let wt0_group_by_sample = wt1_group_by_size in
+ let pt0_group_by_sample = pt1_group_by_size in
+ eprintf "[debug] filtering-out files with unique heads\n%!";
+ let files =
+ if njobs > 1 then begin
+ let q = Queue.create () in
+ files
+ |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
+ |> Stream.group_by ~f:snd
+ |> Stream.map ~f:(fun (d, n, pairs) -> (d, n, List.map pairs ~f:fst))
+ |> Stream.filter ~f:(fun (_, n, _) -> n > 1)
+ |> Stream.iter ~f:(fun (_, _, fs) -> List.iter fs ~f:(fun f -> Queue.add f q))
+ ;
+ Stream.of_queue q
+ end else
+ File.filter_out_unique_heads files ~len:sample_len ~metrics
+ in
+ let pt1_group_by_sample = time_proc () in
+ let wt1_group_by_sample = time_wall () in
+
+ let wt0_group_by_digest = wt1_group_by_sample in
+ let pt0_group_by_digest = pt1_group_by_sample in
+ eprintf "[debug] hashing\n%!";
+ let groups =
+ if njobs > 1 then
+ let with_digests =
+ Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)