Move modules into dedicated files
[dups.git] / lib / stream.ml
1 module Array = ArrayLabels
2 module List = ListLabels
3 module S = Stdlib.Stream
4
(** A composite stream: an ordered list of underlying [S.t] streams.  The
    field is mutable so the list can be replaced in place (see [next], which
    drops a stream from the front once it is exhausted). *)
type 'a t = { mutable streams : 'a S.t list }
7
(** Messages a vassal (worker process) sends to the lord.  The [int] carried
    by every constructor is the index the vassal was started with. *)
type ('input, 'output) msg_from_vassal =
  | Ready of int
      (** Sent once by a vassal before it starts receiving jobs. *)
  | Result of (int * ('input * 'output))
      (** A processed job: the input together with the value computed for it.
          The payload is a single tuple argument. *)
  | Exiting of int
      (** Sent by a vassal after it has been told there is no more work. *)
12
(** Message the lord sends to a vassal: [Job (Some x)] hands out the input
    [x] to process, [Job None] signals that no input is left. *)
type 'input msg_from_lord = Job of 'input option
15
(** [create f] builds a stream whose elements are produced by calling
    [f ()] on demand; [f] returns [None] to signal the end of the stream. *)
let create f =
  (* [S.from] passes an element counter to its generator; it is ignored. *)
  let generator _count = f () in
  let stream = S.from generator in
  { streams = [ stream ] }
18
(** [of_queue q] is a stream that removes and yields the elements of [q]
    from the front, ending as soon as [q] is found empty. *)
let of_queue q =
  let pop () =
    if Queue.is_empty q then None
    else Some (Queue.take q)
  in
  create pop
27
(** [next t] returns [Some x] where [x] is the next element of [t], or
    [None] once every underlying stream is exhausted.  An exhausted stream is
    removed from the front of [t.streams] before moving on to the next one. *)
let rec next t =
  match t.streams with
  | [] -> None
  | current :: rest ->
    let item = try Some (S.next current) with S.Failure -> None in
    (match item with
     | Some _ -> item
     | None ->
       (* [current] is empty: forget it and try the following stream. *)
       t.streams <- rest;
       next t)
40
(** [map t ~f] is a stream that lazily yields [f x] for each element [x]
    pulled from [t].  Elements are taken from [t] only on demand. *)
let map t ~f =
  let pull () =
    match next t with
    | Some x -> Some (f x)
    | None -> None
  in
  create pull
47
(** [filter t ~f] is a stream that lazily yields the elements of [t] for
    which [f] returns [true], skipping the others. *)
let filter t ~f =
  let rec next_match () =
    match next t with
    | None -> None
    | Some x as hit -> if f x then hit else next_match ()
  in
  create next_match
59
(** [iter t ~f] applies [f] to every remaining element of each underlying
    stream of [t], in list order. *)
let iter t ~f =
  let drain stream = S.iter f stream in
  List.iter ~f:drain t.streams
62
(** [concat ts] is a stream made of the underlying streams of every element
    of [ts], in order.  The underlying streams themselves are reused, not
    copied. *)
let concat ts =
  let per_stream = List.map ~f:(fun t -> t.streams) ts in
  { streams = List.concat per_stream }
65
(** [group_by t ~f] consumes [t] entirely and returns a stream of
    [(key, count, members)] triples, one per distinct value of [f x].
    [members] holds the elements that mapped to [key], most recently seen
    first, and [count] is their number.  Groups are emitted in the order
    [Hashtbl.iter] visits them. *)
let group_by t ~f =
  (* The table is created with an initial size hint of 1_000_000. *)
  let table = Hashtbl.create 1_000_000 in
  let add_member x =
    let key = f x in
    let entry =
      match Hashtbl.find_opt table key with
      | Some (count, members) -> (succ count, x :: members)
      | None -> (1, [ x ])
    in
    Hashtbl.replace table key entry
  in
  iter t ~f:add_member;
  let out = Queue.create () in
  let enqueue key (count, members) = Queue.add (key, count, members) out in
  Hashtbl.iter enqueue table;
  of_queue out
85
(** Minimal message passing over channels, based on the standard
    marshalling format.  [recv] is polymorphic in its result: the caller is
    responsible for annotating the type that was actually sent. *)
module Ipc : sig
  (** [send oc msg] marshals [msg] (no flags) to [oc] and flushes [oc]. *)
  val send : out_channel -> 'a -> unit

  (** [recv ic] reads one marshalled value from [ic]. *)
  val recv : in_channel -> 'a
end = struct
  let send channel message =
    (* [output_value] is [Marshal.to_channel] with an empty flag list. *)
    output_value channel message;
    flush channel

  (* [input_value] is the same function as [Marshal.from_channel]. *)
  let recv channel = input_value channel
end
97
(** [lord t ~njobs ~vassals ~ic ~ocs] runs the coordinating side of the
    worker pool.

    It reads vassal messages from [ic] and answers each [Ready i] or
    [Result (i, _)] by sending the next element of [t] (or [Job None] when
    [t] is exhausted) on [ocs.(i)].  Results are collected in arrival order.
    The loop stops once [njobs] [Exiting] messages have been received; [ic]
    is then closed and every pid in [vassals] is waited for.

    @param njobs number of [Exiting] messages to expect
    @param vassals pids of the vassal processes to reap
    @param ic channel on which all vassals send their messages
    @param ocs per-vassal output channels, indexed by vassal index
    @return a stream of the collected [(input, output)] pairs *)
