Commit | Line | Data |
---|---|---|
cce97c27 SK |
1 | open Printf |
2 | ||
948ee900 SK |
3 | module Array = ArrayLabels |
4 | module List = ListLabels | |
5c0100d2 | 5 | module StrSet = Set.Make(String) |
e09dff7f | 6 | module Unix = UnixLabels |
cce97c27 | 7 | |
module Metrics : sig
  (** Run-time counters for the deduplication pipeline, plus a final
      stderr report.  All counters start at zero and only grow. *)

  type t

  val init
    : unit -> t
  val report
    : t
    -> wall_time_all:float
    -> wall_time_group_by_size:float
    -> wall_time_group_by_head:float
    -> wall_time_group_by_digest:float
    -> proc_time_all:float
    -> proc_time_group_by_size:float
    -> proc_time_group_by_head:float
    -> proc_time_group_by_digest:float
    -> unit

  val file_considered
    : t -> size:int -> unit
  val file_ignored
    : t -> size:int -> unit
  val file_empty
    : t -> unit
  val file_sampled
    : t -> unit
  val chunk_read
    : t -> size:int -> unit
  val file_unique_size
    : t -> size:int -> unit
  val file_unique_sample
    : t -> size:int -> unit
  val file_hashed
    : t -> size:int -> unit
  val digest
    : t -> unit
  val redundant_data
    : t -> size:int -> unit
end = struct
  (* Counters are plain mutable record fields (equivalent to the usual
   * [int ref] fields, without the extra indirection). *)
  type t =
    { mutable considered_files    : int
    ; mutable considered_bytes    : int
    ; mutable empty               : int
    ; mutable ignored_files       : int
    ; mutable ignored_bytes       : int
    ; mutable unique_size_files   : int
    ; mutable unique_size_bytes   : int
    ; mutable unique_sample_files : int
    ; mutable unique_sample_bytes : int
    ; mutable sampled_files       : int
    ; mutable sampled_bytes       : int
    ; mutable hashed_files        : int
    ; mutable hashed_bytes        : int
    ; mutable digests             : int
    ; mutable redundant_data      : int
    }

  let init () =
    { considered_files    = 0
    ; considered_bytes    = 0
    ; empty               = 0
    ; ignored_files       = 0
    ; ignored_bytes       = 0
    ; unique_size_files   = 0
    ; unique_size_bytes   = 0
    ; unique_sample_files = 0
    ; unique_sample_bytes = 0
    ; sampled_files       = 0
    ; sampled_bytes       = 0
    ; hashed_files        = 0
    ; hashed_bytes        = 0
    ; digests             = 0
    ; redundant_data      = 0
    }

  let file_considered t ~size =
    t.considered_files <- t.considered_files + 1;
    t.considered_bytes <- t.considered_bytes + size

  let file_ignored t ~size =
    t.ignored_files <- t.ignored_files + 1;
    t.ignored_bytes <- t.ignored_bytes + size

  let file_empty t =
    t.empty <- t.empty + 1

  let chunk_read t ~size =
    t.sampled_bytes <- t.sampled_bytes + size

  let file_sampled t =
    t.sampled_files <- t.sampled_files + 1

  let file_unique_size t ~size =
    t.unique_size_files <- t.unique_size_files + 1;
    t.unique_size_bytes <- t.unique_size_bytes + size

  let file_unique_sample t ~size =
    t.unique_sample_files <- t.unique_sample_files + 1;
    t.unique_sample_bytes <- t.unique_sample_bytes + size

  let file_hashed t ~size =
    t.hashed_files <- t.hashed_files + 1;
    t.hashed_bytes <- t.hashed_bytes + size

  let digest t =
    t.digests <- t.digests + 1

  let redundant_data t ~size =
    t.redundant_data <- t.redundant_data + size

  (* Print the summary table to stderr.  Byte counts are shown in
   * binary gigabytes (GiB, labelled "Gb" for brevity). *)
  let report
      t
      ~wall_time_all
      ~wall_time_group_by_size
      ~wall_time_group_by_head
      ~wall_time_group_by_digest
      ~proc_time_all
      ~proc_time_group_by_size
      ~proc_time_group_by_head
      ~proc_time_group_by_digest
    =
    let b_to_mb b = (float_of_int b) /. 1024. /. 1024. in
    let b_to_gb b = (b_to_mb b) /. 1024. in
    Printf.eprintf "Total time : %.2f wall sec %.2f proc sec\n%!"
      wall_time_all
      proc_time_all;
    Printf.eprintf "Considered : %8d files %6.2f Gb\n%!"
      t.considered_files
      (b_to_gb t.considered_bytes);
    Printf.eprintf "Sampled : %8d files %6.2f Gb\n%!"
      t.sampled_files
      (b_to_gb t.sampled_bytes);
    Printf.eprintf "Hashed : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.hashed_files
      (b_to_gb t.hashed_bytes)
      wall_time_group_by_digest
      proc_time_group_by_digest;
    Printf.eprintf "Digests : %8d\n%!"
      t.digests;
    Printf.eprintf "Duplicates (Hashed - Digests): %8d files %6.2f Gb\n%!"
      (t.hashed_files - t.digests)
      (b_to_gb t.redundant_data);
    Printf.eprintf "Skipped due to 0 size : %8d files\n%!" t.empty;
    Printf.eprintf "Skipped due to unique size : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.unique_size_files
      (b_to_gb t.unique_size_bytes)
      wall_time_group_by_size
      proc_time_group_by_size;
    Printf.eprintf "Skipped due to unique sample : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.unique_sample_files
      (b_to_gb t.unique_sample_bytes)
      wall_time_group_by_head
      proc_time_group_by_head;
    Printf.eprintf "Ignored due to regex match : %8d files %6.2f Gb\n%!"
      t.ignored_files
      (b_to_gb t.ignored_bytes)
