1 module Array = ArrayLabels
2 module List = ListLabels
3 module S = Stdlib.Stream
6 {mutable streams : ('a S.t) list}
8 type ('input, 'output) msg_from_vassal =
10 | Result of (int * ('input * 'output))
13 type 'input msg_from_lord =
14 | Job of 'input option
17 {streams = [S.from (fun _ -> f ())]}
21 match Queue.take q with
22 | exception Queue.Empty ->
34 | exception S.Failure ->
45 | Some x -> Some (f x)
61 List.iter t.streams ~f:(S.iter f)
64 {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))}
67 let groups_tbl = Hashtbl.create 1_000_000 in
71 match Hashtbl.find_opt groups_tbl group with
77 Hashtbl.replace groups_tbl group members
79 iter t ~f:group_update;
80 let groups = Queue.create () in
82 (fun name (length, members) -> Queue.add (name, length, members) groups)
87 val send : out_channel -> 'a -> unit
88 val recv : in_channel -> 'a
91 Marshal.to_channel oc msg [];
95 Marshal.from_channel ic
98 let lord t ~njobs ~vassals ~ic ~ocs =
99 let active_vassals = ref njobs in
100 let results = Queue.create () in
102 match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
106 if !active_vassals = 0 then () else loop ()
108 Ipc.send ocs.(i) (Job (next t));
110 | Result (i, result) ->
111 Queue.add result results;
112 Ipc.send ocs.(i) (Job (next t));
115 let rec wait = function
119 let pid, _process_status = Unix.wait () in
120 (* TODO: handle process_status *)
121 wait (List.filter vassals ~f:(fun p -> p <> pid))
128 let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
129 let ic = Unix.in_channel_of_descr vassal_pipe_r in
130 let oc = Unix.out_channel_of_descr lord_pipe_w in
132 match (Ipc.recv ic : 'input msg_from_lord) with
134 Ipc.send oc (Result (i, (x, f x)));
137 Ipc.send oc (Exiting i)
139 Ipc.send oc (Ready i);
145 let bag_map t ~njobs ~f =
146 let lord_pipe_r, lord_pipe_w = Unix.pipe () in
147 let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
148 let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in
149 let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in
150 let vassals = ref [] in
151 for i=0 to (njobs - 1) do
152 begin match Unix.fork () with
154 Unix.close lord_pipe_r;
155 vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
157 vassals := pid :: !vassals
160 Unix.close lord_pipe_w;
165 ~ic:(Unix.in_channel_of_descr lord_pipe_r)
166 ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)