Commit | Line | Data |
---|---|---|
cce97c27 SK |
1 | open Printf |
2 | ||
ddcbda00 | 3 | module List = ListLabels |
5c0100d2 | 4 | module StrSet = Set.Make(String) |
ddcbda00 | 5 | module Unix = UnixLabels |
1013fbcd SK |
6 | |
7 | module M = Metrics | |
8 | ||
(** Source of the candidate files to examine. *)
type input =
  | Stdin
      (** Candidate paths are read from standard input and resolved with
          [File.lookup]. *)
  | Directories of string list
      (** Each listed path is handed to [File.find]; duplicates in the list
          are removed first. *)
948ee900 | 12 | |
(** Destination of the report. *)
type output =
  | Stdout
      (** Print each group (digest, file count, quoted paths) on standard
          output. *)
  | Directory of string
      (** Write one file per digest beneath this directory, inside a
          sub-directory named after the first two hex characters of the
          digest. *)
16 | ||
(** Options controlling one run of the program, as produced by [get_opt]. *)
type opt = {
  input : input;  (** Where candidate files come from. *)
  output : output;  (** Where groups of matching files are reported. *)
  ignore : string -> bool;
      (** Predicate on a file path; files for which it holds are dropped from
          the input stream. *)
  sample : int;
      (** Length passed as [~len] to the head-sampling stage ([File.head] /
          [File.filter_out_unique_heads]). *)
  njobs : int;
      (** Number of jobs handed to [Stream.bag_map]; the parallel code paths
          are taken only when this is greater than 1. *)
  delim_null : bool;
      (** Forwarded to [In_channel.lines] when paths are read from stdin. *)
}
25 | ||
(** [make_input_stream input ignore ~metrics ~delim_null] builds the stream of
    candidate files and filters out the ones that are empty or ignored.

    - [Stdin]: paths come from [In_channel.lines stdin ~delim_null] and are
      resolved with [File.lookup].
    - [Directories paths]: [paths] is de-duplicated through a string set, each
      element is expanded with [File.find], and the results are concatenated.

    Every file seen is counted with [M.file_considered]; empty files and
    files for which [ignore path] holds are additionally counted with
    [M.file_empty] / [M.file_ignored]. A file that is both empty and ignored
    is counted in both. *)
let make_input_stream input ignore ~metrics ~delim_null =
  let candidates =
    match input with
    | Stdin -> File.lookup (In_channel.lines stdin ~delim_null)
    | Directories paths ->
      paths
      |> StrSet.of_list
      |> StrSet.elements
      |> List.map ~f:File.find
      |> Stream.concat
  in
  (* Decides whether a file stays in the stream, recording metrics as it
     goes. *)
  let keep {File.path; size} =
    M.file_considered metrics ~size;
    let is_empty = size = 0 in
    let is_ignored = ignore path in
    if is_empty then M.file_empty metrics;
    if is_ignored then M.file_ignored metrics ~size;
    not (is_empty || is_ignored)
  in
  Stream.filter candidates ~f:keep
e09dff7f SK |
43 | |
(** [make_output_fun output] returns the function used to report one group of
    files sharing a digest. The returned function takes the raw digest, the
    number of files in the group, and the files themselves.

    - [Stdout]: prints the hex digest and file count on one line, then each
      path (OCaml-quoted with [%S]) on its own line, flushing after each.
    - [Directory dir]: writes the quoted paths, one per line, to
      [dir/<first two hex chars>/<hex digest>]. The file count is unused.

    @raise Unix.Unix_error if the prefix directory cannot be created for a
      reason other than already existing.
    @raise Sys_error if the digest file cannot be opened or written. *)
let make_output_fun = function
  | Stdout ->
    fun digest n_files files ->
      printf "%s %d\n%!" (Digest.to_hex digest) n_files;
      List.iter files ~f:(fun {File.path; _} ->
        printf " %S\n%!" path
      )
  | Directory dir ->
    fun digest _ files ->
      let digest = Digest.to_hex digest in
      let dir = Filename.concat dir (String.sub digest 0 2) in
      (* Many digests share the same two-character prefix, so the prefix
         directory usually exists already after the first group: that is
         not an error. *)
      (try Unix.mkdir dir ~perm:0o700
       with Unix.Unix_error (Unix.EEXIST, _, _) -> ());
      let oc = open_out (Filename.concat dir digest) in
      (* Close the channel even when writing fails, then re-raise. *)
      match
        List.iter files ~f:(fun {File.path; _} -> fprintf oc "%S\n" path)
      with
      | () -> close_out oc
      | exception e ->
        close_out_noerr oc;
        raise e
61 | ||
(** Current wall-clock time in seconds, as given by [Unix.gettimeofday]. *)
let time_wall : unit -> float = Unix.gettimeofday
64 | ||
(** Processor time used by the program so far, in seconds, as given by
    [Sys.time]. *)
let time_proc : unit -> float = Sys.time
67 | ||
(** [main opt] runs the whole pipeline and reports the result.

    Stages, in order:
    + build the filtered input stream ([make_input_stream]);
    + drop files whose size is unique ([File.filter_out_unique_sizes]);
    + drop files whose leading sample of [opt.sample] bytes is unique;
    + group the remaining files by [Digest.file];
    + hand every group with more than one member to the output function and
      finally call [M.report] with wall-clock and processor timings.

    When [opt.njobs > 1] the sampling and hashing stages go through
    [Stream.bag_map]; otherwise they run through the sequential helpers.
    Progress messages are written to stderr. *)
