| 1 | open Printf |
| 2 | |
| 3 | module List = ListLabels |
| 4 | module StrSet = Set.Make(String) |
| 5 | module Unix = UnixLabels |
| 6 | |
| 7 | module M = Metrics |
| 8 | |
(** Where the candidate files come from. *)
type input =
  Stdin                          (** paths are read from standard input *)
  | Directories of string list   (** directories handed to [File.find] *)
| 12 | |
(** Where the groups of duplicate files are reported. *)
type output =
  Stdout                 (** print each group on standard output *)
  | Directory of string  (** write one file per digest below this directory *)
| 16 | |
(** Run-time options, as assembled from the command line. *)
type opt = {
  input : input;            (** source of candidate files *)
  output : output;          (** destination of the report *)
  ignore : string -> bool;  (** [true] for paths that must be skipped *)
  sample : int;             (** byte size of the file samples *)
  njobs : int;              (** number of parallel jobs *)
  delim_null : bool;        (** stdin paths are null-delimited, not by newline *)
}
| 25 | |
(** Build the stream of candidate files and drop the uninteresting ones.

    - [Stdin]: the lines of standard input (split according to
      [~delim_null]) are passed to [File.lookup].
    - [Directories paths]: [paths] is de-duplicated (and thereby sorted),
      [File.find] is applied to each and the results are concatenated.

    Every file seen is counted in [metrics] as considered; files of size 0
    and files whose path satisfies [ignore] are additionally counted as
    empty / ignored and are filtered out of the resulting stream. *)
let make_input_stream input ignore ~metrics ~delim_null =
  let candidates =
    match input with
    | Directories roots ->
      roots
      |> StrSet.of_list
      |> StrSet.elements
      |> List.map ~f:File.find
      |> Stream.concat
    | Stdin ->
      File.lookup (In_channel.lines stdin ~delim_null)
  in
  (* Both checks always run, so a file that is empty *and* ignored is
     recorded under both metrics. *)
  let keep {File.path; size} =
    M.file_considered metrics ~size;
    let is_empty = size = 0 in
    let is_ignored = ignore path in
    if is_empty then M.file_empty metrics;
    if is_ignored then M.file_ignored metrics ~size;
    not (is_empty || is_ignored)
  in
  Stream.filter candidates ~f:keep
| 43 | |
(** Build the function used to report one group of duplicate files.

    The returned function receives the group's digest, the number of files
    in the group and the files themselves.

    - [Stdout]: prints the hex digest and the count on one line, then each
      path (escaped with [%S]) on its own line, flushing after every line.
    - [Directory dir]: writes the escaped paths, one per line, to the file
      [dir/<first two hex characters of the digest>/<hex digest>].  The count
      is not used.

    @raise Unix.Unix_error if the prefix directory cannot be created for any
           reason other than it already existing.
    @raise Sys_error if the digest file cannot be opened or written. *)
let make_output_fun = function
  | Stdout ->
    fun digest n_files files ->
      printf "%s %d\n%!" (Digest.to_hex digest) n_files;
      List.iter files ~f:(fun {File.path; _} ->
        printf " %S\n%!" path
      )
  | Directory dir ->
    fun digest _ files ->
      let hex = Digest.to_hex digest in
      let bucket = Filename.concat dir (String.sub hex 0 2) in
      (* Many digests share a two-character prefix, so the bucket directory
         frequently exists already; only that specific failure is tolerated. *)
      (try Unix.mkdir bucket ~perm:0o700
       with Unix.Unix_error (Unix.EEXIST, _, _) -> ());
      let oc = open_out (Filename.concat bucket hex) in
      (* Do not leak the channel if writing fails half-way through. *)
      (try
         List.iter files ~f:(fun {File.path; _} ->
           fprintf oc "%S\n" path
         )
       with e ->
         close_out_noerr oc;
         raise e);
      close_out oc
| 61 | |
(** Current wall-clock time in seconds, as given by [Unix.gettimeofday]. *)
let time_wall : unit -> float =
  fun () -> Unix.gettimeofday ()
