X-Git-Url: https://git.xandkar.net/?p=khatus.git;a=blobdiff_plain;f=x3%2Fsrc%2Flib%2Fkhatus_msg_stream.ml;fp=x3%2Fsrc%2Flib%2Fkhatus_msg_stream.ml;h=3013d53e378208f4dbbf31098bb0d5b82d6e0208;hp=0000000000000000000000000000000000000000;hb=499c58a269a00e031302938b5a8f006f23aae451;hpb=4c703fadbdc17d1753d16841582636598f862416 diff --git a/x3/src/lib/khatus_msg_stream.ml b/x3/src/lib/khatus_msg_stream.ml new file mode 100644 index 0000000..3013d53 --- /dev/null +++ b/x3/src/lib/khatus_msg_stream.ml @@ -0,0 +1,61 @@ +open Printf + +module Msg = Khatus_msg +module Msg_parser = Khatus_msg_parser +module State = Khatus_state + +type t = + { state : State.t + ; stream : Msg.t Stream.t + } + +let init ~node ~modul = + let line_stream = + Stream.from (fun _ -> + match read_line () with + | exception End_of_file -> + None + | line -> + Some line + ) + in + let rec parse_next msg_count = + (match Stream.next line_stream with + | exception Stream.Failure -> + None + | line -> + (match (Msg_parser.parse_msg (Lexing.from_string line)) with + | Ok msg -> + Some msg + | Error e -> + let e = + match e with + | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" + | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" + in + eprintf + "%s\n%!" + Msg.(to_string + { node + ; modul + ; content = Log + { location = "khatus_msg_stream:fold" + ; level = `error + ; msg = sprintf "Parse error %s in %s" e line + } + }); + parse_next msg_count + ) + ) + in + { state = State.init ~node ~modul + ; stream = Stream.from parse_next + } + +let rec fold ({state; stream} as t) ~f ~init = + match Stream.next stream with + | exception Stream.Failure -> + init + | msg -> + let state = State.update state ~msg in + fold {t with state} ~f ~init:(f init ~state ~msg)