X-Git-Url: https://git.xandkar.net/?p=dups.git;a=blobdiff_plain;f=lib%2Fstream.ml;fp=lib%2Fstream.ml;h=4d60816c05b717ea38f96cdb0b7d14b6a1aab232;hp=0000000000000000000000000000000000000000;hb=ddcbda0046a598d55746850e15d4fa99b3998ce0;hpb=21e1d14c1e23d2c586ebe1480add8e9d87e7ad7a diff --git a/lib/stream.ml b/lib/stream.ml new file mode 100644 index 0000000..4d60816 --- /dev/null +++ b/lib/stream.ml @@ -0,0 +1,166 @@ +module Array = ArrayLabels +module List = ListLabels +module S = Stdlib.Stream + +type 'a t = + {mutable streams : ('a S.t) list} + +type ('input, 'output) msg_from_vassal = + | Ready of int + | Result of (int * ('input * 'output)) + | Exiting of int + +type 'input msg_from_lord = + | Job of 'input option + +let create f = + {streams = [S.from (fun _ -> f ())]} + +let of_queue q = + create (fun () -> + match Queue.take q with + | exception Queue.Empty -> + None + | x -> + Some x + ) + +let rec next t = + match t.streams with + | [] -> + None + | s :: streams -> + (match S.next s with + | exception S.Failure -> + t.streams <- streams; + next t + | x -> + Some x + ) + +let map t ~f = + create (fun () -> + match next t with + | None -> None + | Some x -> Some (f x) + ) + +let filter t ~f = + let rec filter () = + match next t with + | None -> + None + | Some x when f x -> + Some x + | Some _ -> + filter () + in + create filter + +let iter t ~f = + List.iter t.streams ~f:(S.iter f) + +let concat ts = + {streams = List.concat (List.map ts ~f:(fun {streams} -> streams))} + +let group_by t ~f = + let groups_tbl = Hashtbl.create 1_000_000 in + let group_update x = + let group = f x in + let members = + match Hashtbl.find_opt groups_tbl group with + | None -> + (1, [x]) + | Some (n, xs) -> + (succ n, x :: xs) + in + Hashtbl.replace groups_tbl group members + in + iter t ~f:group_update; + let groups = Queue.create () in + Hashtbl.iter + (fun name (length, members) -> Queue.add (name, length, members) groups) + groups_tbl; + of_queue groups + +module Ipc : sig + val send : out_channel -> 'a -> unit + val recv : in_channel -> 'a +end = struct + let send oc msg = + Marshal.to_channel oc msg []; + flush oc + + let recv ic = + Marshal.from_channel ic +end + +let lord t ~njobs ~vassals ~ic ~ocs = + let active_vassals = ref njobs in + let results = Queue.create () in + let rec loop () = + match ((Ipc.recv ic) : ('input, 'output) msg_from_vassal) with + | Exiting i -> + close_out ocs.(i); + decr active_vassals; + if !active_vassals = 0 then () else loop () + | Ready i -> + Ipc.send ocs.(i) (Job (next t)); + loop () + | Result (i, result) -> + Queue.add result results; + Ipc.send ocs.(i) (Job (next t)); + loop () + in + let rec wait = function + | [] -> + () + | vassals -> + let pid, _process_status = Unix.wait () in + (* TODO: handle process_status *) + wait (List.filter vassals ~f:(fun p -> p <> pid)) + in + loop (); + close_in ic; + wait vassals; + of_queue results + +let vassal i ~f ~vassal_pipe_r ~lord_pipe_w = + let ic = Unix.in_channel_of_descr vassal_pipe_r in + let oc = Unix.out_channel_of_descr lord_pipe_w in + let rec loop () = + match (Ipc.recv ic : 'input msg_from_lord) with + | Job (Some x) -> + Ipc.send oc (Result (i, (x, f x))); + loop () + | Job None -> + Ipc.send oc (Exiting i) + in + Ipc.send oc (Ready i); + loop (); + close_in ic; + close_out oc; + exit 0 + +let bag_map t ~njobs ~f = + let lord_pipe_r, lord_pipe_w = Unix.pipe () in + let vassal_pipes = Array.init njobs ~f:(fun _ -> Unix.pipe ()) in + let vassal_pipes_r = Array.map vassal_pipes ~f:(fun (r, _) -> r) in + let vassal_pipes_w = Array.map vassal_pipes ~f:(fun (_, w) -> w) in + let vassals = ref [] in + for i=0 to (njobs - 1) do + begin match Unix.fork () with + | 0 -> + Unix.close lord_pipe_r; + vassal i ~f ~lord_pipe_w ~vassal_pipe_r:vassal_pipes_r.(i) + | pid -> + vassals := pid :: !vassals + end + done; + Unix.close lord_pipe_w; + lord + t + ~njobs + ~vassals:!vassals + ~ic:(Unix.in_channel_of_descr lord_pipe_r) + ~ocs:(Array.map vassal_pipes_w ~f:Unix.out_channel_of_descr)