Commit | Line | Data |
---|---|---|
cce97c27 SK |
1 | open Printf |
2 | ||
948ee900 SK |
3 | module Array = ArrayLabels |
4 | module List = ListLabels | |
5c0100d2 | 5 | module StrSet = Set.Make(String) |
e09dff7f | 6 | module Unix = UnixLabels |
cce97c27 | 7 | |
1013fbcd SK |
(** Counters describing how many files and bytes went through each stage of
    the pipeline, plus a textual report of them on stderr. *)
module Metrics : sig
  type t

  val init
    : unit -> t
  val report
    : t
    -> wall_time_all:float
    -> wall_time_group_by_size:float
    -> wall_time_group_by_head:float
    -> wall_time_group_by_digest:float
    -> proc_time_all:float
    -> proc_time_group_by_size:float
    -> proc_time_group_by_head:float
    -> proc_time_group_by_digest:float
    -> unit

  val file_considered
    : t -> size:int -> unit
  val file_ignored
    : t -> size:int -> unit
  val file_empty
    : t -> unit
  val file_sampled
    : t -> unit
  val chunk_read
    : t -> size:int -> unit
  val file_unique_size
    : t -> size:int -> unit
  val file_unique_sample
    : t -> size:int -> unit
  val file_hashed
    : t -> size:int -> unit
  val digest
    : t -> unit
  val redundant_data
    : t -> size:int -> unit
end = struct
  (* All counters start at zero and only ever grow. *)
  type t =
    { mutable considered_files    : int
    ; mutable considered_bytes    : int
    ; mutable empty               : int
    ; mutable ignored_files       : int
    ; mutable ignored_bytes       : int
    ; mutable unique_size_files   : int
    ; mutable unique_size_bytes   : int
    ; mutable unique_sample_files : int
    ; mutable unique_sample_bytes : int
    ; mutable sampled_files       : int
    ; mutable sampled_bytes       : int
    ; mutable hashed_files        : int
    ; mutable hashed_bytes        : int
    ; mutable digests             : int
    ; mutable redundant_data      : int
    }

  let init () =
    { considered_files    = 0
    ; considered_bytes    = 0
    ; empty               = 0
    ; ignored_files       = 0
    ; ignored_bytes       = 0
    ; unique_size_files   = 0
    ; unique_size_bytes   = 0
    ; unique_sample_files = 0
    ; unique_sample_bytes = 0
    ; sampled_files       = 0
    ; sampled_bytes       = 0
    ; hashed_files        = 0
    ; hashed_bytes        = 0
    ; digests             = 0
    ; redundant_data      = 0
    }

  let file_considered t ~size =
    t.considered_files <- t.considered_files + 1;
    t.considered_bytes <- t.considered_bytes + size

  let file_ignored t ~size =
    t.ignored_files <- t.ignored_files + 1;
    t.ignored_bytes <- t.ignored_bytes + size

  let file_empty t =
    t.empty <- t.empty + 1

  (* Bytes are accounted per chunk read, files per sampled file. *)
  let chunk_read t ~size =
    t.sampled_bytes <- t.sampled_bytes + size

  let file_sampled t =
    t.sampled_files <- t.sampled_files + 1

  let file_unique_size t ~size =
    t.unique_size_files <- t.unique_size_files + 1;
    t.unique_size_bytes <- t.unique_size_bytes + size

  let file_unique_sample t ~size =
    t.unique_sample_files <- t.unique_sample_files + 1;
    t.unique_sample_bytes <- t.unique_sample_bytes + size

  let file_hashed t ~size =
    t.hashed_files <- t.hashed_files + 1;
    t.hashed_bytes <- t.hashed_bytes + size

  let digest t =
    t.digests <- t.digests + 1

  let redundant_data t ~size =
    t.redundant_data <- t.redundant_data + size

  (* Print every counter, together with the supplied timings, to stderr. *)
  let report
      t
      ~wall_time_all
      ~wall_time_group_by_size
      ~wall_time_group_by_head
      ~wall_time_group_by_digest
      ~proc_time_all
      ~proc_time_group_by_size
      ~proc_time_group_by_head
      ~proc_time_group_by_digest
    =
    (* Bytes -> gibibytes, dividing by 1024 three times in a row. *)
    let gb bytes = float_of_int bytes /. 1024. /. 1024. /. 1024. in
    eprintf "Total time : %.2f wall sec %.2f proc sec\n%!"
      wall_time_all
      proc_time_all;
    eprintf "Considered : %8d files %6.2f Gb\n%!"
      t.considered_files
      (gb t.considered_bytes);
    eprintf "Sampled : %8d files %6.2f Gb\n%!"
      t.sampled_files
      (gb t.sampled_bytes);
    eprintf "Hashed : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.hashed_files
      (gb t.hashed_bytes)
      wall_time_group_by_digest
      proc_time_group_by_digest;
    eprintf "Digests : %8d\n%!"
      t.digests;
    eprintf "Duplicates (Hashed - Digests): %8d files %6.2f Gb\n%!"
      (t.hashed_files - t.digests)
      (gb t.redundant_data);
    eprintf "Skipped due to 0 size : %8d files\n%!" t.empty;
    eprintf "Skipped due to unique size : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.unique_size_files
      (gb t.unique_size_bytes)
      wall_time_group_by_size
      proc_time_group_by_size;
    eprintf "Skipped due to unique sample : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.unique_sample_files
      (gb t.unique_sample_bytes)
      wall_time_group_by_head
      proc_time_group_by_head;
    eprintf "Ignored due to regex match : %8d files %6.2f Gb\n%!"
      t.ignored_files
      (gb t.ignored_bytes)
