+++ /dev/null
-open Printf
-
-module Msg = Khatus_msg
-module Msg_parser = Khatus_msg_parser
-module State = Khatus_state
-
-type t =
- { state : State.t
- ; stream : Msg.t Stream.t
- }
-
-let init ~node ~modul =
- let line_stream =
- Stream.from (fun _ ->
- match read_line () with
- | exception End_of_file ->
- None
- | line ->
- Some line
- )
- in
- let rec parse_next msg_count =
- (match Stream.next line_stream with
- | exception Stream.Failure ->
- None
- | line ->
- (match (Msg_parser.parse_msg (Lexing.from_string line)) with
- | Ok msg ->
- Some msg
- | Error e ->
- let e =
- match e with
- | `Bad_format_of_msg_head -> "Bad_format_of_msg_head"
- | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
- in
- eprintf
- "%s\n%!"
- Msg.(to_string
- { node
- ; modul
- ; content = Log
- { location = "khatus_msg_stream:fold"
- ; level = `error
- ; msg = sprintf "Parse error %s in %s" e line
- }
- });
- parse_next msg_count
- )
- )
- in
- { state = State.init ~node ~modul
- ; stream = Stream.from parse_next
- }
-
-let rec fold ({state; stream} as t) ~f ~init =
- match Stream.next stream with
- | exception Stream.Failure ->
- init
- | msg ->
- let state = State.update state ~msg in
- fold {t with state} ~f ~init:(f init ~state ~msg)