-open Printf
open Khatus
-let modul = __MODULE__
-
-let (/) = Filename.concat
-
-let mkdir_p dir =
- match Sys.command("mkdir -p " ^ dir) with
- | 0 -> ()
- | n ->
- failwith
- (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n)
-
-let gzip path =
- match Sys.command("gzip " ^ path) with
- | 0 -> ()
- | n ->
- failwith
- (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n)
-
-let main ~node ~cache ~dump_interval:interval ~dump_directory =
- mkdir_p dump_directory;
- let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in
- let rec loop ~time ~time_dumped =
- (match read_line () with
- | exception End_of_file ->
- ()
- | line ->
- (match Msg_parser.parse_msg (Lexing.from_string line) with
- | Ok msg ->
- let time = Msg.next_time msg ~node ~time in
- Cache.update_if_data cache ~msg ~time;
- if (Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval)
- then
- (
- let (tmp_filename, oc) =
- Filename.open_temp_file "khatus-cache" "dump"
- in
- Cache.dump cache ~node ~modul ~oc;
- close_out oc;
- gzip tmp_filename;
- Sys.rename (tmp_filename ^ ".gz") dump_filename;
- loop ~time ~time_dumped:time
- )
- else
- loop ~time ~time_dumped
- | Error e ->
- let e =
- match e with
- | `Bad_format_of_msg_head -> "Bad_format_of_msg_head"
- | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
- in
- eprintf
- "%s\n%!"
- Msg.(to_string
- { node
- ; modul
- ; content = Log
- { location = "main:loop"
- ; level = `error
- ; msg = sprintf "Parse error %s in %s" e line
- }
- });
- loop ~time ~time_dumped
- )
- )
- in
- loop ~time:Time.init ~time_dumped:Time.init
+let main ~stream ~interval ~dir =
+ let dumped = Time.init in
+ ignore (Msg_stream.fold stream ~init:dumped ~f:(
+ fun dumped ~state:(State.({node; modul; time = now; cache;})) ~msg:_ ->
+ let elapsed = Time.diff dumped now in
+ if (Time.Span.is_gt_or_eq elapsed interval)
+ then begin
+ Cache.dump_to_dir cache ~time:now ~node ~modul ~dir;
+ now
+ end else
+ dumped
+ ))
let () =
+ let modul = __MODULE__ in
+ let node = Sys.argv.(1) in
+ let dump_interval = Sys.argv.(2) |> Time.Span.of_string in
+ let dump_directory = Sys.argv.(3) in
main
- ~node:(Sys.argv.(1))
- ~dump_interval:(Time.Span.of_string Sys.argv.(2))
- ~dump_directory:(Sys.argv.(3))
- ~cache:(Cache.create ())
+ ~stream: (Msg_stream.init ~node ~modul)
+ ~interval: dump_interval
+ ~dir: dump_directory
module Cache = Khatus_cache
module Msg = Khatus_msg
module Msg_parser = Khatus_msg_parser
+module Msg_stream = Khatus_msg_stream
+module State = Khatus_state
module Time = Khatus_time
+open Printf
+
module Hashtbl = MoreLabels.Hashtbl
module Msg = Khatus_msg
Hashtbl.replace values ~key ~data;
Hashtbl.replace mtimes ~key ~data:time
-let update_if_data t ~msg ~time =
- match msg with
- | Msg.({content = Data {key; value}; node; modul}) ->
- update t ~node ~modul ~key ~value ~time
- | {Msg.content = Msg.Alert _; _}
- | {Msg.content = Msg.Cache _; _}
- | {Msg.content = Msg.Error _; _}
- | {Msg.content = Msg.Log _; _}
- | {Msg.content = Msg.Status_bar _; _}
- ->
- ()
-
-let dump {values; mtimes} ~node ~modul ~oc =
+let dump_to_channel {values; mtimes} ~node ~modul ~oc =
Hashtbl.iter values ~f:(fun ~key ~data:value ->
let mtime =
match Hashtbl.find_opt mtimes key with
oc
(msg ^ "\n")
)
+
+let (/) = Filename.concat
+
+let mkdir_p dir =
+ match Sys.command("mkdir -p " ^ dir) with
+ | 0 -> ()
+ | n ->
+ failwith
+ (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n)
+
+let gzip path =
+ match Sys.command("gzip " ^ path) with
+ | 0 -> ()
+ | n ->
+ failwith
+ (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n)
+
+let dump_to_dir t ~time ~node ~modul ~dir =
+ (* TODO: Just log the errors and keep it moving, instead of failing. *)
+ mkdir_p dir;
+ let dump_filename = dir / "khatus-cache-dump.psv.gz" in
+ let tmp_filename = "khatus-cache-dump-" ^ (Time.to_string time) in
+ let oc = open_out tmp_filename in
+ dump_to_channel t ~node ~modul ~oc;
+ close_out oc;
+ gzip tmp_filename;
+ Sys.rename (tmp_filename ^ ".gz") dump_filename
-> time : Khatus_time.t
-> unit
-val update_if_data : t -> msg:Khatus_msg.t -> time:Khatus_time.t -> unit
-
-val dump : t -> node:string -> modul:string -> oc:out_channel -> unit
+val dump_to_dir
+ : t
+ -> time:Khatus_time.t
+ -> node:string
+ -> modul:string
+ -> dir:string
+ -> unit
type t =
{node : string; modul : string; content : content}
+type 'a data_handler =
+ node:string -> modul:string -> key:string list -> value:string -> 'a
+
let sep_1 = "|"
let sep_2 = ":"
| Status_bar text ->
String.concat sep_1 [node; modul; "status_bar"; text]
-let next_time t ~node ~time:time0 =
+let handle_data t ~f ~otherwise =
match t with
- | { modul = "khatus_sensor_datetime"
- ; content = Data {key = ["epoch"]; value = time1}
- ; node = node'
- } when node' = node ->
- (* TODO: Going forawrd, perhaps throwing exceptions is the wrong way. *)
- (* TODO: Should we check this one at msg parse time? *)
- Time.of_string time1
- | {content = Data _; _}
+ | {content = Data {key; value}; node; modul} ->
+ f ~node ~modul ~key ~value
| {content = Alert _; _}
| {content = Cache _; _}
| {content = Error _; _}
| {content = Log _; _}
| {content = Status_bar _; _}
->
- time0
+ otherwise
type t =
{node : string; modul : string; content : content}
+type 'a data_handler =
+ (node:string -> modul:string -> key:string list -> value:string -> 'a)
+
val to_string : t -> string
-val next_time : t -> node:string -> time:Khatus_time.t -> Khatus_time.t
+val handle_data : t -> f:'a data_handler -> otherwise:'a -> 'a
-val parse_msg
- : Lexing.lexbuf
- ->
- ( Khatus_msg.t
- , [ `Bad_format_of_msg_head
- | `Bad_format_of_msg_content
- ]
- ) result
+type error =
+ [ `Bad_format_of_msg_head
+ | `Bad_format_of_msg_content
+ ]
+
+val parse_msg : Lexing.lexbuf -> (Khatus_msg.t, error) result
{
module Msg = Khatus_msg
module Time = Khatus_time
+
+ type error =
+ [ `Bad_format_of_msg_head
+ | `Bad_format_of_msg_content
+ ]
+
let sep_2 = ':'
}
--- /dev/null
+open Printf
+
+module Msg = Khatus_msg
+module Msg_parser = Khatus_msg_parser
+module State = Khatus_state
+
+type t =
+ { state : State.t
+ ; stream : Msg.t Stream.t
+ }
+
+let init ~node ~modul =
+ let line_stream =
+ Stream.from (fun _ ->
+ match read_line () with
+ | exception End_of_file ->
+ None
+ | line ->
+ Some line
+ )
+ in
+ let rec parse_next msg_count =
+ (match Stream.next line_stream with
+ | exception Stream.Failure ->
+ None
+ | line ->
+ (match (Msg_parser.parse_msg (Lexing.from_string line)) with
+ | Ok msg ->
+ Some msg
+ | Error e ->
+ let e =
+ match e with
+ | `Bad_format_of_msg_head -> "Bad_format_of_msg_head"
+ | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
+ in
+ eprintf
+ "%s\n%!"
+ Msg.(to_string
+ { node
+ ; modul
+ ; content = Log
+ { location = "khatus_msg_stream:fold"
+ ; level = `error
+ ; msg = sprintf "Parse error %s in %s" e line
+ }
+ });
+ parse_next msg_count
+ )
+ )
+ in
+ { state = State.init ~node ~modul
+ ; stream = Stream.from parse_next
+ }
+
+let rec fold ({state; stream} as t) ~f ~init =
+ match Stream.next stream with
+ | exception Stream.Failure ->
+ init
+ | msg ->
+ let state = State.update state ~msg in
+ fold {t with state} ~f ~init:(f init ~state ~msg)
--- /dev/null
+type t
+
+val init : node:string -> modul:string -> t
+
+val fold
+ : t
+ -> f:('a -> state:Khatus_state.t -> msg:Khatus_msg.t -> 'a)
+ -> init:'a
+ -> 'a
--- /dev/null
+module Cache = Khatus_cache
+module Msg = Khatus_msg
+module Time = Khatus_time
+
+type t =
+ { node : string
+ ; modul : string
+ ; time : Time.t
+ ; cache : Cache.t
+ }
+
+let init ~node ~modul =
+ { node
+ ; modul
+ ; time = Time.init
+ ; cache = Cache.create ()
+ }
+
+(* TODO: Should probably wrap state update in result. *)
+let update ({node; modul = _; time; cache} as t) ~msg =
+ Msg.handle_data msg ~otherwise:t ~f:(fun ~node:src_node ~modul ~key ~value ->
+ let time =
+ match (modul, key) with
+ | ("khatus_sensor_datetime", ["epoch"]) when src_node = node ->
+ Time.of_string value (* Raises if value is not a number *)
+ | (_, _) ->
+ time
+ in
+ Cache.update cache ~node:src_node ~modul ~key ~value ~time;
+ {t with time}
+ )
--- /dev/null
+type t =
+ { node : string
+ ; modul : string
+ ; time : Khatus_time.t
+ ; cache : Khatus_cache.t
+ }
+
+val init : node:string -> modul:string -> t
+
+val update : t -> msg:Khatus_msg.t -> t
|> List.hd
let of_string s =
+ (* TODO: Shall we validate time string format at msg parse time? *)
float_of_string s
val to_string : t -> string
val of_string : string -> t
+(** Raises if string is not a number. *)