Move modules into dedicated files
[dups.git] / dups.ml
1 open Printf
2
3 module List = ListLabels
4 module StrSet = Set.Make(String)
5 module Unix = UnixLabels
6
7 module M = Metrics
8
(** Where the candidate file paths come from. *)
type input =
  | Stdin
      (** Paths are read from standard input (see [make_input_stream]). *)
  | Directories of string list
      (** Each listed path is handed to [File.find]. *)
12
(** Where the groups of duplicates are reported. *)
type output =
  | Stdout
      (** Groups are printed to standard output. *)
  | Directory of string
      (** Each group is written to its own file below this directory. *)
16
(** Run-time options, as assembled by [get_opt] and consumed by [main]. *)
type opt = {
  input : input;
      (** Source of candidate paths. *)
  output : output;
      (** Destination of the duplicate report. *)
  ignore : string -> bool;
      (** Predicate on a file path; files for which it holds are skipped. *)
  sample : int;
      (** Byte size of the file samples, passed as [~len] to [File.head]. *)
  njobs : int;
      (** Number of parallel jobs; values above 1 select the parallel
          code paths in [main]. *)
  delim_null : bool;
      (** Whether paths read from stdin are delimited by a null character
          instead of a newline. *)
}
25
(** [make_input_stream input ignore ~metrics ~delim_null] builds the stream of
    candidate files fed to the rest of the pipeline.

    - [Stdin]: the lines of standard input (with [~delim_null] forwarded to
      [In_channel.lines]) are passed to [File.lookup].
    - [Directories paths]: repeated entries in [paths] are removed, each
      remaining path is passed to [File.find], and the resulting streams are
      concatenated.

    Every file seen is counted via [M.file_considered]. Files of size zero and
    files whose path satisfies [ignore] are recorded in [metrics] and dropped
    from the resulting stream. *)
let make_input_stream input ignore ~metrics ~delim_null =
  let candidates =
    match input with
    | Directories roots ->
        (* Going through a set removes repeated roots (and, as a side effect
           of [StrSet.elements], orders them). *)
        roots
        |> StrSet.of_list
        |> StrSet.elements
        |> List.map ~f:File.find
        |> Stream.concat
    | Stdin ->
        File.lookup (In_channel.lines stdin ~delim_null)
  in
  (* Decides whether a file stays in the stream, updating metrics on the way.
     [ignore] is consulted for every file, empty or not. *)
  let keep {File.path; size} =
    M.file_considered metrics ~size;
    let is_empty = (size = 0) in
    let is_ignored = ignore path in
    if is_empty then M.file_empty metrics;
    if is_ignored then M.file_ignored metrics ~size;
    not (is_empty || is_ignored)
  in
  Stream.filter candidates ~f:keep
43
(** [make_output_fun output] returns the function used to report one group of
    files sharing a digest. It is called as [f digest n_files files].

    - [Stdout]: prints the hex digest and [n_files] on one line, followed by
      each file path (OCaml-quoted with [%S]) on its own line.
    - [Directory dir]: writes the quoted paths, one per line, to the file
      [dir/<first two hex characters of the digest>/<hex digest>].

    @raise Unix.Unix_error if the bucket sub-directory cannot be created for
      a reason other than it already existing.
    @raise Sys_error if the group file cannot be opened or written. *)
let make_output_fun = function
  | Stdout ->
      fun digest n_files files ->
        printf "%s %d\n%!" (Digest.to_hex digest) n_files;
        List.iter files ~f:(fun {File.path; _} ->
          printf " %S\n%!" path
        )
  | Directory dir ->
      fun digest _ files ->
        let digest = Digest.to_hex digest in
        let dir = Filename.concat dir (String.sub digest 0 2) in
        (* Many digests share the same two-character prefix, so the bucket
           directory frequently exists already; that must not abort the run. *)
        (try Unix.mkdir dir ~perm:0o700
         with Unix.Unix_error (Unix.EEXIST, _, _) -> ());
        let oc = open_out (Filename.concat dir digest) in
        (* Do not leak the channel if a write fails part-way through. *)
        (try
           List.iter files ~f:(fun {File.path; _} ->
             fprintf oc "%S\n" path
           )
         with exn ->
           close_out_noerr oc;
           raise exn);
        close_out oc
61
(** Current wall-clock time, as returned by [Unix.gettimeofday]. *)
let time_wall : unit -> float =
  Unix.gettimeofday
64
(** Processor time used by the program so far, as returned by [Sys.time]. *)
let time_proc : unit -> float =
  Sys.time
67
(** [main opt] runs the duplicate-finding pipeline described by [opt]:

    + build the candidate stream ([make_input_stream]);
    + drop files whose size is unique;
    + drop files whose leading sample of [opt.sample] bytes is unique;
    + group the remaining files by the digest of their full contents;
    + hand every group with more than one member to the output function
      ([make_output_fun]) and finally print the metrics report.

    When [opt.njobs > 1] the sampling and hashing stages go through
    [Stream.bag_map]; otherwise the sequential [File] / [Stream] helpers are
    used. Wall-clock and processor time are sampled around each stage and
    passed to [M.report].

    NOTE(review): if [Stream] is lazy, the per-stage timings may not reflect
    where the work is actually performed - confirm against [Stream]. *)
