From 6f35286a0c7e5c5b6569f742495efc267772f7a1 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Mon, 10 Sep 2018 17:38:08 -0400 Subject: [PATCH] Refactor cache dumper --- src/ocaml/exe/khatus_cache_dumper.ml | 90 ++++++---------------------- src/ocaml/lib/khatus.ml | 2 + src/ocaml/lib/khatus_cache.ml | 43 +++++++++---- src/ocaml/lib/khatus_cache.mli | 10 +++- src/ocaml/lib/khatus_msg.ml | 17 +++--- src/ocaml/lib/khatus_msg.mli | 5 +- src/ocaml/lib/khatus_msg_parser.mli | 14 ++--- src/ocaml/lib/khatus_msg_parser.mll | 6 ++ src/ocaml/lib/khatus_msg_stream.ml | 61 +++++++++++++++++++ src/ocaml/lib/khatus_msg_stream.mli | 9 +++ src/ocaml/lib/khatus_state.ml | 31 ++++++++++ src/ocaml/lib/khatus_state.mli | 10 ++++ src/ocaml/lib/khatus_time.ml | 1 + src/ocaml/lib/khatus_time.mli | 1 + 14 files changed, 194 insertions(+), 106 deletions(-) create mode 100644 src/ocaml/lib/khatus_msg_stream.ml create mode 100644 src/ocaml/lib/khatus_msg_stream.mli create mode 100644 src/ocaml/lib/khatus_state.ml create mode 100644 src/ocaml/lib/khatus_state.mli diff --git a/src/ocaml/exe/khatus_cache_dumper.ml b/src/ocaml/exe/khatus_cache_dumper.ml index e51243e..1ae1e84 100644 --- a/src/ocaml/exe/khatus_cache_dumper.ml +++ b/src/ocaml/exe/khatus_cache_dumper.ml @@ -1,76 +1,24 @@ -open Printf open Khatus -let modul = __MODULE__ - -let (/) = Filename.concat - -let mkdir_p dir = - match Sys.command("mkdir -p " ^ dir) with - | 0 -> () - | n -> - failwith - (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n) - -let gzip path = - match Sys.command("gzip " ^ path) with - | 0 -> () - | n -> - failwith - (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n) - -let main ~node ~cache ~dump_interval:interval ~dump_directory = - mkdir_p dump_directory; - let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in - let rec loop ~time ~time_dumped = - (match read_line () with - | exception End_of_file -> - () - | line -> - (match Msg_parser.parse_msg (Lexing.from_string line) with - | Ok msg -> - let time = Msg.next_time msg ~node ~time in - Cache.update_if_data cache ~msg ~time; - if (Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval) - then - ( - let (tmp_filename, oc) = - Filename.open_temp_file "khatus-cache" "dump" - in - Cache.dump cache ~node ~modul ~oc; - close_out oc; - gzip tmp_filename; - Sys.rename (tmp_filename ^ ".gz") dump_filename; - loop ~time ~time_dumped:time - ) - else - loop ~time ~time_dumped - | Error e -> - let e = - match e with - | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" - | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" - in - eprintf - "%s\n%!" - Msg.(to_string - { node - ; modul - ; content = Log - { location = "main:loop" - ; level = `error - ; msg = sprintf "Parse error %s in %s" e line - } - }); - loop ~time ~time_dumped - ) - ) - in - loop ~time:Time.init ~time_dumped:Time.init +let main ~stream ~interval ~dir = + let dumped = Time.init in + ignore (Msg_stream.fold stream ~init:dumped ~f:( + fun dumped ~state:(State.({node; modul; time = now; cache;})) ~msg:_ -> + let elapsed = Time.diff dumped now in + if (Time.Span.is_gt_or_eq elapsed interval) + then begin + Cache.dump_to_dir cache ~time:now ~node ~modul ~dir; + now + end else + dumped + )) let () = + let modul = __MODULE__ in + let node = Sys.argv.(1) in + let dump_interval = Sys.argv.(2) |> Time.Span.of_string in + let dump_directory = Sys.argv.(3) in main - ~node:(Sys.argv.(1)) - ~dump_interval:(Time.Span.of_string Sys.argv.(2)) - ~dump_directory:(Sys.argv.(3)) - ~cache:(Cache.create ()) + ~stream: (Msg_stream.init ~node ~modul) + ~interval: dump_interval + ~dir: dump_directory diff --git a/src/ocaml/lib/khatus.ml b/src/ocaml/lib/khatus.ml index bd01385..efc41ae 100644 --- a/src/ocaml/lib/khatus.ml +++ b/src/ocaml/lib/khatus.ml @@ -1,4 +1,6 @@ module Cache = Khatus_cache module Msg = Khatus_msg module Msg_parser = Khatus_msg_parser +module Msg_stream = Khatus_msg_stream +module State = Khatus_state module Time = Khatus_time diff --git a/src/ocaml/lib/khatus_cache.ml b/src/ocaml/lib/khatus_cache.ml index 698d20b..6a70e23 100644 --- a/src/ocaml/lib/khatus_cache.ml +++ b/src/ocaml/lib/khatus_cache.ml @@ -1,3 +1,5 @@ +open Printf + module Hashtbl = MoreLabels.Hashtbl module Msg = Khatus_msg @@ -18,19 +20,7 @@ let update {values; mtimes} ~node ~modul ~key ~value:data ~time = Hashtbl.replace values ~key ~data; Hashtbl.replace mtimes ~key ~data:time -let update_if_data t ~msg ~time = - match msg with - | Msg.({content = Data {key; value}; node; modul}) -> - update t ~node ~modul ~key ~value ~time - | {Msg.content = Msg.Alert _; _} - | {Msg.content = Msg.Cache _; _} - | {Msg.content = Msg.Error _; _} - | {Msg.content = Msg.Log _; _} - | {Msg.content = Msg.Status_bar _; _} - -> - () - -let dump {values; mtimes} ~node ~modul ~oc = +let dump_to_channel {values; mtimes} ~node ~modul ~oc = Hashtbl.iter values ~f:(fun ~key ~data:value -> let mtime = match Hashtbl.find_opt mtimes key with @@ -50,3 +40,30 @@ let dump {values; mtimes} ~node ~modul ~oc = oc (msg ^ "\n") ) + +let (/) = Filename.concat + +let mkdir_p dir = + match Sys.command("mkdir -p " ^ dir) with + | 0 -> () + | n -> + failwith + (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n) + +let gzip path = + match Sys.command("gzip " ^ path) with + | 0 -> () + | n -> + failwith + (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n) + +let dump_to_dir t ~time ~node ~modul ~dir = + (* TODO: Just log the errors and keep it moving, instead of failing. *) + mkdir_p dir; + let dump_filename = dir / "khatus-cache-dump.psv.gz" in + let tmp_filename = "khatus-cache-dump-" ^ (Time.to_string time) in + let oc = open_out tmp_filename in + dump_to_channel t ~node ~modul ~oc; + close_out oc; + gzip tmp_filename; + Sys.rename (tmp_filename ^ ".gz") dump_filename diff --git a/src/ocaml/lib/khatus_cache.mli b/src/ocaml/lib/khatus_cache.mli index f6c951e..7a426f3 100644 --- a/src/ocaml/lib/khatus_cache.mli +++ b/src/ocaml/lib/khatus_cache.mli @@ -11,6 +11,10 @@ val update -> time : Khatus_time.t -> unit -val update_if_data : t -> msg:Khatus_msg.t -> time:Khatus_time.t -> unit - -val dump : t -> node:string -> modul:string -> oc:out_channel -> unit +val dump_to_dir + : t + -> time:Khatus_time.t + -> node:string + -> modul:string + -> dir:string + -> unit diff --git a/src/ocaml/lib/khatus_msg.ml b/src/ocaml/lib/khatus_msg.ml index cd9fd86..1d39d69 100644 --- a/src/ocaml/lib/khatus_msg.ml +++ b/src/ocaml/lib/khatus_msg.ml @@ -17,6 +17,9 @@ type content = type t = {node : string; modul : string; content : content} +type 'a data_handler = + node:string -> modul:string -> key:string list -> value:string -> 'a + let sep_1 = "|" let sep_2 = ":" @@ -51,20 +54,14 @@ let to_string {node; modul; content} = | Status_bar text -> String.concat sep_1 [node; modul; "status_bar"; text] -let next_time t ~node ~time:time0 = +let handle_data t ~f ~otherwise = match t with - | { modul = "khatus_sensor_datetime" - ; content = Data {key = ["epoch"]; value = time1} - ; node = node' - } when node' = node -> - (* TODO: Going forawrd, perhaps throwing exceptions is the wrong way. *) - (* TODO: Should we check this one at msg parse time? *) - Time.of_string time1 - | {content = Data _; _} + | {content = Data {key; value}; node; modul} -> + f ~node ~modul ~key ~value | {content = Alert _; _} | {content = Cache _; _} | {content = Error _; _} | {content = Log _; _} | {content = Status_bar _; _} -> - time0 + otherwise diff --git a/src/ocaml/lib/khatus_msg.mli b/src/ocaml/lib/khatus_msg.mli index 7f89c2b..358d0ed 100644 --- a/src/ocaml/lib/khatus_msg.mli +++ b/src/ocaml/lib/khatus_msg.mli @@ -15,6 +15,9 @@ type content = type t = {node : string; modul : string; content : content} +type 'a data_handler = + (node:string -> modul:string -> key:string list -> value:string -> 'a) + val to_string : t -> string -val next_time : t -> node:string -> time:Khatus_time.t -> Khatus_time.t +val handle_data : t -> f:'a data_handler -> otherwise:'a -> 'a diff --git a/src/ocaml/lib/khatus_msg_parser.mli b/src/ocaml/lib/khatus_msg_parser.mli index f177b28..a4296f6 100644 --- a/src/ocaml/lib/khatus_msg_parser.mli +++ b/src/ocaml/lib/khatus_msg_parser.mli @@ -1,8 +1,6 @@ -val parse_msg - : Lexing.lexbuf - -> - ( Khatus_msg.t - , [ `Bad_format_of_msg_head - | `Bad_format_of_msg_content - ] - ) result +type error = + [ `Bad_format_of_msg_head + | `Bad_format_of_msg_content + ] + +val parse_msg : Lexing.lexbuf -> (Khatus_msg.t, error) result diff --git a/src/ocaml/lib/khatus_msg_parser.mll b/src/ocaml/lib/khatus_msg_parser.mll index 2015f77..b7f6418 100644 --- a/src/ocaml/lib/khatus_msg_parser.mll +++ b/src/ocaml/lib/khatus_msg_parser.mll @@ -1,6 +1,12 @@ { module Msg = Khatus_msg module Time = Khatus_time + + type error = + [ `Bad_format_of_msg_head + | `Bad_format_of_msg_content + ] + let sep_2 = ':' } diff --git a/src/ocaml/lib/khatus_msg_stream.ml b/src/ocaml/lib/khatus_msg_stream.ml new file mode 100644 index 0000000..3013d53 --- /dev/null +++ b/src/ocaml/lib/khatus_msg_stream.ml @@ -0,0 +1,61 @@ +open Printf + +module Msg = Khatus_msg +module Msg_parser = Khatus_msg_parser +module State = Khatus_state + +type t = + { state : State.t + ; stream : Msg.t Stream.t + } + +let init ~node ~modul = + let line_stream = + Stream.from (fun _ -> + match read_line () with + | exception End_of_file -> + None + | line -> + Some line + ) + in + let rec parse_next msg_count = + (match Stream.next line_stream with + | exception Stream.Failure -> + None + | line -> + (match (Msg_parser.parse_msg (Lexing.from_string line)) with + | Ok msg -> + Some msg + | Error e -> + let e = + match e with + | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" + | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" + in + eprintf + "%s\n%!" + Msg.(to_string + { node + ; modul + ; content = Log + { location = "khatus_msg_stream:fold" + ; level = `error + ; msg = sprintf "Parse error %s in %s" e line + } + }); + parse_next msg_count + ) + ) + in + { state = State.init ~node ~modul + ; stream = Stream.from parse_next + } + +let rec fold ({state; stream} as t) ~f ~init = + match Stream.next stream with + | exception Stream.Failure -> + init + | msg -> + let state = State.update state ~msg in + fold {t with state} ~f ~init:(f init ~state ~msg) diff --git a/src/ocaml/lib/khatus_msg_stream.mli b/src/ocaml/lib/khatus_msg_stream.mli new file mode 100644 index 0000000..20b1677 --- /dev/null +++ b/src/ocaml/lib/khatus_msg_stream.mli @@ -0,0 +1,9 @@ +type t + +val init : node:string -> modul:string -> t + +val fold + : t + -> f:('a -> state:Khatus_state.t -> msg:Khatus_msg.t -> 'a) + -> init:'a + -> 'a diff --git a/src/ocaml/lib/khatus_state.ml b/src/ocaml/lib/khatus_state.ml new file mode 100644 index 0000000..9a77bc6 --- /dev/null +++ b/src/ocaml/lib/khatus_state.ml @@ -0,0 +1,31 @@ +module Cache = Khatus_cache +module Msg = Khatus_msg +module Time = Khatus_time + +type t = + { node : string + ; modul : string + ; time : Time.t + ; cache : Cache.t + } + +let init ~node ~modul = + { node + ; modul + ; time = Time.init + ; cache = Cache.create () + } + +(* TODO: Should probably wrap state update in result. *) +let update ({node; modul = _; time; cache} as t) ~msg = + Msg.handle_data msg ~otherwise:t ~f:(fun ~node:src_node ~modul ~key ~value -> + let time = + match (modul, key) with + | ("khatus_sensor_datetime", ["epoch"]) when src_node = node -> + Time.of_string value (* Raises if value is not a number *) + | (_, _) -> + time + in + Cache.update cache ~node:src_node ~modul ~key ~value ~time; + {t with time} + ) diff --git a/src/ocaml/lib/khatus_state.mli b/src/ocaml/lib/khatus_state.mli new file mode 100644 index 0000000..c097758 --- /dev/null +++ b/src/ocaml/lib/khatus_state.mli @@ -0,0 +1,10 @@ +type t = + { node : string + ; modul : string + ; time : Khatus_time.t + ; cache : Khatus_cache.t + } + +val init : node:string -> modul:string -> t + +val update : t -> msg:Khatus_msg.t -> t diff --git a/src/ocaml/lib/khatus_time.ml b/src/ocaml/lib/khatus_time.ml index cf6538a..9410417 100644 --- a/src/ocaml/lib/khatus_time.ml +++ b/src/ocaml/lib/khatus_time.ml @@ -21,4 +21,5 @@ let to_string t = |> List.hd let of_string s = + (* TODO: Shall we validate time string format at msg parse time? *) float_of_string s diff --git a/src/ocaml/lib/khatus_time.mli b/src/ocaml/lib/khatus_time.mli index 9f930af..b6b5441 100644 --- a/src/ocaml/lib/khatus_time.mli +++ b/src/ocaml/lib/khatus_time.mli @@ -15,3 +15,4 @@ val diff : t -> t -> Span.t val to_string : t -> string val of_string : string -> t +(** Raises if string is not a number. *) -- 2.20.1