end

module M = Metrics
169 | ||
(** Pull-based streams built on top of the standard library [Stream], with a
    fork-based parallel map ([bag_map]). *)
module Stream : sig
  type 'a t

  val create : (unit -> 'a option) -> 'a t

  val of_queue : 'a Queue.t -> 'a t

  val iter : 'a t -> f:('a -> unit) -> unit

  val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t
  (** Parallel map with arbitrarily-reordered elements. *)

  val map : 'a t -> f:('a -> 'b) -> 'b t

  val filter : 'a t -> f:('a -> bool) -> 'a t

  val concat : ('a t) list -> 'a t

  val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t
end = struct
  (* Inside this definition [Stream] still denotes the standard library
     module, because the module being defined is not yet in scope. *)
  module S = Stream

  (* A stream is a list of underlying streams consumed front to back. *)
  type 'a t =
    {mutable streams : ('a S.t) list}

  (* Messages sent by worker processes ("vassals") to the coordinator
     ("lord"); the [int] is the index of the sending vassal. *)
  type ('input, 'output) msg_from_vassal =
    | Ready of int
    | Result of (int * ('input * 'output))
    | Exiting of int

  (* Messages sent by the lord: [Job None] means there is no work left. *)
  type 'input msg_from_lord =
    | Job of 'input option

  let create f =
    {streams = [S.from (fun _ -> f ())]}

  (* Stream that drains [q]; it ends as soon as the queue is empty. *)
  let of_queue q =
    create (fun () ->
      match Queue.take q with
      | exception Queue.Empty -> None
      | x -> Some x
    )

  (* Next element, dropping exhausted underlying streams along the way. *)
  let rec next t =
    match t.streams with
    | [] ->
        None
    | s :: streams ->
        (match S.next s with
        | exception S.Failure ->
            t.streams <- streams;
            next t
        | x ->
            Some x
        )

  let map t ~f =
    create (fun () ->
      match next t with
      | None -> None
      | Some x -> Some (f x)
    )

  let filter t ~f =
    let rec filter () =
      match next t with
      | None -> None
      | Some x when f x -> Some x
      | Some _ -> filter ()
    in
    create filter

  let iter t ~f =
    List.iter t.streams ~f:(S.iter f)

  let concat ts =
    {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}

  (* Eagerly consumes [t], grouping elements by [f]; yields one
     (key, group size, members) triple per group, in hash-table order. *)
  let group_by t ~f =
    let groups_tbl = Hashtbl.create 1_000_000 in
    let group_update x =
      let group = f x in
      let members =
        match Hashtbl.find_opt groups_tbl group with
        | None -> (1, [x])
        | Some (n, xs) -> (succ n, x :: xs)
      in
      Hashtbl.replace groups_tbl group members
    in
    iter t ~f:group_update;
    let groups = Queue.create () in
    Hashtbl.iter
      (fun name (length, members) -> Queue.add (name, length, members) groups)
      groups_tbl;
    of_queue groups

  (* Marshal-based message passing over channels. *)
  module Ipc : sig
    val send : out_channel -> 'a -> unit
    val recv : in_channel -> 'a
  end = struct
    let send oc msg =
      Marshal.to_channel oc msg [];
      flush oc

    let recv ic =
      Marshal.from_channel ic
  end

  (* Coordinator: hands the next element of [t] to whichever vassal reports
     in, collects results, and returns them as a stream once every vassal
     has announced its exit and has been reaped. *)
  let lord t ~njobs ~vassals ~ic ~ocs =
    let active_vassals = ref njobs in
    let results = Queue.create () in
    let rec dispatch () =
      match (Ipc.recv ic : ('input, 'output) msg_from_vassal) with
      | Exiting i ->
          close_out ocs.(i);
          decr active_vassals;
          if !active_vassals <> 0 then dispatch ()
      | Ready i ->
          Ipc.send ocs.(i) (Job (next t));
          dispatch ()
      | Result (i, result) ->
          Queue.add result results;
          Ipc.send ocs.(i) (Job (next t));
          dispatch ()
    in
    dispatch ();
    close_in ic;
    (* Reap exactly the processes we forked, so an unrelated child of this
       process is never consumed by mistake. *)
    List.iter vassals ~f:(fun pid ->
      (* TODO: handle process_status *)
      let _pid, _process_status = Unix.waitpid ~mode:[] pid in
      ()
    );
    of_queue results

  (* Worker loop, run in a forked child: announce readiness, then keep
     turning jobs into results until told there is no more work. Never
     returns; the child exits with status 0. *)
  let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
    let ic = Unix.in_channel_of_descr vassal_pipe_r in
    let oc = Unix.out_channel_of_descr lord_pipe_w in
    let rec work msg =
      Ipc.send oc msg;
      match (Ipc.recv ic : 'input msg_from_lord) with
      | Job (Some x) ->
          work (Result (i, (x, f x)))
      | Job None ->
          Ipc.send oc (Exiting i)
    in
    work (Ready i);
    close_in ic;
    close_out oc;
    exit 0

  (* NOTE(review): all vassals write to one shared pipe; a marshalled message
     larger than the pipe's atomic write size could presumably interleave
     with another vassal's message -- confirm for large results. *)
  let bag_map t ~njobs ~f =
    let lord_pipe_r, lord_pipe_w = Unix.pipe () in
    let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
    let vassal_pipes_r = Array.map vassal_pipes ~f:fst in
    let vassal_pipes_w = Array.map vassal_pipes ~f:snd in
    let vassals = ref [] in
    for i = 0 to njobs - 1 do
      match Unix.fork () with
      | 0 ->
          (* Child: keep only its own job pipe (read end) and the shared
             result pipe (write end); close every other inherited end. *)
          Unix.close lord_pipe_r;
          Array.iter vassal_pipes_w ~f:Unix.close;
          Array.iteri vassal_pipes_r ~f:(fun j r ->
            if j <> i then Unix.close r
          );
          vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
      | pid ->
          vassals := pid :: !vassals
    done;
    (* Parent: it only reads results and writes jobs, so drop the ends that
       belong to the vassals instead of leaking them on every call. *)
    Unix.close lord_pipe_w;
    Array.iter vassal_pipes_r ~f:Unix.close;
    lord
      t
      ~njobs
      ~vassals:!vassals
      ~ic:(Unix.in_channel_of_descr lord_pipe_r)
      ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
end
356 | ||
(** Helpers for reading from input channels as streams. *)
module In_channel : sig
  val lines : in_channel -> string Stream.t
end = struct
  (* [lines ic] yields the lines of [ic] one at a time; the stream ends when
     [input_line] signals end of file. *)
  let lines ic =
    let read_line () =
      try Some (input_line ic) with
      | End_of_file -> None
    in
    Stream.create read_line
end
369 | ||
a9a56d74 SK |
(** Files (path and size) and the cheap filters that discard files which
    cannot possibly have a duplicate. *)
module File : sig
  type t =
    { path : string
    ; size : int
    }

  val find : string -> t Stream.t
  (** Find all files in the directory tree, starting from the given root path *)

  val lookup : string Stream.t -> t Stream.t
  (** Lookup file info for given paths. Paths which are not regular files are
      skipped, which matches what [find] yields. *)

  val head : t -> len:int -> metrics:M.t -> string

  val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t
  val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t
end = struct
  type t =
    { path : string
    ; size : int
    }

  (* [lstat] does not follow symlinks, so the size of a symlink is not the
     size of the file that would later be read through it; directories and
     FIFOs cannot be sampled at all. Keep regular files only, as [find]
     does. *)
  let lookup paths =
    paths
    |> Stream.map ~f:(fun path -> (path, Unix.lstat path))
    |> Stream.filter ~f:(fun (_, {Unix.st_kind; _}) ->
        match st_kind with
        | Unix.S_REG -> true
        | Unix.S_DIR
        | Unix.S_CHR
        | Unix.S_BLK
        | Unix.S_LNK
        | Unix.S_FIFO
        | Unix.S_SOCK -> false
      )
    |> Stream.map ~f:(fun (path, {Unix.st_size = size; _}) -> {path; size})

  (* Walk of the tree under [root]: regular files are queued for output,
     directories are queued for later exploration, anything else is
     ignored. [root] itself is read immediately; the directories found
     below it are only read once the file queue runs dry. *)
  let find root =
    let dirs = Queue.create () in
    let files = Queue.create () in
    let explore parent =
      Array.iter (Sys.readdir parent) ~f:(fun child ->
        let path = Filename.concat parent child in
        let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in
        match file_kind with
        | Unix.S_REG ->
            Queue.add {path; size = st_size} files
        | Unix.S_DIR ->
            Queue.add path dirs
        | Unix.S_CHR
        | Unix.S_BLK
        | Unix.S_LNK
        | Unix.S_FIFO
        | Unix.S_SOCK ->
            ()
      )
    in
    explore root;
    let rec next () =
      match Queue.is_empty files, Queue.is_empty dirs with
      | false, _ -> Some (Queue.take files)
      | true , true -> None
      | true , false ->
          explore (Queue.take dirs);
          next ()
    in
    Stream.create next

  (* Group [files] by [group]; members of groups with more than one element
     are passed on, each single-element group goes to [handle_singleton]. *)
  let filter_out_singletons files ~group ~handle_singleton =
    let q = Queue.create () in
    Stream.iter (Stream.group_by files ~f:group) ~f:(fun group ->
      let (_, n, members) = group in
      if n > 1 then
        List.iter members ~f:(fun m -> Queue.add m q)
      else
        handle_singleton group
    );
    Stream.of_queue q

  let filter_out_unique_sizes files ~metrics =
    filter_out_singletons
      files
      ~group:(fun {size; _} -> size)
      ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size)

  (* Read up to [len] leading bytes of the file. The result always has
     length [len]: when the file is shorter, the tail keeps the ' ' filler. *)
  let head {path; _} ~len ~metrics =
    M.file_sampled metrics;
    let buf = Bytes.make len ' ' in
    let ic = open_in_bin path in
    let rec read pos len =
      assert (len >= 0);
      if len = 0 then
        ()
      else begin
        let chunk_size = input ic buf pos len in
        M.chunk_read metrics ~size:chunk_size;
        if chunk_size = 0 then (* EOF *)
          ()
        else
          read (pos + chunk_size) (len - chunk_size)
      end
    in
    (* Close the channel on failure too, so a read error does not leak the
       file descriptor. *)
    (match read 0 len with
    | () ->
        close_in ic
    | exception e ->
        close_in_noerr ic;
        raise e
    );
    Bytes.to_string buf

  let filter_out_unique_heads files ~len ~metrics =
    filter_out_singletons
      files
      ~group:(head ~len ~metrics)
      ~handle_singleton:(fun (_, _, files) ->
        let {size; _} = List.hd files in (* Guaranteed non-empty *)
        M.file_unique_sample metrics ~size
      )
end
477 | ||
(** Where the candidate file paths come from. *)
type input =
  | Stdin                       (* one path per line on standard input *)
  | Directories of string list  (* roots of directory trees to walk    *)

(** Where the groups of duplicates are written. *)
type output =
  | Stdout
  | Directory of string

(** Command line options. *)
type opt =
  { input  : input
  ; output : output
  ; ignore : string -> bool  (* [true] for paths to leave out         *)
  ; sample : int             (* byte size of the per-file head sample *)
  ; njobs  : int             (* number of parallel jobs               *)
  }
493 | ||
(* Build the stream of candidate files for [input], recording every file in
   [metrics] and dropping those which are empty or matched by [ignore]. *)
let make_input_stream input ignore ~metrics =
  let candidates =
    match input with
    | Directories paths ->
        (* Going through a set removes roots given more than once. *)
        let roots = StrSet.elements (StrSet.of_list paths) in
        Stream.concat (List.map roots ~f:File.find)
    | Stdin ->
        File.lookup (In_channel.lines stdin)
  in
  let is_wanted {File.path; size} =
    M.file_considered metrics ~size;
    let is_empty = (size = 0) in
    let is_ignored = ignore path in
    if is_empty then M.file_empty metrics;
    if is_ignored then M.file_ignored metrics ~size;
    not (is_empty || is_ignored)
  in
  Stream.filter candidates ~f:is_wanted
e09dff7f SK |
511 | |
(* Select how a group of files sharing a digest is emitted.

   The returned function takes the digest, the number of files in the group
   and the files themselves.

   - [Stdout]: prints the hex digest and the count, then one quoted path
     per line.
   - [Directory dir]: writes the quoted paths to the file
     [dir/<first 2 hex chars of digest>/<hex digest>]. *)
let make_output_fun = function
  | Stdout ->
      fun digest n_files files ->
        printf "%s %d\n%!" (Digest.to_hex digest) n_files;
        List.iter files ~f:(fun {File.path; _} ->
          printf " %S\n%!" path
        )
  | Directory dir ->
      fun digest _ files ->
        let digest = Digest.to_hex digest in
        let dir = Filename.concat dir (String.sub digest 0 2) in
        (* Many digests share the same 2-character prefix, so the bucket
           directory often exists already; that is not an error. *)
        (try Unix.mkdir dir ~perm:0o700 with
        | Unix.Unix_error (Unix.EEXIST, _, _) -> ()
        );
        let oc = open_out (Filename.concat dir digest) in
        (* Do not leak the channel if writing fails half-way. *)
        (match
          List.iter files ~f:(fun {File.path; _} ->
            output_string oc (sprintf "%S\n%!" path)
          )
        with
        | () ->
            close_out oc
        | exception e ->
            close_out_noerr oc;
            raise e
        )
529 | ||
(* Current wall-clock time, as reported by [Unix.gettimeofday]. *)
let time_wall () : float =
  Unix.gettimeofday ()
532 | ||
217f8912 SK |
(* Processor time used by this program so far, as reported by [Sys.time]. *)
let time_proc () : float =
  Sys.time ()
535 | ||
(* Run the whole pipeline:

     input -> drop unique sizes -> drop unique head samples
           -> group by digest -> output

   and finally report the metrics and per-stage timings on stderr.
   With [njobs > 1] sampling and hashing are spread over forked worker
   processes, otherwise they run in this process. *)
let main {input; output; ignore; sample = sample_len; njobs} =
  let wt0_all = time_wall () in
  let pt0_all = time_proc () in
  let metrics = M.init () in
  let output = make_output_fun output in
  let input = make_input_stream input ignore ~metrics in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   * from input to files_by_size
   * from files_by_size to files_by_sample
   * from files_by_sample to files_by_digest
   * from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)

  let files = input in

  let wt0_group_by_size = time_wall () in
  let pt0_group_by_size = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let files = File.filter_out_unique_sizes files ~metrics in
  let pt1_group_by_size = time_proc () in
  let wt1_group_by_size = time_wall () in

  let wt0_group_by_sample = wt1_group_by_size in
  let pt0_group_by_sample = pt1_group_by_size in
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let files =
    if njobs > 1 then begin
      let q = Queue.create () in
      files
      |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
      |> Stream.map ~f:(fun ((file, _) as pair) ->
          (* [File.head] ran in a forked worker, so whatever it recorded in
             [metrics] stayed in that process; account for the sample here.
             The byte count is an estimate: a read stops at [sample_len]
             bytes or at end of file, whichever comes first. *)
          M.file_sampled metrics;
          M.chunk_read metrics ~size:(min file.File.size sample_len);
          pair
        )
      |> Stream.group_by ~f:snd
      |> Stream.iter ~f:(fun (_, n, pairs) ->
          if n > 1 then
            List.iter pairs ~f:(fun (file, _) -> Queue.add file q)
          else
            (* Same bookkeeping as [File.filter_out_unique_heads]. *)
            List.iter pairs ~f:(fun (file, _) ->
              M.file_unique_sample metrics ~size:file.File.size
            )
        )
      ;
      Stream.of_queue q
    end else
      File.filter_out_unique_heads files ~len:sample_len ~metrics
  in
  let pt1_group_by_sample = time_proc () in
  let wt1_group_by_sample = time_wall () in

  let wt0_group_by_digest = wt1_group_by_sample in
  let pt0_group_by_digest = pt1_group_by_sample in
  eprintf "[debug] hashing\n%!";
  let groups =
    if njobs > 1 then
      let with_digests =
        Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
      in
      Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
        fun (digest, n, file_digest_pairs) ->
          let files =
            List.map file_digest_pairs ~f:(fun (file, _) ->
              M.file_hashed metrics ~size:file.File.size;
              file
            )
          in
          (digest, n, files)
      )
    else
      Stream.group_by files ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path
      )
  in
  let pt1_group_by_digest = time_proc () in
  let wt1_group_by_digest = time_wall () in

  eprintf "[debug] reporting\n%!";
  Stream.iter groups ~f:(fun (d, n, files) ->
    M.digest metrics;
    (* Of [n] identical files one is the original and [n - 1] are redundant
       copies; this keeps the byte count consistent with the
       "Hashed - Digests" file count printed next to it. *)
    if n > 1 then
      M.redundant_data metrics ~size:((n - 1) * (List.hd files).File.size);
    output d n files
  );

  let pt1_all = time_proc () in
  let wt1_all = time_wall () in

  M.report metrics
    ~wall_time_all:            (wt1_all -. wt0_all)
    ~wall_time_group_by_size:  (wt1_group_by_size -. wt0_group_by_size)
    ~wall_time_group_by_head:  (wt1_group_by_sample -. wt0_group_by_sample)
    ~wall_time_group_by_digest:(wt1_group_by_digest -. wt0_group_by_digest)
    ~proc_time_all:            (pt1_all -. pt0_all)
    ~proc_time_group_by_size:  (pt1_group_by_size -. pt0_group_by_size)
    ~proc_time_group_by_head:  (pt1_group_by_sample -. pt0_group_by_sample)
    ~proc_time_group_by_digest:(pt1_group_by_digest -. pt0_group_by_digest)
cce97c27 | 628 | |
1253df34 SK |
(* Parse the command line into an [opt].

   Anonymous arguments are input directories; when none are given the paths
   are read from stdin. Prints a message on stderr and exits with status 1
   when a given path does not exist or is not a directory, or when the
   sample size is not positive. *)
let get_opt () : opt =
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  let assert_file_exists path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
  in
  let assert_file_is_dir path =
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs = ref 6 in
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          assert_file_exists path;
          assert_file_is_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          ignore := fun string -> Str.string_match regexp string 0)
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      assert_file_exists path;
      assert_file_is_dir path;
      match !input with
      | Stdin ->
          input := Directories [path]
      | Directories paths ->
          input := Directories (path :: paths)
    )
    "";
  (* Zero is rejected as well, so the message must not say "negative". *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive: %d" !sample);
  { input  = !input
  ; output = !output
  ; ignore = !ignore
  ; sample = !sample
  ; njobs  = !njobs
  }
694 | ||
(* Program entry point: parse the command line, then run the pipeline. *)
let () =
  let opt = get_opt () in
  main opt