| 1 | module Array = ArrayLabels |
| 2 | module List = ListLabels |
| 3 | module S = Stdlib.Stream |
| 4 | |
(** A pull-based sequence of ['a] values, represented as a list of underlying
    streams that are read front to back.  The field is mutable so that a
    stream found to be exhausted can be dropped from the list. *)
type 'a t = { mutable streams : 'a S.t list }
| 7 | |
(** Messages sent by a vassal (worker process) to the lord.  Each constructor
    carries the index of the vassal that sent it. *)
type ('input, 'output) msg_from_vassal =
  | Ready of int
      (** The vassal has started and is waiting for its first job. *)
  | Result of (int * ('input * 'output))
      (** A finished job: the input paired with the value computed from it. *)
  | Exiting of int
      (** The vassal was told there is no more work and is shutting down. *)
| 12 | |
(** Message sent by the lord to a vassal: [Job (Some x)] hands over the next
    input to process, [Job None] tells the vassal that no work is left. *)
type 'input msg_from_lord = Job of 'input option
| 15 | |
(** [create f] builds a stream whose elements are produced on demand by
    calling [f ()].  [f] signals the end of the stream by returning [None]. *)
let create f =
  let source = S.from (fun _index -> f ()) in
  { streams = [ source ] }
| 18 | |
(** [of_queue q] streams the elements of [q] in the order [Queue.take] yields
    them.  Each element is removed from [q] as it is produced, and the stream
    ends once [q] is found empty. *)
let of_queue q =
  let pop () = try Some (Queue.take q) with Queue.Empty -> None in
  create pop
| 27 | |
(** [next t] returns the next element of [t], or [None] when every underlying
    stream is exhausted.  Exhausted streams are removed from [t] as they are
    encountered, so later calls do not revisit them. *)
let rec next t =
  match t.streams with
  | [] -> None
  | current :: rest ->
    (try Some (S.next current)
     with S.Failure ->
       (* [current] is empty: forget it and retry with the remaining ones. *)
       t.streams <- rest;
       next t)
| 40 | |
(** [map t ~f] is a new stream that lazily yields [f x] for every [x] pulled
    from [t].  Reading from the result consumes [t]. *)
let map t ~f =
  let step () =
    match next t with
    | Some x -> Some (f x)
    | None -> None
  in
  create step
| 47 | |
(** [filter t ~f] is a new stream that lazily yields only those elements of
    [t] for which [f] returns [true].  Reading from the result consumes [t]. *)
let filter t ~f =
  let rec seek () =
    match next t with
    | None -> None
    | Some x as hit -> if f x then hit else seek ()
  in
  create seek
| 59 | |
(** [iter t ~f] applies [f] to every remaining element of [t], going through
    the underlying streams in list order.  The elements are consumed. *)
let iter t ~f =
  let drain stream = S.iter f stream in
  List.iter t.streams ~f:drain
| 62 | |
(** [concat ts] is a stream that yields the elements of each stream in [ts],
    one stream after another.  The result reuses the underlying streams held
    by the members of [ts] at the time of the call, so reading from it also
    consumes them. *)
let concat ts =
  let all = List.map ts ~f:(fun t -> t.streams) |> List.concat in
  { streams = all }
| 65 | |
(** [group_by t ~f] consumes all of [t] eagerly and partitions its elements
    by the key [f x].  The result is a stream of
    [(key, member_count, members)] triples.

    Within a group, [members] lists elements in reverse order of arrival
    (each new element is consed onto the front).  Groups are emitted in
    [Hashtbl.iter] order, which is not the order in which keys first
    appeared. *)
let group_by t ~f =
  (* NOTE(review): the initial size is a large hard-coded constant; it is
     kept as is because it influences the iteration order of the table. *)
  let table = Hashtbl.create 1_000_000 in
  let record x =
    let key = f x in
    let entry =
      match Hashtbl.find_opt table key with
      | Some (count, members) -> (count + 1, x :: members)
      | None -> (1, [ x ])
    in
    Hashtbl.replace table key entry
  in
  iter t ~f:record;
  let out = Queue.create () in
  let emit key (count, members) = Queue.add (key, count, members) out in
  Hashtbl.iter emit table;
  of_queue out
| 85 | |
(** Minimal message passing over channels, built on [Marshal].

    The interface is untyped: [recv] returns whatever type the caller asks
    for, so both ends must agree on the message type themselves. *)
module Ipc : sig
  (** [send oc msg] marshals [msg] onto [oc] and flushes the channel. *)
  val send : out_channel -> 'a -> unit

  (** [recv ic] reads one marshalled value from [ic]. *)
  val recv : in_channel -> 'a
end = struct
  let send chan payload =
    let no_flags = [] in
    Marshal.to_channel chan payload no_flags;
    (* Flush right away so the peer sees the message without delay. *)
    flush chan

  let recv chan = Marshal.from_channel chan
