From ddcbda0046a598d55746850e15d4fa99b3998ce0 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 14 May 2019 12:54:07 -0400 Subject: [PATCH] Move modules into dedicated files --- Makefile | 2 +- dups.ml | 483 +------------------------ lib/file.ml | 95 +++++ lib/file.mli | 17 + lib/in_channel.ml | 19 + lib/in_channel.mli | 1 + input_delim.mll => lib/input_delim.mll | 0 lib/metrics.ml | 122 +++++++ lib/metrics.mli | 36 ++ lib/stream.ml | 166 +++++++++ lib/stream.mli | 18 + 11 files changed, 477 insertions(+), 482 deletions(-) create mode 100644 lib/file.ml create mode 100644 lib/file.mli create mode 100644 lib/in_channel.ml create mode 100644 lib/in_channel.mli rename input_delim.mll => lib/input_delim.mll (100%) create mode 100644 lib/metrics.ml create mode 100644 lib/metrics.mli create mode 100644 lib/stream.ml create mode 100644 lib/stream.mli diff --git a/Makefile b/Makefile index 2807b9b..549cdef 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ all: @$(MAKE) -s build build: - @ocamlbuild -cflags '-w A' -pkgs 'str,unix' $(EXE_NAME).$(EXE_TYPE) + @ocamlbuild -cflags '-w A' -pkgs 'str,unix' -I lib $(EXE_NAME).$(EXE_TYPE) @cp _build/$(EXE_NAME).$(EXE_TYPE) $(EXE_NAME) @rm -f $(EXE_NAME).$(EXE_TYPE) diff --git a/dups.ml b/dups.ml index 2521b97..1483754 100644 --- a/dups.ml +++ b/dups.ml @@ -1,490 +1,11 @@ open Printf -module Array = ArrayLabels -module List = ListLabels +module List = ListLabels module StrSet = Set.Make(String) -module Unix = UnixLabels - -module Metrics : sig - type t - - val init - : unit -> t - val report - : t - -> wall_time_all:float - -> wall_time_group_by_size:float - -> wall_time_group_by_head:float - -> wall_time_group_by_digest:float - -> proc_time_all:float - -> proc_time_group_by_size:float - -> proc_time_group_by_head:float - -> proc_time_group_by_digest:float - -> unit - - val file_considered - : t -> size:int -> unit - val file_ignored - : t -> size:int -> unit - val file_empty - : t -> unit - val file_sampled - : t -> unit - val chunk_read - : t -> size:int -> unit - val file_unique_size - : t -> size:int -> unit - val file_unique_sample - : t -> size:int -> unit - val file_hashed - : t -> size:int -> unit - val digest - : t -> unit - val redundant_data - : t -> size:int -> unit -end = struct - type t = - { considered_files : int ref - ; considered_bytes : int ref - ; empty : int ref - ; ignored_files : int ref - ; ignored_bytes : int ref - ; unique_size_files : int ref - ; unique_size_bytes : int ref - ; unique_sample_files : int ref - ; unique_sample_bytes : int ref - ; sampled_files : int ref - ; sampled_bytes : int ref - ; hashed_files : int ref - ; hashed_bytes : int ref - ; digests : int ref - ; redundant_data : int ref - } - - let init () = - { considered_files = ref 0 - ; considered_bytes = ref 0 - ; empty = ref 0 - ; ignored_files = ref 0 - ; ignored_bytes = ref 0 - ; unique_size_files = ref 0 - ; unique_size_bytes = ref 0 - ; sampled_files = ref 0 - ; sampled_bytes = ref 0 - ; hashed_files = ref 0 - ; hashed_bytes = ref 0 - ; unique_sample_files = ref 0 - ; unique_sample_bytes = ref 0 - ; digests = ref 0 - ; redundant_data = ref 0 - } - - let add sum addend = - sum := !sum + addend - - let file_considered t ~size = - incr t.considered_files; - add t.considered_bytes size - - let file_ignored {ignored_files; ignored_bytes; _} ~size = - incr ignored_files; - add ignored_bytes size - - let file_empty t = - incr t.empty - - let chunk_read t ~size = - add t.sampled_bytes size - - let file_sampled t = - incr t.sampled_files - - let file_unique_size t ~size = - incr t.unique_size_files; - add t.unique_size_bytes size - - let file_unique_sample t ~size = - incr t.unique_sample_files; - add t.unique_sample_bytes size - - let file_hashed t ~size = - incr t.hashed_files; - add t.hashed_bytes size - - let digest t = - incr t.digests - - let redundant_data t ~size = - add t.redundant_data size - - let report - t - ~wall_time_all - ~wall_time_group_by_size - ~wall_time_group_by_head - ~wall_time_group_by_digest - ~proc_time_all - ~proc_time_group_by_size - ~proc_time_group_by_head - ~proc_time_group_by_digest - = - let b_to_mb b = (float_of_int b) /. 1024. /. 1024. in - let b_to_gb b = (b_to_mb b) /. 1024. in - eprintf "Total time : %.2f wall sec %.2f proc sec\n%!" - wall_time_all - proc_time_all; - eprintf "Considered : %8d files %6.2f Gb\n%!" - !(t.considered_files) - (b_to_gb !(t.considered_bytes)); - eprintf "Sampled : %8d files %6.2f Gb\n%!" - !(t.sampled_files) - (b_to_gb !(t.sampled_bytes)); - eprintf "Hashed : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!" - !(t.hashed_files) - (b_to_gb !(t.hashed_bytes)) - wall_time_group_by_digest - proc_time_group_by_digest; - eprintf "Digests : %8d\n%!" - !(t.digests); - eprintf "Duplicates (Hashed - Digests): %8d files %6.2f Gb\n%!" - (!(t.hashed_files) - !(t.digests)) - (b_to_gb !(t.redundant_data)); - eprintf "Skipped due to 0 size : %8d files\n%!" !(t.empty); - eprintf "Skipped due to unique size : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!" - !(t.unique_size_files) - (b_to_gb !(t.unique_size_bytes)) - wall_time_group_by_size - proc_time_group_by_size; - eprintf "Skipped due to unique sample : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!" - !(t.unique_sample_files) - (b_to_gb !(t.unique_sample_bytes)) - wall_time_group_by_head - proc_time_group_by_head; - eprintf "Ignored due to regex match : %8d files %6.2f Gb\n%!" - !(t.ignored_files) - (b_to_gb !(t.ignored_bytes)) -end +module Unix = UnixLabels module M = Metrics -module Stream : sig - type 'a t - - val create : (unit -> 'a option) -> 'a t - - val of_queue : 'a Queue.t -> 'a t - - val iter : 'a t -> f:('a -> unit) -> unit - - val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t - (** Parallel map with arbitrarily-reordered elements. *) - - val map : 'a t -> f:('a -> 'b) -> 'b t - - val filter : 'a t -> f:('a -> bool) -> 'a t - - val concat : ('a t) list -> 'a t - - val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t -end = struct - module S = Stream - - type 'a t = - {mutable streams : ('a S.t) list} - - type ('input, 'output) msg_from_vassal = - | Ready of int - | Result of (int * ('input * 'output)) - | Exiting of int - - type 'input msg_from_lord = - | Job of 'input option - - let create f = - {streams = [S.from (fun _ -> f ())]} - - let of_queue q = - create (fun () -> - match Queue.take q with - | exception Queue.Empty -> - None - | x -> - Some x - ) - - let rec next t = - match t.streams with - | [] -> - None - | s :: streams -> - (match S.next s with - | exception Stream.Failure -> - t.streams <- streams; - next t - | x -> - Some x - ) - - let map t ~f = - create (fun () -> - match next t with - | None -> None - | Some x -> Some (f x) - ) - - let filter t ~f = - let rec filter () = - match next t with - | None -> - None - | Some x when f x -> - Some x - | Some _ -> - filter () - in - create filter - - let iter t ~f = - List.iter t.streams ~f:(S.iter f) - - let concat ts = - {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))} - - let group_by t ~f = - let groups_tbl = Hashtbl.create 1_000_000 in - let group_update x = - let group = f x in - let members = - match Hashtbl.find_opt groups_tbl group with - | None -> - (1, [x]) - | Some (n, xs) -> - (succ n, x :: xs) - in - Hashtbl.replace groups_tbl group members - in - iter t ~f:group_update; - let groups = Queue.create () in - Hashtbl.iter - (fun name (length, members) -> Queue.add (name, length, members) groups) - groups_tbl; - of_queue groups - - module Ipc : sig - val send : out_channel -> 'a -> unit - val recv : in_channel -> 'a - end = struct - let send oc msg = - Marshal.to_channel oc msg []; - flush oc - - let recv ic = - Marshal.from_channel ic - end - - let lord t ~njobs ~vassals ~ic ~ocs = - let active_vassals = ref njobs in - let results = Queue.create () in - let rec loop () = - match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with - | Exiting i -> - close_out ocs.(i); - decr active_vassals; - if !active_vassals = 0 then () else loop () - | Ready i -> - Ipc.send ocs.(i) (Job (next t)); - loop () - | Result (i, result) -> - Queue.add result results; - Ipc.send ocs.(i) (Job (next t)); - loop () - in - let rec wait = function - | [] -> - () - | vassals -> - let pid, _process_status = Unix.wait () in - (* TODO: handle process_status *) - wait (List.filter vassals ~f:(fun p -> p <> pid)) - in - loop (); - close_in ic; - wait vassals; - of_queue results - - let vassal i ~f ~vassal_pipe_r ~lord_pipe_w = - let ic = Unix.in_channel_of_descr vassal_pipe_r in - let oc = Unix.out_channel_of_descr lord_pipe_w in - let rec loop () = - match (Ipc.recv ic : 'input msg_from_lord) with - | Job (Some x) -> - Ipc.send oc (Result (i, (x, f x))); - loop () - | Job None -> - Ipc.send oc (Exiting i) - in - Ipc.send oc (Ready i); - loop (); - close_in ic; - close_out oc; - exit 0 - - let bag_map t ~njobs ~f = - let lord_pipe_r, lord_pipe_w = Unix.pipe () in - let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in - let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in - let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in - let vassals = ref [] in - for i=0 to (njobs - 1) do - begin match Unix.fork () with - | 0 -> - Unix.close lord_pipe_r; - vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i) - | pid -> - vassals := pid :: !vassals - end - done; - Unix.close lord_pipe_w; - lord - t - ~njobs - ~vassals:!vassals - ~ic:(Unix.in_channel_of_descr lord_pipe_r) - ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr) -end - -module In_channel : sig - val lines : ?delim_null:bool -> in_channel -> string Stream.t -end = struct - let read_until_newline ic () = - match input_line ic with - | exception End_of_file -> - None - | line -> - Some line - - let read_until_null ic = - let lexbuf = Lexing.from_channel ic in - fun () -> Input_delim.by_null lexbuf - - let lines ?(delim_null=false) ic = - let reader = - if delim_null then - read_until_null ic - else - read_until_newline ic - in - Stream.create reader -end - -module File : sig - type t = - { path : string - ; size : int - } - - val find : string -> t Stream.t - (** Find all files in the directory tree, starting from the given root path *) - - val lookup : string Stream.t -> t Stream.t - (** Lookup file info for given paths *) - - val head : t -> len:int -> metrics:M.t -> string - - val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t - val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t -end = struct - type t = - { path : string - ; size : int - } - - let lookup paths = - Stream.map paths ~f:(fun path -> - let {Unix.st_size = size; _} = Unix.lstat path in - {path; size} - ) - - let find root = - let dirs = Queue.create () in - let files = Queue.create () in - let explore parent = - Array.iter (Sys.readdir parent) ~f:(fun child -> - let path = Filename.concat parent child in - let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in - match file_kind with - | Unix.S_REG -> - let file = {path; size = st_size} in - Queue.add file files - | Unix.S_DIR -> - Queue.add path dirs - | Unix.S_CHR - | Unix.S_BLK - | Unix.S_LNK - | Unix.S_FIFO - | Unix.S_SOCK -> - () - ) - in - explore root; - let rec next () = - match Queue.is_empty files, Queue.is_empty dirs with - | false, _ -> Some (Queue.take files) - | true , true -> None - | true , false -> - explore (Queue.take dirs); - next () - in - Stream.create next - - let filter_out_singletons files ~group ~handle_singleton = - let q = Queue.create () in - Stream.iter (Stream.group_by files ~f:group) ~f:(fun group -> - let (_, n, members) = group in - if n > 1 then - List.iter members ~f:(fun m -> Queue.add m q) - else - handle_singleton group - ); - Stream.of_queue q - - let filter_out_unique_sizes files ~metrics = - filter_out_singletons - files - ~group:(fun {size; _} -> size) - ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size) - - let head {path; _} ~len ~metrics = - M.file_sampled metrics; - let buf = Bytes.make len ' ' in - let ic = open_in_bin path in - let rec read pos len = - assert (len >= 0); - if len = 0 then - () - else begin - let chunk_size = input ic buf pos len in - M.chunk_read metrics ~size:chunk_size; - if chunk_size = 0 then (* EOF *) - () - else - read (pos + chunk_size) (len - chunk_size) - end - in - read 0 len; - close_in ic; - Bytes.to_string buf - - let filter_out_unique_heads files ~len ~metrics = - filter_out_singletons - files - ~group:(head ~len ~metrics) - ~handle_singleton:(fun (_, _, files) -> - let {size; _} = List.hd files in (* Guaranteed non-empty *) - M.file_unique_sample metrics ~size - ) -end - type input = | Stdin | Directories of string list diff --git a/lib/file.ml b/lib/file.ml new file mode 100644 index 0000000..df7e608 --- /dev/null +++ b/lib/file.ml @@ -0,0 +1,95 @@ +module Array = ArrayLabels +module List = ListLabels +module Unix = UnixLabels + +module M = Metrics + +type t = + { path : string + ; size : int + } + +let lookup paths = + Stream.map paths ~f:(fun path -> + let {Unix.st_size = size; _} = Unix.lstat path in + {path; size} + ) + +let find root = + let dirs = Queue.create () in + let files = Queue.create () in + let explore parent = + Array.iter (Sys.readdir parent) ~f:(fun child -> + let path = Filename.concat parent child in + let {Unix.st_kind = file_kind; st_size; _} = Unix.lstat path in + match file_kind with + | Unix.S_REG -> + let file = {path; size = st_size} in + Queue.add file files + | Unix.S_DIR -> + Queue.add path dirs + | Unix.S_CHR + | Unix.S_BLK + | Unix.S_LNK + | Unix.S_FIFO + | Unix.S_SOCK -> + () + ) + in + explore root; + let rec next () = + match Queue.is_empty files, Queue.is_empty dirs with + | false, _ -> Some (Queue.take files) + | true , true -> None + | true , false -> + explore (Queue.take dirs); + next () + in + Stream.create next + +let filter_out_singletons files ~group ~handle_singleton = + let q = Queue.create () in + Stream.iter (Stream.group_by files ~f:group) ~f:(fun group -> + let (_, n, members) = group in + if n > 1 then + List.iter members ~f:(fun m -> Queue.add m q) + else + handle_singleton group + ); + Stream.of_queue q + +let filter_out_unique_sizes files ~metrics = + filter_out_singletons + files + ~group:(fun {size; _} -> size) + ~handle_singleton:(fun (size, _, _) -> M.file_unique_size metrics ~size) + +let head {path; _} ~len ~metrics = + M.file_sampled metrics; + let buf = Bytes.make len ' ' in + let ic = open_in_bin path in + let rec read pos len = + assert (len >= 0); + if len = 0 then + () + else begin + let chunk_size = input ic buf pos len in + M.chunk_read metrics ~size:chunk_size; + if chunk_size = 0 then (* EOF *) + () + else + read (pos + chunk_size) (len - chunk_size) + end + in + read 0 len; + close_in ic; + Bytes.to_string buf + +let filter_out_unique_heads files ~len ~metrics = + filter_out_singletons + files + ~group:(head ~len ~metrics) + ~handle_singleton:(fun (_, _, files) -> + let {size; _} = List.hd files in (* Guaranteed non-empty *) + M.file_unique_sample metrics ~size + ) diff --git a/lib/file.mli b/lib/file.mli new file mode 100644 index 0000000..d6e6fa0 --- /dev/null +++ b/lib/file.mli @@ -0,0 +1,17 @@ +module M = Metrics + +type t = + { path : string + ; size : int + } + +val find : string -> t Stream.t +(** Find all files in the directory tree, starting from the given root path *) + +val lookup : string Stream.t -> t Stream.t +(** Lookup file info for given paths *) + +val head : t -> len:int -> metrics:M.t -> string + +val filter_out_unique_sizes : t Stream.t -> metrics:M.t -> t Stream.t +val filter_out_unique_heads : t Stream.t -> len:int -> metrics:M.t -> t Stream.t diff --git a/lib/in_channel.ml b/lib/in_channel.ml new file mode 100644 index 0000000..3581d0e --- /dev/null +++ b/lib/in_channel.ml @@ -0,0 +1,19 @@ +let read_until_newline ic () = + match input_line ic with + | exception End_of_file -> + None + | line -> + Some line + +let read_until_null ic = + let lexbuf = Lexing.from_channel ic in + fun () -> Input_delim.by_null lexbuf + +let lines ?(delim_null=false) ic = + let reader = + if delim_null then + read_until_null ic + else + read_until_newline ic + in + Stream.create reader diff --git a/lib/in_channel.mli b/lib/in_channel.mli new file mode 100644 index 0000000..5b9cad5 --- /dev/null +++ b/lib/in_channel.mli @@ -0,0 +1 @@ +val lines : ?delim_null:bool -> in_channel -> string Stream.t diff --git a/input_delim.mll b/lib/input_delim.mll similarity index 100% rename from input_delim.mll rename to lib/input_delim.mll diff --git a/lib/metrics.ml b/lib/metrics.ml new file mode 100644 index 0000000..8775874 --- /dev/null +++ b/lib/metrics.ml @@ -0,0 +1,122 @@ +open Printf + +type t = + { considered_files : int ref + ; considered_bytes : int ref + ; empty : int ref + ; ignored_files : int ref + ; ignored_bytes : int ref + ; unique_size_files : int ref + ; unique_size_bytes : int ref + ; unique_sample_files : int ref + ; unique_sample_bytes : int ref + ; sampled_files : int ref + ; sampled_bytes : int ref + ; hashed_files : int ref + ; hashed_bytes : int ref + ; digests : int ref + ; redundant_data : int ref + } + +let init () = + { considered_files = ref 0 + ; considered_bytes = ref 0 + ; empty = ref 0 + ; ignored_files = ref 0 + ; ignored_bytes = ref 0 + ; unique_size_files = ref 0 + ; unique_size_bytes = ref 0 + ; sampled_files = ref 0 + ; sampled_bytes = ref 0 + ; hashed_files = ref 0 + ; hashed_bytes = ref 0 + ; unique_sample_files = ref 0 + ; unique_sample_bytes = ref 0 + ; digests = ref 0 + ; redundant_data = ref 0 + } + +let add sum addend = + sum := !sum + addend + +let file_considered t ~size = + incr t.considered_files; + add t.considered_bytes size + +let file_ignored {ignored_files; ignored_bytes; _} ~size = + incr ignored_files; + add ignored_bytes size + +let file_empty t = + incr t.empty + +let chunk_read t ~size = + add t.sampled_bytes size + +let file_sampled t = + incr t.sampled_files + +let file_unique_size t ~size = + incr t.unique_size_files; + add t.unique_size_bytes size + +let file_unique_sample t ~size = + incr t.unique_sample_files; + add t.unique_sample_bytes size + +let file_hashed t ~size = + incr t.hashed_files; + add t.hashed_bytes size + +let digest t = + incr t.digests + +let redundant_data t ~size = + add t.redundant_data size + +let report + t + ~wall_time_all + ~wall_time_group_by_size + ~wall_time_group_by_head + ~wall_time_group_by_digest + ~proc_time_all + ~proc_time_group_by_size + ~proc_time_group_by_head + ~proc_time_group_by_digest += + let b_to_mb b = (float_of_int b) /. 1024. /. 1024. in + let b_to_gb b = (b_to_mb b) /. 1024. in + eprintf "Total time : %.2f wall sec %.2f proc sec\n%!" + wall_time_all + proc_time_all; + eprintf "Considered : %8d files %6.2f Gb\n%!" + !(t.considered_files) + (b_to_gb !(t.considered_bytes)); + eprintf "Sampled : %8d files %6.2f Gb\n%!" + !(t.sampled_files) + (b_to_gb !(t.sampled_bytes)); + eprintf "Hashed : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!" + !(t.hashed_files) + (b_to_gb !(t.hashed_bytes)) + wall_time_group_by_digest + proc_time_group_by_digest; + eprintf "Digests : %8d\n%!" + !(t.digests); + eprintf "Duplicates (Hashed - Digests): %8d files %6.2f Gb\n%!" + (!(t.hashed_files) - !(t.digests)) + (b_to_gb !(t.redundant_data)); + eprintf "Skipped due to 0 size : %8d files\n%!" !(t.empty); + eprintf "Skipped due to unique size : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!" + !(t.unique_size_files) + (b_to_gb !(t.unique_size_bytes)) + wall_time_group_by_size + proc_time_group_by_size; + eprintf "Skipped due to unique sample : %8d files %6.2f Gb %6.2f wall sec %6.2f proc sec\n%!" + !(t.unique_sample_files) + (b_to_gb !(t.unique_sample_bytes)) + wall_time_group_by_head + proc_time_group_by_head; + eprintf "Ignored due to regex match : %8d files %6.2f Gb\n%!" + !(t.ignored_files) + (b_to_gb !(t.ignored_bytes)) diff --git a/lib/metrics.mli b/lib/metrics.mli new file mode 100644 index 0000000..27adc8c --- /dev/null +++ b/lib/metrics.mli @@ -0,0 +1,36 @@ +type t + +val init + : unit -> t +val report + : t + -> wall_time_all:float + -> wall_time_group_by_size:float + -> wall_time_group_by_head:float + -> wall_time_group_by_digest:float + -> proc_time_all:float + -> proc_time_group_by_size:float + -> proc_time_group_by_head:float + -> proc_time_group_by_digest:float + -> unit + +val file_considered + : t -> size:int -> unit +val file_ignored + : t -> size:int -> unit +val file_empty + : t -> unit +val file_sampled + : t -> unit +val chunk_read + : t -> size:int -> unit +val file_unique_size + : t -> size:int -> unit +val file_unique_sample + : t -> size:int -> unit +val file_hashed + : t -> size:int -> unit +val digest + : t -> unit +val redundant_data + : t -> size:int -> unit diff --git a/lib/stream.ml b/lib/stream.ml new file mode 100644 index 0000000..4d60816 --- /dev/null +++ b/lib/stream.ml @@ -0,0 +1,166 @@ +module Array = ArrayLabels +module List = ListLabels +module S = Stdlib.Stream + +type 'a t = + {mutable streams : ('a S.t) list} + +type ('input, 'output) msg_from_vassal = + | Ready of int + | Result of (int * ('input * 'output)) + | Exiting of int + +type 'input msg_from_lord = + | Job of 'input option + +let create f = + {streams = [S.from (fun _ -> f ())]} + +let of_queue q = + create (fun () -> + match Queue.take q with + | exception Queue.Empty -> + None + | x -> + Some x + ) + +let rec next t = + match t.streams with + | [] -> + None + | s :: streams -> + (match S.next s with + | exception S.Failure -> + t.streams <- streams; + next t + | x -> + Some x + ) + +let map t ~f = + create (fun () -> + match next t with + | None -> None + | Some x -> Some (f x) + ) + +let filter t ~f = + let rec filter () = + match next t with + | None -> + None + | Some x when f x -> + Some x + | Some _ -> + filter () + in + create filter + +let iter t ~f = + List.iter t.streams ~f:(S.iter f) + +let concat ts = + {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))} + +let group_by t ~f = + let groups_tbl = Hashtbl.create 1_000_000 in + let group_update x = + let group = f x in + let members = + match Hashtbl.find_opt groups_tbl group with + | None -> + (1, [x]) + | Some (n, xs) -> + (succ n, x :: xs) + in + Hashtbl.replace groups_tbl group members + in + iter t ~f:group_update; + let groups = Queue.create () in + Hashtbl.iter + (fun name (length, members) -> Queue.add (name, length, members) groups) + groups_tbl; + of_queue groups + +module Ipc : sig + val send : out_channel -> 'a -> unit + val recv : in_channel -> 'a +end = struct + let send oc msg = + Marshal.to_channel oc msg []; + flush oc + + let recv ic = + Marshal.from_channel ic +end + +let lord t ~njobs ~vassals ~ic ~ocs = + let active_vassals = ref njobs in + let results = Queue.create () in + let rec loop () = + match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with + | Exiting i -> + close_out ocs.(i); + decr active_vassals; + if !active_vassals = 0 then () else loop () + | Ready i -> + Ipc.send ocs.(i) (Job (next t)); + loop () + | Result (i, result) -> + Queue.add result results; + Ipc.send ocs.(i) (Job (next t)); + loop () + in + let rec wait = function + | [] -> + () + | vassals -> + let pid, _process_status = Unix.wait () in + (* TODO: handle process_status *) + wait (List.filter vassals ~f:(fun p -> p <> pid)) + in + loop (); + close_in ic; + wait vassals; + of_queue results + +let vassal i ~f ~vassal_pipe_r ~lord_pipe_w = + let ic = Unix.in_channel_of_descr vassal_pipe_r in + let oc = Unix.out_channel_of_descr lord_pipe_w in + let rec loop () = + match (Ipc.recv ic : 'input msg_from_lord) with + | Job (Some x) -> + Ipc.send oc (Result (i, (x, f x))); + loop () + | Job None -> + Ipc.send oc (Exiting i) + in + Ipc.send oc (Ready i); + loop (); + close_in ic; + close_out oc; + exit 0 + +let bag_map t ~njobs ~f = + let lord_pipe_r, lord_pipe_w = Unix.pipe () in + let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in + let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in + let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in + let vassals = ref [] in + for i=0 to (njobs - 1) do + begin match Unix.fork () with + | 0 -> + Unix.close lord_pipe_r; + vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i) + | pid -> + vassals := pid :: !vassals + end + done; + Unix.close lord_pipe_w; + lord + t + ~njobs + ~vassals:!vassals + ~ic:(Unix.in_channel_of_descr lord_pipe_r) + ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr) diff --git a/lib/stream.mli b/lib/stream.mli new file mode 100644 index 0000000..6cb5b11 --- /dev/null +++ b/lib/stream.mli @@ -0,0 +1,18 @@ +type 'a t + +val create : (unit -> 'a option) -> 'a t + +val of_queue : 'a Queue.t -> 'a t + +val iter : 'a t -> f:('a -> unit) -> unit + +val bag_map : 'a t -> njobs:int -> f:('a -> 'b) -> ('a * 'b) t +(** Parallel map with arbitrarily-reordered elements. *) + +val map : 'a t -> f:('a -> 'b) -> 'b t + +val filter : 'a t -> f:('a -> bool) -> 'a t + +val concat : ('a t) list -> 'a t + +val group_by : 'a t -> f:('a -> 'b) -> ('b * int * 'a list) t -- 2.20.1