Move modules into dedicated files
[dups.git] / lib / stream.ml
diff --git a/lib/stream.ml b/lib/stream.ml
new file mode 100644 (file)
index 0000000..4d60816
--- /dev/null
@@ -0,0 +1,166 @@
+module Array = ArrayLabels
+module List  = ListLabels
+module S     = Stdlib.Stream
+
+type 'a t =
+  {mutable streams : ('a S.t) list}
+
+type ('input, 'output) msg_from_vassal =
+  | Ready of int
+  | Result of (int * ('input * 'output))
+  | Exiting of int
+
+type 'input msg_from_lord =
+  | Job of 'input option
+
+let create f =
+  {streams = [S.from (fun _ -> f ())]}
+
+let of_queue q =
+  create (fun () ->
+    match Queue.take q with
+    | exception Queue.Empty ->
+        None
+    | x ->
+        Some x
+  )
+
+let rec next t =
+  match t.streams with
+  | [] ->
+      None
+  | s :: streams ->
+      (match S.next s with
+      | exception S.Failure ->
+          t.streams <- streams;
+          next t
+      | x ->
+          Some x
+      )
+
+let map t ~f =
+  create (fun () ->
+    match next t with
+    | None   -> None
+    | Some x -> Some (f x)
+  )
+
+let filter t ~f =
+  let rec filter () =
+    match next t with
+    | None ->
+        None
+    | Some x when f x ->
+        Some x
+    | Some _ ->
+        filter ()
+  in
+  create filter
+
+let iter t ~f =
+  List.iter t.streams ~f:(S.iter f)
+
+let concat ts =
+  {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}
+
+let group_by t ~f =
+  let groups_tbl = Hashtbl.create 1_000_000 in
+  let group_update x =
+    let group = f x in
+    let members =
+      match Hashtbl.find_opt groups_tbl group with
+      | None ->
+          (1, [x])
+      | Some (n, xs) ->
+          (succ n, x :: xs)
+    in
+    Hashtbl.replace groups_tbl group members
+  in
+  iter t ~f:group_update;
+  let groups = Queue.create () in
+  Hashtbl.iter
+    (fun name (length, members) -> Queue.add (name, length, members) groups)
+    groups_tbl;
+  of_queue groups
+
+module Ipc : sig
+  val send : out_channel -> 'a -> unit
+  val recv : in_channel -> 'a
+end = struct
+  let send oc msg =
+    Marshal.to_channel oc msg [];
+    flush oc
+
+  let recv ic =
+    Marshal.from_channel ic
+end
+
+let lord t ~njobs ~vassals ~ic ~ocs =
+  let active_vassals = ref njobs in
+  let results = Queue.create () in
+  let rec loop () =
+    match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
+    | Exiting i ->
+        close_out ocs.(i);
+        decr active_vassals;
+        if !active_vassals = 0 then () else loop ()
+    | Ready i ->
+        Ipc.send ocs.(i) (Job (next t));
+        loop ()
+    | Result (i, result) ->
+        Queue.add result results;
+        Ipc.send ocs.(i) (Job (next t));
+        loop ()
+  in
+  let rec wait = function
+    | [] ->
+        ()
+    | vassals ->
+        let pid, _process_status = Unix.wait () in
+        (* TODO: handle process_status *)
+        wait (List.filter vassals ~f:(fun p -> p <> pid))
+  in
+  loop ();
+  close_in ic;
+  wait vassals;
+  of_queue results
+
+let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
+  let ic = Unix.in_channel_of_descr vassal_pipe_r in
+  let oc = Unix.out_channel_of_descr lord_pipe_w in
+  let rec loop () =
+    match (Ipc.recv ic : 'input msg_from_lord) with
+    | Job (Some x) ->
+        Ipc.send oc (Result (i, (x, f x)));
+        loop ()
+    | Job None ->
+        Ipc.send oc (Exiting i)
+  in
+  Ipc.send oc (Ready i);
+  loop ();
+  close_in ic;
+  close_out oc;
+  exit 0
+
+let bag_map t ~njobs ~f =
+  let lord_pipe_r, lord_pipe_w = Unix.pipe () in
+  let vassal_pipes   = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
+  let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
+  let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
+  let vassals = ref [] in
+  for i=0 to (njobs - 1) do
+    begin match Unix.fork () with
+    | 0 ->
+        Unix.close lord_pipe_r;
+        vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
+    | pid ->
+        vassals := pid :: !vassals
+    end
+  done;
+  Unix.close lord_pipe_w;
+  lord
+    t
+    ~njobs
+    ~vassals:!vassals
+    ~ic:(Unix.in_channel_of_descr lord_pipe_r)
+    ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
This page took 0.026863 seconds and 4 git commands to generate.