end
| 97 | |
(** [lord t ~njobs ~vassals ~ic ~ocs] runs the coordinating side of the
    parallel map.

    It reads vassal messages from [ic] and answers every [Ready] or [Result]
    with the next element of [t] (or [Job None] once [t] is exhausted) on
    that vassal's channel in [ocs].  The loop ends when [njobs] vassals have
    announced [Exiting]; the function then closes [ic], waits until every
    pid in [vassals] has been reaped, and returns the collected
    [(input, output)] pairs as a stream, in the order they were received. *)
let lord t ~njobs ~vassals ~ic ~ocs =
  let remaining = ref njobs in
  let finished = Queue.create () in
  (* Hand the next job (or the end-of-work marker) to vassal [i]. *)
  let dispatch i = Ipc.send ocs.(i) (Job (next t)) in
  let rec serve () =
    let (msg : ('input, 'output) msg_from_vassal) = Ipc.recv ic in
    match msg with
    | Ready i ->
      dispatch i;
      serve ()
    | Result (i, outcome) ->
      Queue.add outcome finished;
      dispatch i;
      serve ()
    | Exiting i ->
      close_out ocs.(i);
      decr remaining;
      if !remaining <> 0 then serve ()
  in
  let rec reap pending =
    match pending with
    | [] -> ()
    | _ :: _ ->
      let pid, _process_status = Unix.wait () in
      (* TODO: handle process_status *)
      reap (List.filter pending ~f:(fun p -> p <> pid))
  in
  serve ();
  close_in ic;
  reap vassals;
  of_queue finished
| 127 | |
(** [vassal i ~f ~vassal_pipe_r ~lord_pipe_w] is the body of worker process
    number [i].

    It announces itself with [Ready i], then repeatedly reads a job from
    [vassal_pipe_r], applies [f] and reports [Result (i, (input, output))]
    on [lord_pipe_w].  On [Job None] it sends [Exiting i], closes both
    channels and terminates the process with exit code 0, so this function
    never returns. *)
let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
  let from_lord = Unix.in_channel_of_descr vassal_pipe_r in
  let to_lord = Unix.out_channel_of_descr lord_pipe_w in
  let rec work () =
    let (Job job : 'input msg_from_lord) = Ipc.recv from_lord in
    match job with
    | None -> Ipc.send to_lord (Exiting i)
    | Some input ->
      let output = f input in
      Ipc.send to_lord (Result (i, (input, output)));
      work ()
  in
  Ipc.send to_lord (Ready i);
  work ();
  close_in from_lord;
  close_out to_lord;
  exit 0
| 144 | |
(** [bag_map t ~njobs ~f] applies [f] to the elements of [t] using [njobs]
    forked worker processes ("vassals") coordinated by the calling process
    (the "lord"), and returns a stream of [(input, output)] pairs.

    The call returns only after every vassal has exited.  Pairs are produced
    in the order the vassals reported them, which need not be the order of
    [t].  Inputs and outputs travel between processes through [Ipc], i.e.
    [Marshal], so they must be marshallable.

    @raise Invalid_argument if [njobs] is not positive. *)
let bag_map t ~njobs ~f =
  (* Without any vassal the lord would read from a pipe that has no writer
     and fail with an accidental [End_of_file]; reject the request up front,
     before any descriptor is created. *)
  if njobs <= 0 then invalid_arg "bag_map: njobs must be positive";
  let lord_pipe_r, lord_pipe_w = Unix.pipe () in
  let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
  let vassal_pipes_r = Array.map vassal_pipes ~f:fst in
  let vassal_pipes_w = Array.map vassal_pipes ~f:snd in
  (* Children terminate with [exit], which flushes the buffers they inherit;
     flush now so pending output of the parent is not written once per
     child. *)
  flush_all ();
  let vassals = ref [] in
  for i = 0 to njobs - 1 do
    match Unix.fork () with
    | 0 ->
      (* Child: keep only its own read end and the shared write end.  Holding
         on to the other inherited descriptors would prevent this process
         from ever observing end-of-file on its job pipe. *)
      Unix.close lord_pipe_r;
      Array.iter vassal_pipes_w ~f:Unix.close;
      Array.iteri vassal_pipes_r ~f:(fun j r -> if j <> i then Unix.close r);
      vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
    | pid -> vassals := pid :: !vassals
  done;
  (* Parent: it only reads from the lord pipe and only writes to the vassal
     pipes; close the opposite ends so they are not leaked on every call. *)
  Unix.close lord_pipe_w;
  Array.iter vassal_pipes_r ~f:Unix.close;
  lord
    t
    ~njobs
    ~vassals:!vassals
    ~ic:(Unix.in_channel_of_descr lord_pipe_r)
    ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)