| 64 | |
(** Processor time, in seconds, used by the program so far ([Sys.time]). *)
let time_proc : unit -> float =
  fun () -> Sys.time ()
| 67 | |
(** Run the whole duplicate-detection pipeline for the given options.

    Candidate files are narrowed down in three successive stages: files with
    a unique size are dropped, then files with a unique head sample of
    [sample] bytes, and finally the survivors are grouped by the digest of
    their contents ([Digest.file]).  Every digest shared by more than one
    file is passed to the output function.  When [njobs > 1] the sampling
    and hashing stages go through [Stream.bag_map]; otherwise they run
    through the sequential helpers.

    Wall-clock and processor time are taken around each stage and handed,
    together with the collected metrics, to [M.report]. *)
let main {input; output; ignore; sample = sample_len; njobs; delim_null} =
  let wall_begin = time_wall () in
  let proc_begin = time_proc () in
  let metrics = M.init () in
  let emit = make_output_fun output in
  let candidates = make_input_stream input ignore ~metrics ~delim_null in
  (* TODO: Make a nice(r) abstraction to re-assemble pieces in the pipeline:
   *
   * from input to files_by_size
   * from files_by_size to files_by_sample
   * from files_by_sample to files_by_digest
   * from files_by_digest to output
   *
   * input |> files_by_size |> files_by_sample |> files_by_digest |> output
   *)
  let parallel = njobs > 1 in
  let span t0 t1 = t1 -. t0 in

  (* Stage 1: group by size. *)
  let wall_size_begin = time_wall () in
  let proc_size_begin = time_proc () in
  eprintf "[debug] filtering-out files with unique size\n%!";
  let same_size = File.filter_out_unique_sizes candidates ~metrics in
  let proc_size_end = time_proc () in
  let wall_size_end = time_wall () in

  (* Stage 2: group by head sample.  Its clock starts where stage 1 ended. *)
  eprintf "[debug] filtering-out files with unique heads\n%!";
  let same_head =
    if parallel then begin
      let survivors = Queue.create () in
      let with_heads =
        Stream.bag_map same_size ~njobs
          ~f:(File.head ~len:sample_len ~metrics)
      in
      let by_head =
        Stream.map (Stream.group_by with_heads ~f:snd)
          ~f:(fun (head, count, pairs) ->
            (head, count, List.map pairs ~f:fst))
      in
      let shared = Stream.filter by_head ~f:(fun (_, count, _) -> count > 1) in
      (* The stream is drained into a queue here, before the next stage. *)
      Stream.iter shared ~f:(fun (_, _, members) ->
        List.iter members ~f:(fun file -> Queue.add file survivors)
      );
      Stream.of_queue survivors
    end else
      File.filter_out_unique_heads same_size ~len:sample_len ~metrics
  in
  let proc_head_end = time_proc () in
  let wall_head_end = time_wall () in

  (* Stage 3: group by full digest.  Its clock starts where stage 2 ended. *)
  eprintf "[debug] hashing\n%!";
  let groups =
    if parallel then
      same_head
      |> Stream.bag_map ~njobs ~f:(fun {File.path; _} -> Digest.file path)
      |> Stream.group_by ~f:snd
      |> Stream.map ~f:(fun (digest, count, hashed) ->
        let members =
          List.map hashed ~f:(fun (file, _) ->
            M.file_hashed metrics ~size:file.File.size;
            file
          )
        in
        (digest, count, members)
      )
    else
      same_head
      |> Stream.group_by ~f:(fun {File.path; size} ->
        M.file_hashed metrics ~size;
        Digest.file path
      )
  in
  let proc_digest_end = time_proc () in
  let wall_digest_end = time_wall () in

  eprintf "[debug] reporting\n%!";
  groups
  |> Stream.iter ~f:(fun (digest, count, members) ->
    M.digest metrics;
    if count > 1 then begin
      (* NOTE(review): this counts all [count] copies, including the one that
         would be kept - confirm that is what [M.redundant_data] expects. *)
      let size = count * (List.hd members).File.size in
      M.redundant_data metrics ~size;
      emit digest count members
    end
  );

  let proc_end = time_proc () in
  let wall_end = time_wall () in

  M.report metrics
    ~wall_time_all:            (span wall_begin wall_end)
    ~wall_time_group_by_size:  (span wall_size_begin wall_size_end)
    ~wall_time_group_by_head:  (span wall_size_end wall_head_end)
    ~wall_time_group_by_digest:(span wall_head_end wall_digest_end)
    ~proc_time_all:            (span proc_begin proc_end)
    ~proc_time_group_by_size:  (span proc_size_begin proc_size_end)
    ~proc_time_group_by_head:  (span proc_size_end proc_head_end)
    ~proc_time_group_by_digest:(span proc_head_end proc_digest_end)
