| 1 | open Printf |
| 2 | |
| 3 | module Msg = Khatus_msg |
| 4 | module Msg_parser = Khatus_msg_parser |
| 5 | module State = Khatus_state |
| 6 | |
| 7 | type t = |
| 8 | { state : State.t |
| 9 | ; stream : Msg.t Stream.t |
| 10 | } |
| 11 | |
| 12 | let init ~node ~modul = |
| 13 | let line_stream = |
| 14 | Stream.from (fun _ -> |
| 15 | match read_line () with |
| 16 | | exception End_of_file -> |
| 17 | None |
| 18 | | line -> |
| 19 | Some line |
| 20 | ) |
| 21 | in |
| 22 | let rec parse_next msg_count = |
| 23 | (match Stream.next line_stream with |
| 24 | | exception Stream.Failure -> |
| 25 | None |
| 26 | | line -> |
| 27 | (match (Msg_parser.parse_msg (Lexing.from_string line)) with |
| 28 | | Ok msg -> |
| 29 | Some msg |
| 30 | | Error e -> |
| 31 | let e = |
| 32 | match e with |
| 33 | | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" |
| 34 | | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" |
| 35 | in |
| 36 | eprintf |
| 37 | "%s\n%!" |
| 38 | Msg.(to_string |
| 39 | { node |
| 40 | ; modul |
| 41 | ; content = Log |
| 42 | { location = "khatus_msg_stream:fold" |
| 43 | ; level = `error |
| 44 | ; msg = sprintf "Parse error %s in %s" e line |
| 45 | } |
| 46 | }); |
| 47 | parse_next msg_count |
| 48 | ) |
| 49 | ) |
| 50 | in |
| 51 | { state = State.init ~node ~modul |
| 52 | ; stream = Stream.from parse_next |
| 53 | } |
| 54 | |
| 55 | let rec fold ({state; stream} as t) ~f ~init = |
| 56 | match Stream.next stream with |
| 57 | | exception Stream.Failure -> |
| 58 | init |
| 59 | | msg -> |
| 60 | let state = State.update state ~msg in |
| 61 | fold {t with state} ~f ~init:(f init ~state ~msg) |