--- /dev/null
+module Array = ArrayLabels
+module List = ListLabels
+module S = Stdlib.Stream
+
+type 'a t =
+ {mutable streams : ('a S.t) list}
+
+type ('input, 'output) msg_from_vassal =
+ | Ready of int
+ | Result of (int * ('input * 'output))
+ | Exiting of int
+
+type 'input msg_from_lord =
+ | Job of 'input option
+
+let create f =
+ {streams = [S.from (fun _ -> f ())]}
+
+let of_queue q =
+ create (fun () ->
+ match Queue.take q with
+ | exception Queue.Empty ->
+ None
+ | x ->
+ Some x
+ )
+
+let rec next t =
+ match t.streams with
+ | [] ->
+ None
+ | s :: streams ->
+ (match S.next s with
+ | exception S.Failure ->
+ t.streams <- streams;
+ next t
+ | x ->
+ Some x
+ )
+
+let map t ~f =
+ create (fun () ->
+ match next t with
+ | None -> None
+ | Some x -> Some (f x)
+ )
+
+let filter t ~f =
+ let rec filter () =
+ match next t with
+ | None ->
+ None
+ | Some x when f x ->
+ Some x
+ | Some _ ->
+ filter ()
+ in
+ create filter
+
+let iter t ~f =
+ List.iter t.streams ~f:(S.iter f)
+
+let concat ts =
+ {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}
+
+let group_by t ~f =
+ let groups_tbl = Hashtbl.create 1_000_000 in
+ let group_update x =
+ let group = f x in
+ let members =
+ match Hashtbl.find_opt groups_tbl group with
+ | None ->
+ (1, [x])
+ | Some (n, xs) ->
+ (succ n, x :: xs)
+ in
+ Hashtbl.replace groups_tbl group members
+ in
+ iter t ~f:group_update;
+ let groups = Queue.create () in
+ Hashtbl.iter
+ (fun name (length, members) -> Queue.add (name, length, members) groups)
+ groups_tbl;
+ of_queue groups
+
+module Ipc : sig
+ val send : out_channel -> 'a -> unit
+ val recv : in_channel -> 'a
+end = struct
+ let send oc msg =
+ Marshal.to_channel oc msg [];
+ flush oc
+
+ let recv ic =
+ Marshal.from_channel ic
+end
+
+let lord t ~njobs ~vassals ~ic ~ocs =
+ let active_vassals = ref njobs in
+ let results = Queue.create () in
+ let rec loop () =
+ match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
+ | Exiting i ->
+ close_out ocs.(i);
+ decr active_vassals;
+ if !active_vassals = 0 then () else loop ()
+ | Ready i ->
+ Ipc.send ocs.(i) (Job (next t));
+ loop ()
+ | Result (i, result) ->
+ Queue.add result results;
+ Ipc.send ocs.(i) (Job (next t));
+ loop ()
+ in
+ let rec wait = function
+ | [] ->
+ ()
+ | vassals ->
+ let pid, _process_status = Unix.wait () in
+ (* TODO: handle process_status *)
+ wait (List.filter vassals ~f:(fun p -> p <> pid))
+ in
+ loop ();
+ close_in ic;
+ wait vassals;
+ of_queue results
+
+let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
+ let ic = Unix.in_channel_of_descr vassal_pipe_r in
+ let oc = Unix.out_channel_of_descr lord_pipe_w in
+ let rec loop () =
+ match (Ipc.recv ic : 'input msg_from_lord) with
+ | Job (Some x) ->
+ Ipc.send oc (Result (i, (x, f x)));
+ loop ()
+ | Job None ->
+ Ipc.send oc (Exiting i)
+ in
+ Ipc.send oc (Ready i);
+ loop ();
+ close_in ic;
+ close_out oc;
+ exit 0
+
+let bag_map t ~njobs ~f =
+ let lord_pipe_r, lord_pipe_w = Unix.pipe () in
+ let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
+ let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
+ let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
+ let vassals = ref [] in
+ for i=0 to (njobs - 1) do
+ begin match Unix.fork () with
+ | 0 ->
+ Unix.close lord_pipe_r;
+ vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
+ | pid ->
+ vassals := pid :: !vassals
+ end
+ done;
+ Unix.close lord_pipe_w;
+ lord
+ t
+ ~njobs
+ ~vassals:!vassals
+ ~ic:(Unix.in_channel_of_descr lord_pipe_r)
+ ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)