Commit | Line | Data |
---|---|---|
ddcbda00 SK |
1 | module Array = ArrayLabels |
2 | module List = ListLabels | |
3 | module S = Stdlib.Stream | |
4 | ||
5 | type 'a t = | |
6 | {mutable streams : ('a S.t) list} | |
7 | ||
8 | type ('input, 'output) msg_from_vassal = | |
9 | | Ready of int | |
10 | | Result of (int * ('input * 'output)) | |
11 | | Exiting of int | |
12 | ||
13 | type 'input msg_from_lord = | |
14 | | Job of 'input option | |
15 | ||
16 | let create f = | |
17 | {streams = [S.from (fun _ -> f ())]} | |
18 | ||
19 | let of_queue q = | |
20 | create (fun () -> | |
21 | match Queue.take q with | |
22 | | exception Queue.Empty -> | |
23 | None | |
24 | | x -> | |
25 | Some x | |
26 | ) | |
27 | ||
28 | let rec next t = | |
29 | match t.streams with | |
30 | | [] -> | |
31 | None | |
32 | | s :: streams -> | |
33 | (match S.next s with | |
34 | | exception S.Failure -> | |
35 | t.streams <- streams; | |
36 | next t | |
37 | | x -> | |
38 | Some x | |
39 | ) | |
40 | ||
41 | let map t ~f = | |
42 | create (fun () -> | |
43 | match next t with | |
44 | | None -> None | |
45 | | Some x -> Some (f x) | |
46 | ) | |
47 | ||
48 | let filter t ~f = | |
49 | let rec filter () = | |
50 | match next t with | |
51 | | None -> | |
52 | None | |
53 | | Some x when f x -> | |
54 | Some x | |
55 | | Some _ -> | |
56 | filter () | |
57 | in | |
58 | create filter | |
59 | ||
60 | let iter t ~f = | |
61 | List.iter t.streams ~f:(S.iter f) | |
62 | ||
63 | let concat ts = | |
64 | {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))} | |
65 | ||
66 | let group_by t ~f = | |
67 | let groups_tbl = Hashtbl.create 1_000_000 in | |
68 | let group_update x = | |
69 | let group = f x in | |
70 | let members = | |
71 | match Hashtbl.find_opt groups_tbl group with | |
72 | | None -> | |
73 | (1, [x]) | |
74 | | Some (n, xs) -> | |
75 | (succ n, x :: xs) | |
76 | in | |
77 | Hashtbl.replace groups_tbl group members | |
78 | in | |
79 | iter t ~f:group_update; | |
80 | let groups = Queue.create () in | |
81 | Hashtbl.iter | |
82 | (fun name (length, members) -> Queue.add (name, length, members) groups) | |
83 | groups_tbl; | |
84 | of_queue groups | |
85 | ||
86 | module Ipc : sig | |
87 | val send : out_channel -> 'a -> unit | |
88 | val recv : in_channel -> 'a | |
89 | end = struct | |
90 | let send oc msg = | |
91 | Marshal.to_channel oc msg []; | |
92 | flush oc | |
93 | ||
94 | let recv ic = | |
95 | Marshal.from_channel ic | |
96 | end | |
97 | ||
98 | let lord t ~njobs ~vassals ~ic ~ocs = | |
99 | let active_vassals = ref njobs in | |
100 | let results = Queue.create () in | |
101 | let rec loop () = | |
102 | match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with | |
103 | | Exiting i -> | |
104 | close_out ocs.(i); | |
105 | decr active_vassals; | |
106 | if !active_vassals = 0 then () else loop () | |
107 | | Ready i -> | |
108 | Ipc.send ocs.(i) (Job (next t)); | |
109 | loop () | |
110 | | Result (i, result) -> | |
111 | Queue.add result results; | |
112 | Ipc.send ocs.(i) (Job (next t)); | |
113 | loop () | |
114 | in | |
115 | let rec wait = function | |
116 | | [] -> | |
117 | () | |
118 | | vassals -> | |
119 | let pid, _process_status = Unix.wait () in | |
120 | (* TODO: handle process_status *) | |
121 | wait (List.filter vassals ~f:(fun p -> p <> pid)) | |
122 | in | |
123 | loop (); | |
124 | close_in ic; | |
125 | wait vassals; | |
126 | of_queue results | |
127 | ||
128 | let vassal i ~f ~vassal_pipe_r ~lord_pipe_w = | |
129 | let ic = Unix.in_channel_of_descr vassal_pipe_r in | |
130 | let oc = Unix.out_channel_of_descr lord_pipe_w in | |
131 | let rec loop () = | |
132 | match (Ipc.recv ic : 'input msg_from_lord) with | |
133 | | Job (Some x) -> | |
134 | Ipc.send oc (Result (i, (x, f x))); | |
135 | loop () | |
136 | | Job None -> | |
137 | Ipc.send oc (Exiting i) | |
138 | in | |
139 | Ipc.send oc (Ready i); | |
140 | loop (); | |
141 | close_in ic; | |
142 | close_out oc; | |
143 | exit 0 | |
144 | ||
145 | let bag_map t ~njobs ~f = | |
146 | let lord_pipe_r, lord_pipe_w = Unix.pipe () in | |
147 | let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in | |
148 | let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in | |
149 | let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in | |
150 | let vassals = ref [] in | |
151 | for i=0 to (njobs - 1) do | |
152 | begin match Unix.fork () with | |
153 | | 0 -> | |
154 | Unix.close lord_pipe_r; | |
155 | vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i) | |
156 | | pid -> | |
157 | vassals := pid :: !vassals | |
158 | end | |
159 | done; | |
160 | Unix.close lord_pipe_w; | |
161 | lord | |
162 | t | |
163 | ~njobs | |
164 | ~vassals:!vassals | |
165 | ~ic:(Unix.in_channel_of_descr lord_pipe_r) | |
166 | ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr) |