X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Focaml%2Flib%2Fkhatus_msg_stream.ml;fp=src%2Focaml%2Flib%2Fkhatus_msg_stream.ml;h=3013d53e378208f4dbbf31098bb0d5b82d6e0208;hb=6f35286a0c7e5c5b6569f742495efc267772f7a1;hp=0000000000000000000000000000000000000000;hpb=5bdce5de1d06bd71c016cfe969f8d8e406f54a9e;p=khatus.git diff --git a/src/ocaml/lib/khatus_msg_stream.ml b/src/ocaml/lib/khatus_msg_stream.ml new file mode 100644 index 0000000..3013d53 --- /dev/null +++ b/src/ocaml/lib/khatus_msg_stream.ml @@ -0,0 +1,61 @@ +open Printf + +module Msg = Khatus_msg +module Msg_parser = Khatus_msg_parser +module State = Khatus_state + +type t = + { state : State.t + ; stream : Msg.t Stream.t + } + +let init ~node ~modul = + let line_stream = + Stream.from (fun _ -> + match read_line () with + | exception End_of_file -> + None + | line -> + Some line + ) + in + let rec parse_next msg_count = + (match Stream.next line_stream with + | exception Stream.Failure -> + None + | line -> + (match (Msg_parser.parse_msg (Lexing.from_string line)) with + | Ok msg -> + Some msg + | Error e -> + let e = + match e with + | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" + | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" + in + eprintf + "%s\n%!" + Msg.(to_string + { node + ; modul + ; content = Log + { location = "khatus_msg_stream:fold" + ; level = `error + ; msg = sprintf "Parse error %s in %s" e line + } + }); + parse_next msg_count + ) + ) + in + { state = State.init ~node ~modul + ; stream = Stream.from parse_next + } + +let rec fold ({state; stream} as t) ~f ~init = + match Stream.next stream with + | exception Stream.Failure -> + init + | msg -> + let state = State.update state ~msg in + fold {t with state} ~f ~init:(f init ~state ~msg)