let lord t ~njobs ~vassals ~ic ~ocs =
let active_vassals = ref njobs in
let results = Queue.create () in
- let rec dispatch () =
+ let rec loop () =
match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with
| Exiting i ->
close_out ocs.(i);
decr active_vassals;
- if !active_vassals = 0 then
- ()
- else
- dispatch ()
+ if !active_vassals = 0 then () else loop ()
| Ready i ->
Ipc.send ocs.(i) (Job (next t));
- dispatch ()
+ loop ()
| Result (i, result) ->
Queue.add result results;
Ipc.send ocs.(i) (Job (next t));
- dispatch ()
+ loop ()
in
let rec wait = function
- | [] -> ()
+ | [] ->
+ ()
| vassals ->
let pid, _process_status = Unix.wait () in
(* TODO: handle process_status *)
wait (List.filter vassals ~f:(fun p -> p <> pid))
in
- dispatch ();
+ loop ();
close_in ic;
wait vassals;
of_queue results
let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
let ic = Unix.in_channel_of_descr vassal_pipe_r in
let oc = Unix.out_channel_of_descr lord_pipe_w in
- let rec work msg =
- Ipc.send oc msg;
+ let rec loop () =
match (Ipc.recv ic : 'input msg_from_lord) with
| Job (Some x) ->
- work (Result (i, (x, f x)))
+ Ipc.send oc (Result (i, (x, f x)));
+ loop ()
| Job None ->
Ipc.send oc (Exiting i)
in
- work (Ready i);
+ Ipc.send oc (Ready i);
+ loop ();
close_in ic;
close_out oc;
exit 0