let lord t ~njobs ~vassals ~ic ~ocs =
  let active_vassals = ref njobs in
  let results = Queue.create () in
  let rec loop () =
    match (Ipc.recv ic : ('input, 'output) msg_from_vassal) with
    | Exiting i ->
      close_out ocs.(i);
      decr active_vassals;
      if !active_vassals = 0 then () else loop ()
    | Ready i ->
      Ipc.send ocs.(i) (Job (next t));
      loop ()
    | Result (i, result) ->
      Queue.add result results;
      Ipc.send ocs.(i) (Job (next t));
      loop ()
  in
  (* Wait for one specific vassal.  Using [Unix.waitpid] on the known pids,
     rather than [Unix.wait], avoids reaping children of this process that
     are not vassals.  A wait interrupted by a signal is retried. *)
  let rec reap pid =
    match Unix.waitpid [] pid with
    | _pid, _process_status ->
      (* TODO: handle process_status *)
      ()
    | exception Unix.Unix_error (Unix.EINTR, _, _) ->
      reap pid
  in
  loop ();
  close_in ic;
  List.iter vassals ~f:reap;
  of_queue results
127
(** [vassal i ~f ~vassal_pipe_r ~lord_pipe_w] runs the worker side of the
    pool and never returns: the process always terminates here.

    The vassal announces itself with [Ready i], then for every
    [Job (Some x)] read from [vassal_pipe_r] it sends back
    [Result (i, (x, f x))] on [lord_pipe_w].  On [Job None] it sends
    [Exiting i], closes both channels and exits with status 0.

    If [f] or the communication raises, the exception is reported on stderr
    and the process exits with status 2 (no [Exiting] message is sent).
    This keeps a failing vassal, which is a forked copy of the caller, from
    unwinding into the caller's code and continuing to run it.

    @param i index identifying this vassal in its messages
    @param f function applied to each received input
    @param vassal_pipe_r descriptor this vassal reads jobs from
    @param lord_pipe_w descriptor this vassal writes messages to *)
let vassal i ~f ~vassal_pipe_r ~lord_pipe_w =
  let ic = Unix.in_channel_of_descr vassal_pipe_r in
  let oc = Unix.out_channel_of_descr lord_pipe_w in
  let rec loop () =
    match (Ipc.recv ic : 'input msg_from_lord) with
    | Job (Some x) ->
      Ipc.send oc (Result (i, (x, f x)));
      loop ()
    | Job None ->
      Ipc.send oc (Exiting i)
  in
  let serve () =
    Ipc.send oc (Ready i);
    loop ();
    close_in ic;
    close_out oc
  in
  match serve () with
  | () ->
    exit 0
  | exception exn ->
    prerr_endline
      ("vassal " ^ string_of_int i ^ ": " ^ Printexc.to_string exn);
    (* Best-effort cleanup: the channels may already be unusable. *)
    close_in_noerr ic;
    close_out_noerr oc;
    exit 2
144
(** [bag_map t ~njobs ~f] applies [f] to the elements of [t] using [njobs]
    forked worker processes ("vassals") and returns a stream of
    [(input, output)] pairs, in the order results arrive.

    One pipe is shared by all vassals to talk to the lord (the calling
    process), and each vassal gets its own pipe to receive jobs.  Inputs and
    outputs travel through [Ipc], i.e. they are marshalled.

    @param njobs number of vassal processes to fork; must be at least 1
    @param f function run in the vassals on each element of [t]
    @raise Invalid_argument if [njobs < 1] *)
let bag_map t ~njobs ~f =
  (* Without any vassal nobody would ever write to the lord, which would
     otherwise surface as an unexplained [End_of_file]. *)
  if njobs < 1 then invalid_arg "bag_map: njobs must be at least 1";
  let lord_pipe_r, lord_pipe_w = Unix.pipe () in
  let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in
  let vassal_pipes_r = Array.map vassal_pipes ~f:fst in
  let vassal_pipes_w = Array.map vassal_pipes ~f:snd in
  let vassals = ref [] in
  for i = 0 to njobs - 1 do
    match Unix.fork () with
    | 0 ->
      (* Child: it only reads jobs and writes to the lord, so drop the ends
         it never uses.  Closing the job-pipe write ends lets a vassal see
         end-of-file if the lord goes away. *)
      Unix.close lord_pipe_r;
      Array.iter vassal_pipes_w ~f:Unix.close;
      vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i)
    | pid ->
      vassals := pid :: !vassals
  done;
  (* Parent: it only reads from the lord pipe and writes to the job pipes;
     close the other ends so they are not leaked on every call. *)
  Unix.close lord_pipe_w;
  Array.iter vassal_pipes_r ~f:Unix.close;
  lord
    t
    ~njobs
    ~vassals:!vassals
    ~ic:(Unix.in_channel_of_descr lord_pipe_r)
    ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)
This page took 0.065414 seconds and 4 git commands to generate.