+
+ module Ipc : sig
+ val send : out_channel -> 'a -> unit
+ val recv : in_channel -> 'a
+ end = struct
+ let send oc msg =
+ Marshal.to_channel oc msg [];
+ flush oc
+
+ let recv ic =
+ Marshal.from_channel ic
+ end
+
+ let lord t ~njobs ~vassals ~ic ~ocs =
+ let active_vassals = ref njobs in
+ let results = Queue.create () in
+ let rec dispatch () =
+ match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
+ | Exiting i ->
+ close_out ocs.(i);
+ decr active_vassals;
+ if !active_vassals = 0 then
+ ()
+ else
+ dispatch ()
+ | Ready i ->
+ Ipc.send ocs.(i) (Job (next t));
+ dispatch ()
+ | Result (i, result) ->
+ Queue.add result results;
+ Ipc.send ocs.(i) (Job (next t));
+ dispatch ()
+ in
+ let rec wait = function
+ | [] -> ()
+ | vassals ->
+ let pid, _process_status = Unix.wait () in
+ (* TODO: handle process_status *)
+ wait (List.filter vassals ~f:(fun p -> p <> pid))
+ in
+ dispatch ();
+ close_in ic;
+ wait vassals;
+ of_queue results
+
+ let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
+ let ic = Unix.in_channel_of_descr vassal_pipe_r in
+ let oc = Unix.out_channel_of_descr lord_pipe_w in
+ let rec work msg =
+ Ipc.send oc msg;
+ match (Ipc.recv ic : 'input msg_from_lord) with
+ | Job (Some x) ->
+ work (Result (i, (x, f x)))
+ | Job None ->
+ Ipc.send oc (Exiting i)
+ in
+ work (Ready i);
+ close_in ic;
+ close_out oc;
+ exit 0
+
+ let bag_map t ~njobs ~f =
+ let lord_pipe_r, lord_pipe_w = Unix.pipe () in
+ let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
+ let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
+ let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
+ let vassals = ref [] in
+ for i=0 to (njobs - 1) do
+ begin match Unix.fork () with
+ | 0 ->
+ Unix.close lord_pipe_r;
+ vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
+ | pid ->
+ vassals := pid :: !vassals
+ end
+ done;
+ Unix.close lord_pipe_w;
+ lord
+ t
+ ~njobs
+ ~vassals:!vassals
+ ~ic:(Unix.in_channel_of_descr lord_pipe_r)
+ ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)