- let input = make_input_stream input ignore count in
- let files_by_size = Hashtbl.create 1_000_000 in
- let files_by_sample = Hashtbl.create 1_000_000 in
- let files_by_digest = Hashtbl.create 1_000_000 in
- let process tbl ~group ~file =
- let count, files =
- match Hashtbl.find_opt tbl group with
- | None ->
- (0, File.Set.empty)
- | Some (n, files) ->
- (n, files)
- in
- Hashtbl.replace tbl group (count + 1, File.Set.add file files)
+ let input = make_input_stream input ignore ~metrics ~delim_null in
+ (* TODO: Introduce a cleaner abstraction for re-assembling the pipeline stages:
+  *
+  *   from input           to files_by_size
+  *   from files_by_size   to files_by_sample
+  *   from files_by_sample to files_by_digest
+  *   from files_by_digest to output
+  *
+  * i.e. input |> files_by_size |> files_by_sample |> files_by_digest |> output
+  *)
+
+ let files = input in
+
+ let wt0_group_by_size = time_wall () in
+ let pt0_group_by_size = time_proc () in
+ eprintf "[debug] filtering-out files with unique size\n%!";
+ let files = File.filter_out_unique_sizes files ~metrics in
+ let pt1_group_by_size = time_proc () in
+ let wt1_group_by_size = time_wall () in
+
+ let wt0_group_by_sample = wt1_group_by_size in
+ let pt0_group_by_sample = pt1_group_by_size in
+ eprintf "[debug] filtering-out files with unique heads\n%!";
+ let files =
+ if njobs > 1 then begin
+ let q = Queue.create () in
+ files
+ |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
+ |> Stream.group_by ~f:snd
+ |> Stream.map ~f:(fun (d, n, pairs) -> (d, n, List.map pairs ~f:fst))
+ |> Stream.filter ~f:(fun (_, n, _) -> n > 1)
+ |> Stream.iter ~f:(fun (_, _, fs) -> List.iter fs ~f:(fun f -> Queue.add f q))
+ ;
+ Stream.of_queue q
+ end else
+ File.filter_out_unique_heads files ~len:sample_len ~metrics
+ in
+ let pt1_group_by_sample = time_proc () in
+ let wt1_group_by_sample = time_wall () in
+
+ let wt0_group_by_digest = wt1_group_by_sample in
+ let pt0_group_by_digest = pt1_group_by_sample in
+ eprintf "[debug] hashing\n%!";
+ let groups =
+ if njobs > 1 then
+ let with_digests =
+ Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
+ in
+ Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
+ fun (digest, n, file_digest_pairs) ->
+ let files =
+ List.map file_digest_pairs ~f:(fun (file, _) ->
+ M.file_hashed metrics ~size:file.File.size;
+ file
+ )
+ in
+ (digest, n, files)
+ )
+ else
+ Stream.group_by files ~f:(fun {File.path; size} ->
+ M.file_hashed metrics ~size;
+ Digest.file path
+ )