module Array = ArrayLabels module List = ListLabels module S = Stdlib.Stream type 'a t = {mutable streams : ('a S.t) list} type ('input, 'output) msg_from_vassal = | Ready of int | Result of (int * ('input * 'output)) | Exiting of int type 'input msg_from_lord = | Job of 'input option let create f = {streams = [S.from (fun _ -> f ())]} let of_queue q = create (fun () -> match Queue.take q with | exception Queue.Empty -> None | x -> Some x ) let rec next t = match t.streams with | [] -> None | s :: streams -> (match S.next s with | exception S.Failure -> t.streams <- streams; next t | x -> Some x ) let map t ~f = create (fun () -> match next t with | None -> None | Some x -> Some (f x) ) let filter t ~f = let rec filter () = match next t with | None -> None | Some x when f x -> Some x | Some _ -> filter () in create filter let iter t ~f = List.iter t.streams ~f:(S.iter f) let concat ts = {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))} let group_by t ~f = let groups_tbl = Hashtbl.create 1_000_000 in let group_update x = let group = f x in let members = match Hashtbl.find_opt groups_tbl group with | None -> (1, [x]) | Some (n, xs) -> (succ n, x :: xs) in Hashtbl.replace groups_tbl group members in iter t ~f:group_update; let groups = Queue.create () in Hashtbl.iter (fun name (length, members) -> Queue.add (name, length, members) groups) groups_tbl; of_queue groups module Ipc : sig val send : out_channel -> 'a -> unit val recv : in_channel -> 'a end = struct let send oc msg = Marshal.to_channel oc msg []; flush oc let recv ic = Marshal.from_channel ic end let lord t ~njobs ~vassals ~ic ~ocs = let active_vassals = ref njobs in let results = Queue.create () in let rec loop () = match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with | Exiting i -> close_out ocs.(i); decr active_vassals; if !active_vassals = 0 then () else loop () | Ready i -> Ipc.send ocs.(i) (Job (next t)); loop () | Result (i, result) -> Queue.add result results; Ipc.send ocs.(i) (Job (next t)); loop () in let rec wait = function | [] -> () | vassals -> let pid, _process_status = Unix.wait () in (* TODO: handle process_status *) wait (List.filter vassals ~f:(fun p -> p <> pid)) in loop (); close_in ic; wait vassals; of_queue results let vassal i ~f ~vassal_pipe_r ~lord_pipe_w = let ic = Unix.in_channel_of_descr vassal_pipe_r in let oc = Unix.out_channel_of_descr lord_pipe_w in let rec loop () = match (Ipc.recv ic : 'input msg_from_lord) with | Job (Some x) -> Ipc.send oc (Result (i, (x, f x))); loop () | Job None -> Ipc.send oc (Exiting i) in Ipc.send oc (Ready i); loop (); close_in ic; close_out oc; exit 0 let bag_map t ~njobs ~f = let lord_pipe_r, lord_pipe_w = Unix.pipe () in let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in let vassals = ref [] in for i=0 to (njobs - 1) do begin match Unix.fork () with | 0 -> Unix.close lord_pipe_r; vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i) | pid -> vassals := pid :: !vassals end done; Unix.close lord_pipe_w; lord t ~njobs ~vassals:!vassals ~ic:(Unix.in_channel_of_descr lord_pipe_r) ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)