--- /dev/null
+open Printf
+
+module Msg = Khatus_msg
+module Msg_parser = Khatus_msg_parser
+module State = Khatus_state
+
+type t =
+ { state : State.t
+ ; stream : Msg.t Stream.t
+ }
+
+let init ~node ~modul =
+ let line_stream =
+ Stream.from (fun _ ->
+ match read_line () with
+ | exception End_of_file ->
+ None
+ | line ->
+ Some line
+ )
+ in
+ let rec parse_next msg_count =
+ (match Stream.next line_stream with
+ | exception Stream.Failure ->
+ None
+ | line ->
+ (match (Msg_parser.parse_msg (Lexing.from_string line)) with
+ | Ok msg ->
+ Some msg
+ | Error e ->
+ let e =
+ match e with
+ | `Bad_format_of_msg_head -> "Bad_format_of_msg_head"
+ | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
+ in
+ eprintf
+ "%s\n%!"
+ Msg.(to_string
+ { node
+ ; modul
+ ; content = Log
+ { location = "khatus_msg_stream:fold"
+ ; level = `error
+ ; msg = sprintf "Parse error %s in %s" e line
+ }
+ });
+ parse_next msg_count
+ )
+ )
+ in
+ { state = State.init ~node ~modul
+ ; stream = Stream.from parse_next
+ }
+
+let rec fold ({state; stream} as t) ~f ~init =
+ match Stream.next stream with
+ | exception Stream.Failure ->
+ init
+ | msg ->
+ let state = State.update state ~msg in
+ fold {t with state} ~f ~init:(f init ~state ~msg)