end
167 | ||
168 | module M = Metrics | |
169 | ||
module Stream : sig
  type 'a t

  val create : (unit -> 'a option) -> 'a t

  val of_queue : 'a Queue.t -> 'a t

  val iter : 'a t -> f:('a -> unit) -> unit

  val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t
  (** Parallel map with arbitrarily-reordered elements. *)

  val map : 'a t -> f:('a -> 'b) -> 'b t

  val filter : 'a t -> f:('a -> bool) -> 'a t

  val concat : ('a t) list -> 'a t

  val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t
end = struct
  (* [S] is the legacy stdlib [Stream] module; the module defined here
   * shadows that name for the rest of the file. *)
  module S = Stream

  (* A stream is an ordered list of underlying stdlib streams; the list
   * shrinks (mutably) as each underlying stream is exhausted. *)
  type 'a t =
    {mutable streams : ('a S.t) list}

  (* Messages a worker ("vassal") process sends to the coordinating
   * ("lord") process.  The [int] is the worker's index, used to pick
   * the pipe on which to reply. *)
  type ('input, 'output) msg_from_vassal =
    | Ready of int
    | Result of (int * ('input * 'output))
    | Exiting of int

  (* [Job None] tells a worker there is no more work: exit. *)
  type 'input msg_from_lord =
    | Job of 'input option

  let create f =
    (* The stdlib generator ignores the element index; [f] supplies
     * [Some x] per element and [None] at end-of-stream. *)
    {streams = [S.from (fun _ -> f ())]}

  (* Stream that drains an existing queue.  NOTE: the queue is consumed,
   * not copied. *)
  let of_queue q =
    create (fun () ->
      match Queue.take q with
      | exception Queue.Empty ->
          None
      | x ->
          Some x
    )

  (* Pull the next element, dropping exhausted sub-streams as we go. *)
  let rec next t =
    match t.streams with
    | [] ->
        None
    | s :: streams ->
        (match S.next s with
        | exception Stream.Failure ->
            (* This sub-stream is done; fall through to the rest. *)
            t.streams <- streams;
            next t
        | x ->
            Some x
        )

  let map t ~f =
    create (fun () ->
      match next t with
      | None -> None
      | Some x -> Some (f x)
    )

  let filter t ~f =
    (* Skip elements until [f] accepts one (or the stream ends). *)
    let rec filter () =
      match next t with
      | None ->
          None
      | Some x when f x ->
          Some x
      | Some _ ->
          filter ()
    in
    create filter

  let iter t ~f =
    List.iter t.streams ~f:(S.iter f)

  (* Concatenation is flat: just append the sub-stream lists. *)
  let concat ts =
    {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}

  (* Eagerly drain [t], bucketing elements by [f x]; emits one
   * (key, count, members) triple per bucket.  Members are in reverse
   * arrival order; bucket order is unspecified (Hashtbl iteration). *)
  let group_by t ~f =
    (* Pre-sized for large file sets to limit rehashing. *)
    let groups_tbl = Hashtbl.create 1_000_000 in
    let group_update x =
      let group = f x in
      let members =
        match Hashtbl.find_opt groups_tbl group with
        | None ->
            (1, [x])
        | Some (n, xs) ->
            (succ n, x :: xs)
      in
      Hashtbl.replace groups_tbl group members
    in
    iter t ~f:group_update;
    let groups = Queue.create () in
    Hashtbl.iter
      (fun name (length, members) -> Queue.add (name, length, members) groups)
      groups_tbl;
    of_queue groups

  (* Marshal-based message passing over pipes.  Values must be
   * marshallable (no closures are sent here, only data). *)
  module Ipc : sig
    val send : out_channel -> 'a -> unit
    val recv : in_channel -> 'a
  end = struct
    let send oc msg =
      Marshal.to_channel oc msg [];
      flush oc

    let recv ic =
      Marshal.from_channel ic
    end

  (* Coordinator: hands out one job at a time to whichever worker asks,
   * collects results, then reaps all worker processes. *)
  let lord t ~njobs ~vassals ~ic ~ocs =
    let active_vassals = ref njobs in
    let results = Queue.create () in
    let rec loop () =
      match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
      | Exiting i ->
          close_out ocs.(i);
          decr active_vassals;
          if !active_vassals = 0 then () else loop ()
      | Ready i ->
          Ipc.send ocs.(i) (Job (next t));
          loop ()
      | Result (i, result) ->
          Queue.add result results;
          (* Reply immediately with the next job to keep the worker busy. *)
          Ipc.send ocs.(i) (Job (next t));
          loop ()
    in
    let rec wait = function
      | [] ->
          ()
      | vassals ->
          let pid, _process_status = Unix.wait () in
          (* TODO: handle process_status *)
          wait (List.filter vassals ~f:(fun p -> p <> pid))
    in
    loop ();
    close_in ic;
    wait vassals;
    of_queue results

  (* Worker body (runs in a forked child): announce readiness, then
   * apply [f] to each received job until told to exit.  Never returns. *)
  let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
    let ic = Unix.in_channel_of_descr vassal_pipe_r in
    let oc = Unix.out_channel_of_descr lord_pipe_w in
    let rec loop () =
      match (Ipc.recv ic : 'input msg_from_lord) with
      | Job (Some x) ->
          Ipc.send oc (Result (i, (x, f x)));
          loop ()
      | Job None ->
          Ipc.send oc (Exiting i)
    in
    Ipc.send oc (Ready i);
    loop ();
    close_in ic;
    close_out oc;
    exit 0

  (* Fork [njobs] workers; all workers share one pipe back to the lord
   * (writes of marshalled messages are assumed small enough to be
   * atomic) and each has a private pipe for jobs from the lord.
   * NOTE(review): children inherit the other workers' pipe ends and do
   * not close them; this works but delays fd cleanup — verify if fd
   * limits ever become a concern.  Side effects performed by [f]
   * (e.g. metrics updates) happen in the child and are not visible to
   * the parent. *)
  let bag_map t ~njobs ~f =
    let lord_pipe_r, lord_pipe_w = Unix.pipe () in
    let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
    let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
    let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
    let vassals = ref [] in
    for i=0 to (njobs - 1) do
      begin match Unix.fork () with
      | 0 ->
          Unix.close lord_pipe_r;
          vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
      | pid ->
          vassals := pid :: !vassals
      end
    done;
    Unix.close lord_pipe_w;
    lord
      t
      ~njobs
      ~vassals:!vassals
      ~ic:(Unix.in_channel_of_descr lord_pipe_r)
      ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
end
355 | ||
module In_channel : sig
  (** [lines ic] streams the entries of [ic], split on newlines by
      default, or on NUL bytes when [delim_null] is true. *)
  val lines : ?delim_null:bool -> in_channel -> string Stream.t
end = struct
  (* Newline-delimited reader: one [input_line] per pull. *)
  let next_line ic () =
    try Some (input_line ic) with
    | End_of_file -> None

  (* NUL-delimited reader (e.g. output of [find -print0]); the lexer
   * buffer is created once and shared by all pulls. *)
  let next_null_delimited ic =
    let lexbuf = Lexing.from_channel ic in
    fun () -> Input_delim.by_null lexbuf

  let lines ?(delim_null=false) ic =
    let pull =
      if delim_null then
        next_null_delimited ic
      else
        next_line ic
    in
    Stream.create pull
end
379 | ||
a9a56d74 SK |
module File : sig
  type t =
    { path : string
    ; size : int
    }

  val find : string -> t Stream.t
  (** Find all files in the directory tree, starting from the given root path *)

  val lookup : string Stream.t -> t Stream.t
  (** Lookup file info for given paths *)

  val head : t -> len:int -> metrics:M.t -> string

  val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t
  val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t
end = struct
  type t =
    { path : string
    ; size : int
    }

  (* Stat each given path.  Uses [lstat], so a symlink is reported with
   * its own size rather than its target's. *)
  let lookup paths =
    Stream.map paths ~f:(fun path ->
      let {Unix.st_size = size; _} = Unix.lstat path in
      {path; size}
    )

  (* Iterative traversal with explicit queues: regular files are queued
   * for emission, directories are queued for later exploration, and
   * every other kind (symlink, device, FIFO, socket) is skipped. *)
  let find root =
    let dirs = Queue.create () in
    let files = Queue.create () in
    let explore parent =
      Array.iter (Sys.readdir parent) ~f:(fun child ->
        let path = Filename.concat parent child in
        let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in
        match file_kind with
        | Unix.S_REG ->
            let file = {path; size = st_size} in
            Queue.add file files
        | Unix.S_DIR ->
            Queue.add path dirs
        | Unix.S_CHR
        | Unix.S_BLK
        | Unix.S_LNK
        | Unix.S_FIFO
        | Unix.S_SOCK ->
            ()
      )
    in
    explore root;
    (* Lazily: drain queued files first, then explore one more directory
     * and retry; done when both queues are empty. *)
    let rec next () =
      match Queue.is_empty files, Queue.is_empty dirs with
      | false, _ -> Some (Queue.take files)
      | true , true -> None
      | true , false ->
          explore (Queue.take dirs);
          next ()
    in
    Stream.create next

  (* Bucket the stream by [group]; members of multi-element buckets pass
   * through, singletons are handed to [handle_singleton] (for metrics)
   * and dropped.  Eager: fully drains [files] before returning. *)
  let filter_out_singletons files ~group ~handle_singleton =
    let q = Queue.create () in
    Stream.iter (Stream.group_by files ~f:group) ~f:(fun group ->
      let (_, n, members) = group in
      if n > 1 then
        List.iter members ~f:(fun m -> Queue.add m q)
      else
        handle_singleton group
    );
    Stream.of_queue q

  let filter_out_unique_sizes files ~metrics =
    filter_out_singletons
      files
      ~group:(fun {size; _} -> size)
      ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size)

  (* Read the first [len] bytes of the file.  The buffer is pre-filled
   * with spaces, so a file shorter than [len] yields a space-padded
   * sample — the result always has length [len]. *)
  let head {path; _} ~len ~metrics =
    M.file_sampled metrics;
    let buf = Bytes.make len ' ' in
    let ic = open_in_bin path in
    let rec read pos len =
      assert (len >= 0);
      if len = 0 then
        ()
      else begin
        let chunk_size = input ic buf pos len in
        M.chunk_read metrics ~size:chunk_size;
        if chunk_size = 0 then (* EOF *)
          ()
        else
          read (pos + chunk_size) (len - chunk_size)
      end
    in
    read 0 len;
    close_in ic;
    Bytes.to_string buf

  let filter_out_unique_heads files ~len ~metrics =
    filter_out_singletons
      files
      ~group:(head ~len ~metrics)
      ~handle_singleton:(fun (_, _, files) ->
        let {size; _} = List.hd files in (* Guaranteed non-empty *)
        M.file_unique_sample metrics ~size
      )
end
487 | ||
(* Where candidate paths come from. *)
type input =
  | Stdin                       (* read candidate paths from stdin *)
  | Directories of string list  (* walk these directory trees *)

(* Where duplicate groups are written. *)
type output =
  | Stdout
  | Directory of string  (* one file per digest, under 2-char-prefix subdirs *)

(* Fully-parsed command-line options. *)
type opt =
  { input      : input
  ; output     : output
  ; ignore     : string -> bool  (* paths matching this predicate are skipped *)
  ; sample     : int             (* bytes read per file in the sampling phase *)
  ; njobs      : int             (* worker processes; <= 1 means sequential *)
  ; delim_null : bool            (* stdin paths are NUL- instead of newline-delimited *)
  }
504 | ||
(* Build the stream of candidate files from the configured source, then
 * drop empty and ignored files, recording every decision in [metrics]. *)
let make_input_stream input ignore ~metrics ~delim_null =
  let candidates =
    match input with
    | Stdin ->
        File.lookup (In_channel.lines stdin ~delim_null)
    | Directories paths ->
        (* De-duplicate the requested roots before walking them. *)
        let unique_roots = StrSet.elements (StrSet.of_list paths) in
        Stream.concat (List.map unique_roots ~f:File.find)
  in
  Stream.filter candidates ~f:(fun {File.path; size} ->
    M.file_considered metrics ~size;
    let is_empty = size = 0 in
    let is_ignored = ignore path in
    (* A file can be both empty and ignored; count it in both metrics. *)
    if is_empty then M.file_empty metrics;
    if is_ignored then M.file_ignored metrics ~size;
    not (is_empty || is_ignored)
  )
e09dff7f SK |
522 | |
523 | let make_output_fun = function | |
524 | | Stdout -> | |
5c0100d2 SK |
525 | fun digest n_files files -> |
526 | printf "%s %d\n%!" (Digest.to_hex digest) n_files; | |
1013fbcd | 527 | List.iter files ~f:(fun {File.path; _} -> |
5c0100d2 SK |
528 | printf " %S\n%!" path |
529 | ) | |
e09dff7f | 530 | | Directory dir -> |
5c0100d2 | 531 | fun digest _ files -> |
e09dff7f SK |
532 | let digest = Digest.to_hex digest in |
533 | let dir = Filename.concat dir (String.sub digest 0 2) in | |
534 | Unix.mkdir dir ~perm:0o700; | |
535 | let oc = open_out (Filename.concat dir digest) in | |
1013fbcd | 536 | List.iter files ~f:(fun {File.path; _} -> |
e09dff7f SK |
537 | output_string oc (sprintf "%S\n%!" path) |
538 | ); | |
539 | close_out oc | |
540 | ||
(* Wall-clock seconds since the epoch — includes time spent blocked on
 * I/O and in worker subprocesses. *)
let time_wall () = Unix.gettimeofday ()

(* CPU seconds consumed by this process only. *)
let time_proc () = Sys.time ()
546 | ||
(* Pipeline driver: read candidates, discard files that cannot have a
 * duplicate (unique size, then unique head sample), hash the rest,
 * group by digest, emit groups, and report metrics. *)
let main {input; output; ignore; sample = sample_len; njobs; delim_null} =
  let wt0_all = time_wall () in
  let pt0_all = time_proc () in
  let metrics = M.init () in
  let output = make_output_fun output in
  let input = make_input_stream input ignore ~metrics ~delim_null in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   * from input to files_by_size
   * from files_by_size to files_by_sample
   * from files_by_sample to files_by_digest
   * from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)

  let files = input in

  (* Phase 1: drop files whose size is unique. *)
  let wt0_group_by_size = time_wall () in
  let pt0_group_by_size = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let files = File.filter_out_unique_sizes files ~metrics in
  let pt1_group_by_size = time_proc () in
  let wt1_group_by_size = time_wall () in

  (* Phase 2: drop files whose head sample is unique. *)
  let wt0_group_by_sample = wt1_group_by_size in
  let pt0_group_by_sample = pt1_group_by_size in
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let files =
    if njobs > 1 then begin
      let q = Queue.create () in
      files
      |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
      |> Stream.group_by ~f:snd
      |> Stream.map ~f:(fun (d, n, pairs) -> (d, n, List.map pairs ~f:fst))
      |> Stream.iter ~f:(fun (_, n, fs) ->
          if n > 1 then
            List.iter fs ~f:(fun f -> Queue.add f q)
          else
            (* BUG FIX: the sequential path records files skipped due to a
             * unique sample, but the parallel path silently dropped them,
             * under-reporting that metric.  Record them here as well.
             * (Metrics updated inside worker processes are still lost —
             * sampled-file/chunk counts remain inaccurate when njobs > 1.) *)
            let {File.size; _} = List.hd fs in
            M.file_unique_sample metrics ~size
        )
      ;
      Stream.of_queue q
    end else
      File.filter_out_unique_heads files ~len:sample_len ~metrics
  in
  let pt1_group_by_sample = time_proc () in
  let wt1_group_by_sample = time_wall () in

  (* Phase 3: hash survivors and group by digest. *)
  let wt0_group_by_digest = wt1_group_by_sample in
  let pt0_group_by_digest = pt1_group_by_sample in
  eprintf "[debug] hashing\n%!";
  let groups =
    if njobs > 1 then
      let with_digests =
        Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
      in
      Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
        fun (digest, n, file_digest_pairs) ->
          let files =
            List.map file_digest_pairs ~f:(fun (file, _) ->
              (* Counted here, in the parent, so the metric survives. *)
              M.file_hashed metrics ~size:file.File.size;
              file
            )
          in
          (digest, n, files)
      )
    else
      Stream.group_by files ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path
      )
  in
  let pt1_group_by_digest = time_proc () in
  let wt1_group_by_digest = time_wall () in

  eprintf "[debug] reporting\n%!";
  Stream.iter groups ~f:(fun (d, n, files) ->
    M.digest metrics;
    if n > 1 then
      (* BUG FIX: only the copies beyond the first are redundant, so the
       * redundant byte count is (n - 1) * size.  This makes the bytes
       * figure consistent with the "Hashed - Digests" file count printed
       * beside it in the report (previously it over-counted by one full
       * copy per duplicate group). *)
      M.redundant_data metrics ~size:((n - 1) * (List.hd files).File.size);
    output d n files
  );

  let pt1_all = time_proc () in
  let wt1_all = time_wall () in

  M.report metrics
    ~wall_time_all:            (wt1_all -. wt0_all)
    ~wall_time_group_by_size:  (wt1_group_by_size -. wt0_group_by_size)
    ~wall_time_group_by_head:  (wt1_group_by_sample -. wt0_group_by_sample)
    ~wall_time_group_by_digest:(wt1_group_by_digest -. wt0_group_by_digest)
    ~proc_time_all:            (pt1_all -. pt0_all)
    ~proc_time_group_by_size:  (pt1_group_by_size -. pt0_group_by_size)
    ~proc_time_group_by_head:  (pt1_group_by_sample -. pt0_group_by_sample)
    ~proc_time_group_by_digest:(pt1_group_by_digest -. pt0_group_by_digest)
cce97c27 | 639 | |
(* Parse and validate command-line options.  Exits with status 1 and a
 * message on stderr for any invalid argument. *)
let get_opt () : opt =
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  let assert_file_exists path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
  in
  let assert_file_is_dir path =
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs = ref 6 in
  let input_delim_null = ref false in
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          assert_file_exists path;
          assert_file_is_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          ignore := fun string -> Str.string_match regexp string 0)
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ; ( "-0"
      , Arg.Set input_delim_null
      , ( sprintf
            ( " Delimit input paths by null character instead of a newline."
            ^^" Meaningful only when reading candidate paths from stdin."
            ^^" Default: %B"
            )
            !input_delim_null
        )
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      (* Anonymous arguments are directory roots; the first one switches
       * input from Stdin to Directories. *)
      assert_file_exists path;
      assert_file_is_dir path;
      match !input with
      | Stdin ->
          input := Directories [path]
      | Directories paths ->
          input := Directories (path :: paths)
    )
    "";
  (* BUG FIX: the predicate rejects zero as well as negatives, but the
   * message claimed only negatives were invalid.  Say what is checked. *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive, got: %d" !sample);
  { input = !input
  ; output = !output
  ; ignore = !ignore
  ; sample = !sample
  ; njobs = !njobs
  ; delim_null = !input_delim_null
  }
717 | ||
(* Program entry point. *)
let () =
  get_opt () |> main