let main {input; output; ignore; sample = sample_len; njobs; delim_null} =
  let wall_begin = time_wall () in
  let cpu_begin = time_proc () in
  let metrics = M.init () in
  let report_group = make_output_fun output in
  let candidates = make_input_stream input ignore ~metrics ~delim_null in
  let parallel = njobs > 1 in
  (* TODO: Find a nice(r) abstraction for re-assembling the pipeline pieces:
   *
   *   input           -> files_by_size
   *   files_by_size   -> files_by_sample
   *   files_by_sample -> files_by_digest
   *   files_by_digest -> output
   *
   *   input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)

  (* Stage 1: sizes. *)
  let size_wall_begin = time_wall () in
  let size_cpu_begin = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let same_size = File.filter_out_unique_sizes candidates ~metrics in
  let size_cpu_end = time_proc () in
  let size_wall_end = time_wall () in

  (* Stage 2: leading samples. Its clock starts where stage 1 stopped. *)
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let same_head =
    if parallel then begin
      let survivors = Queue.create () in
      let sampled =
        Stream.bag_map same_size ~njobs ~f:(File.head ~len:sample_len ~metrics)
      in
      let by_sample = Stream.group_by sampled ~f:snd in
      let file_groups =
        Stream.map by_sample ~f:(fun (sample, n, pairs) ->
          (sample, n, List.map pairs ~f:fst))
      in
      let shared = Stream.filter file_groups ~f:(fun (_, n, _) -> n > 1) in
      (* Drain the parallel stream into a queue, then resume streaming
         from that queue. *)
      Stream.iter shared ~f:(fun (_, _, members) ->
        List.iter members ~f:(fun file -> Queue.add file survivors));
      Stream.of_queue survivors
    end else
      File.filter_out_unique_heads same_size ~len:sample_len ~metrics
  in
  let head_cpu_end = time_proc () in
  let head_wall_end = time_wall () in

  (* Stage 3: full-content digests. Its clock starts where stage 2 stopped. *)
  eprintf "[debug] hashing\n%!";
  let groups =
    if parallel then begin
      same_head
      |> Stream.bag_map ~njobs ~f:(fun {File.path; _} -> Digest.file path)
      |> Stream.group_by ~f:snd
      |> Stream.map ~f:(fun (digest, n, pairs) ->
           (* Strip the digests again, counting each file as hashed. *)
           let members =
             List.map pairs ~f:(fun (({File.size; _} as file), _) ->
               M.file_hashed metrics ~size;
               file)
           in
           (digest, n, members))
    end else begin
      Stream.group_by same_head ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path)
    end
  in
  let digest_cpu_end = time_proc () in
  let digest_wall_end = time_wall () in

  (* Stage 4: report every digest shared by more than one file. *)
  eprintf "[debug] reporting\n%!";
  Stream.iter groups ~f:(fun (digest, n, members) ->
    M.digest metrics;
    if n > 1 then begin
      let {File.size; _} = List.hd members in
      M.redundant_data metrics ~size:(n * size);
      report_group digest n members
    end);

  let cpu_end = time_proc () in
  let wall_end = time_wall () in

  M.report metrics
    ~wall_time_all:            (wall_end        -. wall_begin)
    ~wall_time_group_by_size:  (size_wall_end   -. size_wall_begin)
    ~wall_time_group_by_head:  (head_wall_end   -. size_wall_end)
    ~wall_time_group_by_digest:(digest_wall_end -. head_wall_end)
    ~proc_time_all:            (cpu_end         -. cpu_begin)
    ~proc_time_group_by_size:  (size_cpu_end    -. size_cpu_begin)
    ~proc_time_group_by_head:  (head_cpu_end    -. size_cpu_end)
    ~proc_time_group_by_digest:(digest_cpu_end  -. head_cpu_end)
161
(** [get_opt ()] parses the command line into an [opt] record.

    Recognised flags: [-out DIR], [-ignore REGEXP], [-sample BYTES], [-j JOBS]
    and [-0]. Every anonymous argument must be an existing directory and is
    added to the list of directories to search; when none is given, candidate
    paths are read from stdin.

    Defaults: input [Stdin], output [Stdout], nothing ignored, sample size
    512, 6 jobs, newline-delimited input.

    On a failed validation (missing path, path that is not a directory,
    non-positive sample size) a message is printed to stderr and the process
    exits with status 1. *)
let get_opt () : opt =
  (* [assert_ test x msg]: print [msg] and exit 1 unless [test x] holds. *)
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  let assert_file_exists path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path)
  in
  let assert_file_is_dir path =
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs = ref 6 in
  let input_delim_null = ref false in
  (* Each doc string starts with a space; [Arg.align] uses the first space as
     its alignment separator when there is no tab. *)
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          (* Existence is checked first: [Sys.is_directory] raises on a
             missing path. *)
          assert_file_exists path;
          assert_file_is_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          (* [Str.string_match ... 0] anchors the match at the start of the
             path. *)
          ignore := (fun string -> Str.string_match regexp string 0))
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ; ( "-0"
      , Arg.Set input_delim_null
      , ( sprintf
            ( " Delimit input paths by null character instead of a newline."
            ^^" Meaningful only when reading candidate paths from stdin."
            ^^" Default: %B"
            )
            !input_delim_null
        )
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      assert_file_exists path;
      assert_file_is_dir path;
      (* The first directory switches the input away from stdin; later ones
         are accumulated. *)
      match !input with
      | Stdin ->
          input := Directories [path]
      | Directories paths ->
          input := Directories (path :: paths)
    )
    "";
  (* Zero is rejected as well as negative values, so the message must say
     "positive" rather than "cannot be negative". *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive: %d" !sample);
  { input      = !input
  ; output     = !output
  ; ignore     = !ignore
  ; sample     = !sample
  ; njobs      = !njobs
  ; delim_null = !input_delim_null
  }
239
(* Program entry point: parse the command line, then run the pipeline. *)
let () =
  get_opt () |> main
This page took 0.052513 seconds and 4 git commands to generate.