Commit | Line | Data |
---|---|---|
cce97c27 SK |
1 | open Printf |
2 | ||
948ee900 SK |
3 | module Array = ArrayLabels |
4 | module List = ListLabels | |
5c0100d2 | 5 | module StrSet = Set.Make(String) |
e09dff7f | 6 | module Unix = UnixLabels |
cce97c27 | 7 | |
1013fbcd SK |
(** Counters describing how many files and bytes reached, or were dropped at,
    each stage of the pipeline, together with a final report on stderr. *)
module Metrics : sig
  type t

  val init : unit -> t
  (** A fresh set of counters, all starting at zero. *)

  val report
    :  t
    -> wall_time_all:float
    -> wall_time_group_by_size:float
    -> wall_time_group_by_head:float
    -> wall_time_group_by_digest:float
    -> proc_time_all:float
    -> proc_time_group_by_size:float
    -> proc_time_group_by_head:float
    -> proc_time_group_by_digest:float
    -> unit
  (** Print every counter, alongside the given timings, to stderr. *)

  val file_considered : t -> size:int -> unit
  (** Count one considered file and add [size] to the considered bytes. *)

  val file_ignored : t -> size:int -> unit
  (** Count one ignored file and add [size] to the ignored bytes. *)

  val file_empty : t -> unit
  (** Count one empty file. *)

  val file_sampled : t -> unit
  (** Count one sampled file. *)

  val chunk_read : t -> size:int -> unit
  (** Add [size] to the number of sampled bytes. *)

  val file_unique_size : t -> size:int -> unit
  (** Count one file skipped because of its unique size. *)

  val file_unique_sample : t -> size:int -> unit
  (** Count one file skipped because of its unique sample. *)

  val file_hashed : t -> size:int -> unit
  (** Count one hashed file and add [size] to the hashed bytes. *)

  val digest : t -> unit
  (** Count one digest. *)

  val redundant_data : t -> size:int -> unit
  (** Add [size] to the number of redundant bytes. *)
end = struct
  (* The type is abstract in the signature, so plain mutable fields are enough
     to hold the counters. *)
  type t =
    { mutable considered_files    : int
    ; mutable considered_bytes    : int
    ; mutable empty               : int
    ; mutable ignored_files       : int
    ; mutable ignored_bytes       : int
    ; mutable unique_size_files   : int
    ; mutable unique_size_bytes   : int
    ; mutable unique_sample_files : int
    ; mutable unique_sample_bytes : int
    ; mutable sampled_files       : int
    ; mutable sampled_bytes       : int
    ; mutable hashed_files        : int
    ; mutable hashed_bytes        : int
    ; mutable digests             : int
    ; mutable redundant_data      : int
    }

  let init () =
    { considered_files    = 0
    ; considered_bytes    = 0
    ; empty               = 0
    ; ignored_files       = 0
    ; ignored_bytes       = 0
    ; unique_size_files   = 0
    ; unique_size_bytes   = 0
    ; unique_sample_files = 0
    ; unique_sample_bytes = 0
    ; sampled_files       = 0
    ; sampled_bytes       = 0
    ; hashed_files        = 0
    ; hashed_bytes        = 0
    ; digests             = 0
    ; redundant_data      = 0
    }

  let file_considered t ~size =
    t.considered_files <- t.considered_files + 1;
    t.considered_bytes <- t.considered_bytes + size

  let file_ignored t ~size =
    t.ignored_files <- t.ignored_files + 1;
    t.ignored_bytes <- t.ignored_bytes + size

  let file_empty t =
    t.empty <- t.empty + 1

  let chunk_read t ~size =
    t.sampled_bytes <- t.sampled_bytes + size

  let file_sampled t =
    t.sampled_files <- t.sampled_files + 1

  let file_unique_size t ~size =
    t.unique_size_files <- t.unique_size_files + 1;
    t.unique_size_bytes <- t.unique_size_bytes + size

  let file_unique_sample t ~size =
    t.unique_sample_files <- t.unique_sample_files + 1;
    t.unique_sample_bytes <- t.unique_sample_bytes + size

  let file_hashed t ~size =
    t.hashed_files <- t.hashed_files + 1;
    t.hashed_bytes <- t.hashed_bytes + size

  let digest t =
    t.digests <- t.digests + 1

  let redundant_data t ~size =
    t.redundant_data <- t.redundant_data + size

  let report
      t
      ~wall_time_all
      ~wall_time_group_by_size
      ~wall_time_group_by_head
      ~wall_time_group_by_digest
      ~proc_time_all
      ~proc_time_group_by_size
      ~proc_time_group_by_head
      ~proc_time_group_by_digest
    =
    (* Bytes to gibibytes, as three successive divisions by 1024. *)
    let gb bytes = float_of_int bytes /. 1024. /. 1024. /. 1024. in
    eprintf "Total time : %.2f wall sec %.2f proc sec\n%!"
      wall_time_all
      proc_time_all;
    eprintf "Considered : %8d files %6.2f Gb\n%!"
      t.considered_files
      (gb t.considered_bytes);
    eprintf "Sampled : %8d files %6.2f Gb\n%!"
      t.sampled_files
      (gb t.sampled_bytes);
    eprintf "Hashed : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.hashed_files
      (gb t.hashed_bytes)
      wall_time_group_by_digest
      proc_time_group_by_digest;
    eprintf "Digests : %8d\n%!"
      t.digests;
    eprintf "Duplicates (Hashed - Digests): %8d files %6.2f Gb\n%!"
      (t.hashed_files - t.digests)
      (gb t.redundant_data);
    eprintf "Skipped due to 0 size : %8d files\n%!" t.empty;
    eprintf "Skipped due to unique size : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.unique_size_files
      (gb t.unique_size_bytes)
      wall_time_group_by_size
      proc_time_group_by_size;
    eprintf "Skipped due to unique sample : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!"
      t.unique_sample_files
      (gb t.unique_sample_bytes)
      wall_time_group_by_head
      proc_time_group_by_head;
    eprintf "Ignored due to regex match : %8d files %6.2f Gb\n%!"
      t.ignored_files
      (gb t.ignored_bytes)