let main {input; output; ignore; sample = sample_len; njobs; delim_null} =
  let wall_start = time_wall () in
  let cpu_start = time_proc () in
  let metrics = M.init () in
  let emit = make_output_fun output in
  let candidates = make_input_stream input ignore ~metrics ~delim_null in
  let parallel = njobs > 1 in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   *   from input           to files_by_size
   *   from files_by_size   to files_by_sample
   *   from files_by_sample to files_by_digest
   *   from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)

  (* Stage: group by size. *)
  let wall_size_start = time_wall () in
  let cpu_size_start = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let same_size = File.filter_out_unique_sizes candidates ~metrics in
  let cpu_size_end = time_proc () in
  let wall_size_end = time_wall () in

  (* Stage: group by sample. Its clock starts where the previous stage's
     clock stopped. *)
  let wall_sample_start = wall_size_end in
  let cpu_sample_start = cpu_size_end in
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let same_head =
    if not parallel then
      File.filter_out_unique_heads same_size ~len:sample_len ~metrics
    else begin
      let survivors = Queue.create () in
      (* [bag_map] yields (file, head) pairs; group on the head. *)
      let sampled =
        Stream.bag_map same_size ~njobs
          ~f:(File.head ~len:sample_len ~metrics)
      in
      let by_head = Stream.group_by sampled ~f:snd in
      let groups =
        Stream.map by_head ~f:(fun (head, count, pairs) ->
          (head, count, List.map pairs ~f:fst))
      in
      let repeated = Stream.filter groups ~f:(fun (_, count, _) -> count > 1) in
      Stream.iter repeated ~f:(fun (_, _, members) ->
        List.iter members ~f:(fun file -> Queue.add file survivors));
      Stream.of_queue survivors
    end
  in
  let cpu_sample_end = time_proc () in
  let wall_sample_end = time_wall () in

  (* Stage: group by digest. *)
  let wall_digest_start = wall_sample_end in
  let cpu_digest_start = cpu_sample_end in
  eprintf "[debug] hashing\n%!";
  let groups =
    if parallel then begin
      let hashed =
        Stream.bag_map same_head ~njobs
          ~f:(fun {File.path; _} -> Digest.file path)
      in
      let by_digest = Stream.group_by hashed ~f:snd in
      Stream.map by_digest ~f:(fun (digest, count, pairs) ->
        let members =
          List.map pairs ~f:(fun (({File.size; _} as file), _) ->
            M.file_hashed metrics ~size;
            file)
        in
        (digest, count, members))
    end else
      Stream.group_by same_head ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path)
  in
  let cpu_digest_end = time_proc () in
  let wall_digest_end = time_wall () in

  eprintf "[debug] reporting\n%!";
  Stream.iter groups ~f:(fun (digest, count, members) ->
    M.digest metrics;
    if count > 1 then begin
      (* The first member's size is used for the whole group. *)
      let {File.size; _} = List.hd members in
      M.redundant_data metrics ~size:(count * size);
      emit digest count members
    end);

  let cpu_end = time_proc () in
  let wall_end = time_wall () in

  M.report metrics
    ~wall_time_all:            (wall_end -. wall_start)
    ~wall_time_group_by_size:  (wall_size_end -. wall_size_start)
    ~wall_time_group_by_head:  (wall_sample_end -. wall_sample_start)
    ~wall_time_group_by_digest:(wall_digest_end -. wall_digest_start)
    ~proc_time_all:            (cpu_end -. cpu_start)
    ~proc_time_group_by_size:  (cpu_size_end -. cpu_size_start)
    ~proc_time_group_by_head:  (cpu_sample_end -. cpu_sample_start)
    ~proc_time_group_by_digest:(cpu_digest_end -. cpu_digest_start)
cce97c27 | 161 | |
(** [get_opt ()] parses the command line into an [opt] record.

    Recognised flags: [-out DIR], [-ignore REGEXP], [-sample N], [-j N] and
    [-0]. Every anonymous argument must be an existing directory and is
    added to the list of input directories; with no anonymous argument the
    input stays [Stdin].

    Validation failures (missing path, path that is not a directory,
    non-positive sample size) print a message on stderr and terminate the
    process with exit code 1. *)
let get_opt () : opt =
  (* [assert_ test x msg] exits with [msg] unless [test x] holds. *)
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  let assert_file_exists path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
  in
  (* Must be called after [assert_file_exists]: [Sys.is_directory] raises on
     a missing path. *)
  let assert_file_is_dir path =
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs = ref 6 in
  let input_delim_null = ref false in
  (* The doc strings are built before parsing, so the "Default" values shown
     are the initial values of the references above. *)
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          assert_file_exists path;
          assert_file_is_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          ignore := fun string -> Str.string_match regexp string 0)
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ; ( "-0"
      , Arg.Set input_delim_null
      , ( sprintf
            ( " Delimit input paths by null character instead of a newline."
            ^^" Meaningful only when reading candidate paths from stdin."
            ^^" Default: %B"
            )
            !input_delim_null
        )
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      assert_file_exists path;
      assert_file_is_dir path;
      match !input with
      | Stdin ->
        input := Directories [path]
      | Directories paths ->
        input := Directories (path :: paths)
    )
    "";
  (* The check rejects zero as well as negative values, so the message says
     "must be positive" rather than "cannot be negative". *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive: %d" !sample);
  { input = !input
  ; output = !output
  ; ignore = !ignore
  ; sample = !sample
  ; njobs = !njobs
  ; delim_null = !input_delim_null
  }
239 | ||
(* Program entry point: parse the command line, then run the pipeline. *)
let () = get_opt () |> main