| 161 | |
(** Parse the command line into an [opt] record.

    Defaults: read candidate paths from stdin, report on stdout, ignore
    nothing, sample 512 bytes, 6 jobs, newline-delimited input.  Anonymous
    arguments must be existing directories; the first one switches the input
    from stdin to directories and later ones are added to the list.

    Prints a message on stderr and exits with status 1 when [-out] or an
    anonymous argument is not an existing directory, or when the sample size
    is not strictly positive.  [Arg.parse] itself exits on malformed
    arguments. *)
let get_opt () : opt =
  (* Print [msg] on stderr and exit with status 1 unless [test x] holds. *)
  let assert_ test x msg =
    if not (test x) then begin
      eprintf "%s\n%!" msg;
      exit 1
    end
  in
  (* Existence is checked first: [Sys.is_directory] raises on a missing path. *)
  let assert_existing_dir path =
    assert_ Sys.file_exists path (sprintf "File does not exist: %S" path);
    assert_ Sys.is_directory path (sprintf "File is not a directory: %S" path)
  in
  let input = ref Stdin in
  let output = ref Stdout in
  let ignore = ref (fun _ -> false) in
  let sample = ref 512 in
  let njobs = ref 6 in
  let input_delim_null = ref false in
  let spec =
    [ ( "-out"
      , Arg.String (fun path ->
          assert_existing_dir path;
          output := Directory path
        )
      , " Output to this directory instead of stdout."
      )
    ; ( "-ignore"
      , Arg.String (fun regexp ->
          let regexp = Str.regexp regexp in
          (* [Str.string_match] anchors the pattern at position 0. *)
          ignore := fun string -> Str.string_match regexp string 0)
      , " Ignore file paths which match this regexp pattern (see Str module)."
      )
    ; ( "-sample"
      , Arg.Set_int sample
      , (sprintf " Byte size of file samples to use. Default: %d" !sample)
      )
    ; ( "-j"
      , Arg.Set_int njobs
      , (sprintf " Number of parallel jobs. Default: %d" !njobs)
      )
    ; ( "-0"
      , Arg.Set input_delim_null
      , ( sprintf
            ( " Delimit input paths by null character instead of a newline."
            ^^" Meaningful only when reading candidate paths from stdin."
            ^^" Default: %B"
            )
            !input_delim_null
        )
      )
    ]
  in
  Arg.parse
    (Arg.align spec)
    (fun path ->
      assert_existing_dir path;
      match !input with
      | Stdin ->
        input := Directories [path]
      | Directories paths ->
        input := Directories (path :: paths)
    )
    "";
  (* The check rejects zero as well as negatives, and the message says so. *)
  assert_
    (fun x -> x > 0)
    !sample
    (sprintf "Sample size must be positive: %d" !sample);
  { input = !input
  ; output = !output
  ; ignore = !ignore
  ; sample = !sample
  ; njobs = !njobs
  ; delim_null = !input_delim_null
  }
| 239 | |
(* Program entry point: parse the command line, then run the pipeline. *)
let () =
  get_opt () |> main