end
167 | ||
168 | module M = Metrics | |
169 | ||
(** Single-pass streams layered over the standard library's [Stream], with a
    fork-based parallel map ([bag_map]). *)
module Stream : sig
  type 'a t

  val create : (unit -> 'a option) -> 'a t
  (** A stream that calls the given function for each element; [None] marks
      the end of the stream. *)

  val of_queue : 'a Queue.t -> 'a t
  (** A stream that takes its elements out of the queue, emptying it as it is
      consumed. *)

  val iter : 'a t -> f:('a -> unit) -> unit
  (** Apply [f] to every remaining element. *)

  val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t
  (** Parallel map with arbitrarily-reordered elements.

      Forks [njobs] worker processes and pairs every input with its result.
      Inputs and results cross process boundaries via [Marshal]. The whole
      input stream is consumed, and all workers are waited for, before the
      result stream is returned.

      @raise Invalid_argument if [njobs < 1]. *)

  val map : 'a t -> f:('a -> 'b) -> 'b t
  (** Lazily apply [f] to each element. *)

  val filter : 'a t -> f:('a -> bool) -> 'a t
  (** Lazily keep only the elements for which [f] returns [true]. *)

  val concat : ('a t) list -> 'a t
  (** Chain the given streams one after another, in list order. *)

  val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t
  (** Eagerly consume the stream and group its elements by the key [f]
      computes. Each group is [(key, number_of_members, members)]; members are
      listed most-recently-seen first. *)
end = struct
  module S = Stream

  type 'a t =
    {mutable streams : ('a S.t) list}

  (* Messages a worker ("vassal") sends to the coordinator ("lord"). Each one
     carries the index of the sending vassal. *)
  type ('input, 'output) msg_from_vassal =
    | Ready of int
    | Result of (int * ('input * 'output))
    | Exiting of int

  (* [Job None] tells a vassal that there is no more work. *)
  type 'input msg_from_lord =
    | Job of 'input option

  let create f =
    {streams = [S.from (fun _ -> f ())]}

  let of_queue q =
    create (fun () ->
      match Queue.take q with
      | exception Queue.Empty ->
          None
      | x ->
          Some x
    )

  (* Next element across the underlying streams; exhausted streams are dropped
     from the front of the list as they are encountered. *)
  let rec next t =
    match t.streams with
    | [] ->
        None
    | s :: streams ->
        (match S.next s with
        | exception Stream.Failure ->
            t.streams <- streams;
            next t
        | x ->
            Some x
        )

  let map t ~f =
    create (fun () ->
      match next t with
      | None -> None
      | Some x -> Some (f x)
    )

  let filter t ~f =
    let rec filter () =
      match next t with
      | None ->
          None
      | Some x when f x ->
          Some x
      | Some _ ->
          filter ()
    in
    create filter

  let iter t ~f =
    List.iter t.streams ~f:(S.iter f)

  let concat ts =
    {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}

  let group_by t ~f =
    let groups_tbl = Hashtbl.create 1_000_000 in
    let group_update x =
      let group = f x in
      let members =
        match Hashtbl.find_opt groups_tbl group with
        | None ->
            (1, [x])
        | Some (n, xs) ->
            (succ n, x :: xs)
      in
      Hashtbl.replace groups_tbl group members
    in
    iter t ~f:group_update;
    let groups = Queue.create () in
    Hashtbl.iter
      (fun name (length, members) -> Queue.add (name, length, members) groups)
      groups_tbl;
    of_queue groups

  (* Marshal-based message passing over channels. *)
  module Ipc : sig
    val send : out_channel -> 'a -> unit
    val recv : in_channel -> 'a
  end = struct
    let send oc msg =
      Marshal.to_channel oc msg [];
      (* Flush so the peer sees the message immediately. *)
      flush oc

    let recv ic =
      Marshal.from_channel ic
  end

  (* Coordinator: hands one job at a time to whichever vassal asks for one,
     collects the results, and returns them once every vassal has announced
     its exit and has been reaped. *)
  let lord t ~njobs ~vassals ~ic ~ocs =
    let active_vassals = ref njobs in
    let results = Queue.create () in
    let rec loop () =
      match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
      | Exiting i ->
          close_out ocs.(i);
          decr active_vassals;
          if !active_vassals = 0 then () else loop ()
      | Ready i ->
          Ipc.send ocs.(i) (Job (next t));
          loop ()
      | Result (i, result) ->
          Queue.add result results;
          Ipc.send ocs.(i) (Job (next t));
          loop ()
    in
    let rec wait = function
      | [] ->
          ()
      | vassals ->
          let pid, _process_status = Unix.wait () in
          (* TODO: handle process_status *)
          wait (List.filter vassals ~f:(fun p -> p <> pid))
    in
    loop ();
    close_in ic;
    wait vassals;
    of_queue results

  (* Worker: announces readiness, then answers each job with its result until
     told there is no more work; never returns to the caller. *)
  let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
    let ic = Unix.in_channel_of_descr vassal_pipe_r in
    let oc = Unix.out_channel_of_descr lord_pipe_w in
    let rec loop () =
      match (Ipc.recv ic : 'input msg_from_lord) with
      | Job (Some x) ->
          Ipc.send oc (Result (i, (x, f x)));
          loop ()
      | Job None ->
          Ipc.send oc (Exiting i)
    in
    Ipc.send oc (Ready i);
    loop ();
    close_in ic;
    close_out oc;
    exit 0

  let bag_map t ~njobs ~f =
    (* With no workers the lord would block on a pipe nobody writes to. *)
    if njobs < 1 then invalid_arg "Stream.bag_map: njobs must be at least 1";
    (* One shared pipe towards the lord, one private pipe towards each
       vassal. *)
    let lord_pipe_r, lord_pipe_w = Unix.pipe () in
    let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
    let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
    let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
    let vassals = ref [] in
    for i=0 to (njobs - 1) do
      begin match Unix.fork () with
      | 0 ->
          Unix.close lord_pipe_r;
          vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
      | pid ->
          vassals := pid :: !vassals
      end
    done;
    Unix.close lord_pipe_w;
    (* The lord only writes to the vassals. Its copies of the read ends are
       never used and would otherwise leak on every call. *)
    Array.iter vassal_pipes_r ~f:Unix.close;
    lord
      t
      ~njobs
      ~vassals:!vassals
      ~ic:(Unix.in_channel_of_descr lord_pipe_r)
      ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
end
355 | ||
(** Stream-producing helpers for input channels. *)
module In_channel : sig
  val lines : in_channel -> string Stream.t
  (** The lines of the channel, read on demand with [input_line]; the stream
      ends at end of file. *)
end = struct
  let lines ic =
    let read_line () =
      try Some (input_line ic) with End_of_file -> None
    in
    Stream.create read_line
end
368 | ||
a9a56d74 SK |
(** File records (path and size), ways of discovering them, and filters that
    discard files which cannot have a duplicate. *)
module File : sig
  type t =
    { path : string
    ; size : int
    }

  val find : string -> t Stream.t
  (** Find all files in the directory tree, starting from the given root path *)

  val lookup : string Stream.t -> t Stream.t
  (** Lookup file info for given paths *)

  val head : t -> len:int -> metrics:M.t -> string
  (** The first [len] bytes of the file, as a string of exactly [len]
      characters; a file shorter than [len] is padded with spaces. Records the
      sample in [metrics]. The file is closed even when reading fails. *)

  val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t
  (** Drop every file whose size no other file shares, recording each dropped
      file in [metrics]. *)

  val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t
  (** Drop every file whose [head] no other file shares, recording each
      dropped file in [metrics]. *)
end = struct
  type t =
    { path : string
    ; size : int
    }

  let lookup paths =
    Stream.map paths ~f:(fun path ->
      let {Unix.st_size = size; _} = Unix.lstat path in
      {path; size}
    )

  let find root =
    let dirs = Queue.create () in
    let files = Queue.create () in
    (* List one directory: regular files are queued for output, directories
       are queued for later exploration, everything else is skipped. [lstat]
       is used, so symbolic links are not followed. *)
    let explore parent =
      Array.iter (Sys.readdir parent) ~f:(fun child ->
        let path = Filename.concat parent child in
        let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in
        match file_kind with
        | Unix.S_REG ->
            let file = {path; size = st_size} in
            Queue.add file files
        | Unix.S_DIR ->
            Queue.add path dirs
        | Unix.S_CHR
        | Unix.S_BLK
        | Unix.S_LNK
        | Unix.S_FIFO
        | Unix.S_SOCK ->
            ()
      )
    in
    explore root;
    (* Serve queued files first; explore further directories only when the
       file queue runs dry. *)
    let rec next () =
      match Queue.is_empty files, Queue.is_empty dirs with
      | false, _     -> Some (Queue.take files)
      | true , true  -> None
      | true , false ->
          explore (Queue.take dirs);
          next ()
    in
    Stream.create next

  (* Group [files] by [group]; members of groups larger than one are passed
     on, and each single-member group is handed to [handle_singleton]. *)
  let filter_out_singletons files ~group ~handle_singleton =
    let q = Queue.create () in
    Stream.iter (Stream.group_by files ~f:group) ~f:(fun group ->
      let (_, n, members) = group in
      if n > 1 then
        List.iter members ~f:(fun m -> Queue.add m q)
      else
        handle_singleton group
    );
    Stream.of_queue q

  let filter_out_unique_sizes files ~metrics =
    filter_out_singletons
      files
      ~group:(fun {size; _} -> size)
      ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size)

  let head {path; _} ~len ~metrics =
    M.file_sampled metrics;
    let buf = Bytes.make len ' ' in
    let ic = open_in_bin path in
    (* [input] may return fewer bytes than requested, so keep reading until
       the buffer is full or end of file is reached. *)
    let rec read pos len =
      assert (len >= 0);
      if len = 0 then
        ()
      else begin
        let chunk_size = input ic buf pos len in
        M.chunk_read metrics ~size:chunk_size;
        if chunk_size = 0 then (* EOF *)
          ()
        else
          read (pos + chunk_size) (len - chunk_size)
      end
    in
    (* Close the channel on failure as well, so that a read error does not
       leak a file descriptor. *)
    match read 0 len with
    | () ->
        close_in ic;
        Bytes.to_string buf
    | exception e ->
        close_in_noerr ic;
        raise e

  let filter_out_unique_heads files ~len ~metrics =
    filter_out_singletons
      files
      ~group:(head ~len ~metrics)
      ~handle_singleton:(fun (_, _, files) ->
        let {size; _} = List.hd files in  (* Guaranteed non-empty *)
        M.file_unique_sample metrics ~size
      )
end
476 | ||
(** Source of the file paths to examine. *)
type input =
  | Stdin                       (** paths are read from standard input *)
  | Directories of string list  (** directory trees to search *)

(** Destination of the report. *)
type output =
  | Stdout               (** print to standard output *)
  | Directory of string  (** write files under this directory *)

(** Options for a single run. *)
type opt = {
  input  : input;
  output : output;
  ignore : string -> bool;  (** [true] for paths that must be skipped *)
  sample : int;             (** byte size of the file samples *)
  njobs  : int;             (** number of parallel jobs *)
}
492 | ||
(* Build the stream of candidate files for [input]. Every file is counted as
   considered; empty files and files whose path satisfies [ignore] are counted
   as such and removed from the stream. *)
let make_input_stream input ignore ~metrics =
  let candidates =
    match input with
    | Directories roots ->
        (* Going through a set removes repeated roots. *)
        roots
        |> StrSet.of_list
        |> StrSet.elements
        |> List.map ~f:File.find
        |> Stream.concat
    | Stdin ->
        stdin |> In_channel.lines |> File.lookup
  in
  let keep {File.path; size} =
    M.file_considered metrics ~size;
    let is_empty = (size = 0) in
    let is_ignored = ignore path in
    if is_empty then M.file_empty metrics;
    if is_ignored then M.file_ignored metrics ~size;
    not (is_empty || is_ignored)
  in
  Stream.filter candidates ~f:keep
e09dff7f SK |
510 | |
(* Pick the function that reports one group of files sharing a digest. It is
   called with the digest, the number of files in the group, and the files.

   - [Stdout]: print the hex digest and the count, then one quoted path per
     line.
   - [Directory dir]: write the quoted paths, one per line, to the file
     [dir/<first two hex characters of the digest>/<hex digest>]. *)
let make_output_fun = function
  | Stdout ->
      fun digest n_files files ->
        printf "%s %d\n%!" (Digest.to_hex digest) n_files;
        List.iter files ~f:(fun {File.path; _} ->
          printf " %S\n%!" path
        )
  | Directory dir ->
      fun digest _ files ->
        let digest = Digest.to_hex digest in
        let dir = Filename.concat dir (String.sub digest 0 2) in
        (* Many digests share the same two-character prefix, so the
           sub-directory usually exists already after the first few groups. *)
        (match Unix.mkdir dir ~perm:0o700 with
        | () -> ()
        | exception Unix.Unix_error (Unix.EEXIST, _, _) -> ()
        );
        let oc = open_out (Filename.concat dir digest) in
        (* Close the channel on failure too, so no descriptor is leaked. *)
        (match
           List.iter files ~f:(fun {File.path; _} ->
             fprintf oc "%S\n" path
           )
         with
        | () ->
            close_out oc
        | exception e ->
            close_out_noerr oc;
            raise e
        )
528 | ||
(* Wall-clock time, as reported by [Unix.gettimeofday]. *)
let time_wall : unit -> float =
  Unix.gettimeofday
531 | ||
217f8912 SK |
(* Processor time, as reported by [Sys.time]. *)
let time_proc : unit -> float =
  Sys.time
534 | ||
(* Run the whole pipeline for the given options:

   1. build the input stream of candidate files;
   2. drop files with a unique size;
   3. drop files with a unique leading sample of [sample_len] bytes;
   4. group the remaining files by digest;
   5. output every group that has more than one file;
   6. print the metrics report to stderr.

   Steps 3 and 4 run in [njobs] forked processes when [njobs > 1]. *)
let main {input; output; ignore; sample = sample_len; njobs} =
  let wt0_all = time_wall () in
  let pt0_all = time_proc () in
  let metrics = M.init () in
  let output = make_output_fun output in
  let input = make_input_stream input ignore ~metrics in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   * from input to files_by_size
   * from files_by_size to files_by_sample
   * from files_by_sample to files_by_digest
   * from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)

  let files = input in

  let wt0_group_by_size = time_wall () in
  let pt0_group_by_size = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let files = File.filter_out_unique_sizes files ~metrics in
  let pt1_group_by_size = time_proc () in
  let wt1_group_by_size = time_wall () in

  let wt0_group_by_sample = wt1_group_by_size in
  let pt0_group_by_sample = pt1_group_by_size in
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let files =
    if njobs > 1 then begin
      (* NOTE: [File.head] runs inside forked workers here, so the sampled
         file/byte counters it updates live in the workers' memory and never
         reach [metrics] in this process. *)
      let q = Queue.create () in
      files
      |> Stream.bag_map ~njobs ~f:(File.head ~len:sample_len ~metrics)
      |> Stream.group_by ~f:snd
      |> Stream.iter ~f:(fun (_, n, pairs) ->
           if n > 1 then
             List.iter pairs ~f:(fun (file, _) -> Queue.add file q)
           else
             (* Record skipped files, as the sequential path does. *)
             List.iter pairs ~f:(fun ({File.size; _}, _) ->
               M.file_unique_sample metrics ~size
             )
         );
      Stream.of_queue q
    end else
      File.filter_out_unique_heads files ~len:sample_len ~metrics
  in
  let pt1_group_by_sample = time_proc () in
  let wt1_group_by_sample = time_wall () in

  let wt0_group_by_digest = wt1_group_by_sample in
  let pt0_group_by_digest = pt1_group_by_sample in
  eprintf "[debug] hashing\n%!";
  let groups =
    if njobs > 1 then
      let with_digests =
        Stream.bag_map files ~njobs ~f:(fun {File.path; _} -> Digest.file path)
      in
      Stream.map (Stream.group_by with_digests ~f:snd) ~f:(
        fun (digest, n, file_digest_pairs) ->
          let files =
            List.map file_digest_pairs ~f:(fun (file, _) ->
              M.file_hashed metrics ~size:file.File.size;
              file
            )
          in
          (digest, n, files)
      )
    else
      Stream.group_by files ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path
      )
  in
  let pt1_group_by_digest = time_proc () in
  let wt1_group_by_digest = time_wall () in

  eprintf "[debug] reporting\n%!";
  Stream.iter groups ~f:(fun (d, n, files) ->
    (* Every digest is counted, including unique ones: the report derives the
       number of duplicates as hashed files minus digests. *)
    M.digest metrics;
    (* [begin ... end] is required: without it [output] falls outside the
       [if] and files with a unique digest get reported as duplicates. *)
    if n > 1 then begin
      (* NOTE(review): this counts all [n] copies, including the one that
         would be kept - confirm that this is the intended figure. *)
      M.redundant_data metrics ~size:(n * (List.hd files).File.size);
      output d n files
    end
  );

  let pt1_all = time_proc () in
  let wt1_all = time_wall () in

  M.report metrics
    ~wall_time_all:            (wt1_all             -. wt0_all)
    ~wall_time_group_by_size:  (wt1_group_by_size   -. wt0_group_by_size)
    ~wall_time_group_by_head:  (wt1_group_by_sample -. wt0_group_by_sample)
    ~wall_time_group_by_digest:(wt1_group_by_digest -. wt0_group_by_digest)
    ~proc_time_all:            (pt1_all             -. pt0_all)
    ~proc_time_group_by_size:  (pt1_group_by_size   -. pt0_group_by_size)
    ~proc_time_group_by_head:  (pt1_group_by_sample -. pt0_group_by_sample)
    ~proc_time_group_by_digest:(pt1_group_by_digest -. pt0_group_by_digest)
cce97c27 | 627 | |
1253df34 SK |
(* Parse the command line into an [opt].

   Anonymous arguments are directories to search; when none is given, input
   stays [Stdin]. Any failed validation prints a message to stderr and exits
   with status 1. *)
let get_opt () : opt =
  (* Exit with [msg] unless [test x] holds. *)
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  let assert_file_exists path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
  in
  let assert_file_is_dir path =
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input  = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs  = ref 6 in
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          assert_file_exists path;
          assert_file_is_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          (* [Str.string_match ... 0] only matches at the start of the
             path. *)
          ignore := fun string -> Str.string_match regexp string 0)
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      assert_file_exists path;
      assert_file_is_dir path;
      match !input with
      | Stdin ->
          input := Directories [path]
      | Directories paths ->
          input := Directories (path :: paths)
    )
    "";
  (* Zero is rejected as well as negative values, and the message says so. *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive: %d" !sample);
  { input  = !input
  ; output = !output
  ; ignore = !ignore
  ; sample = !sample
  ; njobs  = !njobs
  }
693 | ||
(* Program entry point: parse the command line, then run the pipeline. *)
let () =
  get_opt () |> main