Refactor cache dumper
authorSiraaj Khandkar <siraaj@khandkar.net>
Mon, 10 Sep 2018 21:38:08 +0000 (17:38 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Mon, 10 Sep 2018 21:38:08 +0000 (17:38 -0400)
14 files changed:
src/ocaml/exe/khatus_cache_dumper.ml
src/ocaml/lib/khatus.ml
src/ocaml/lib/khatus_cache.ml
src/ocaml/lib/khatus_cache.mli
src/ocaml/lib/khatus_msg.ml
src/ocaml/lib/khatus_msg.mli
src/ocaml/lib/khatus_msg_parser.mli
src/ocaml/lib/khatus_msg_parser.mll
src/ocaml/lib/khatus_msg_stream.ml [new file with mode: 0644]
src/ocaml/lib/khatus_msg_stream.mli [new file with mode: 0644]
src/ocaml/lib/khatus_state.ml [new file with mode: 0644]
src/ocaml/lib/khatus_state.mli [new file with mode: 0644]
src/ocaml/lib/khatus_time.ml
src/ocaml/lib/khatus_time.mli

index e51243e..1ae1e84 100644 (file)
@@ -1,76 +1,24 @@
-open Printf
 open Khatus
 
-let modul = __MODULE__
-
-let (/) = Filename.concat
-
-let mkdir_p dir =
-  match Sys.command("mkdir -p " ^ dir) with
-  | 0 -> ()
-  | n ->
-      failwith
-        (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n)
-
-let gzip path =
-  match Sys.command("gzip " ^ path) with
-  | 0 -> ()
-  | n ->
-      failwith
-        (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n)
-
-let main ~node ~cache ~dump_interval:interval ~dump_directory =
-  mkdir_p dump_directory;
-  let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in
-  let rec loop ~time ~time_dumped =
-    (match read_line () with
-    | exception End_of_file ->
-        ()
-    | line ->
-        (match Msg_parser.parse_msg (Lexing.from_string line) with
-        | Ok msg ->
-            let time = Msg.next_time msg ~node ~time in
-            Cache.update_if_data cache ~msg ~time;
-            if (Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval)
-            then
-              (
-                let (tmp_filename, oc) =
-                  Filename.open_temp_file "khatus-cache" "dump"
-                in
-                Cache.dump cache ~node ~modul ~oc;
-                close_out oc;
-                gzip tmp_filename;
-                Sys.rename (tmp_filename ^ ".gz") dump_filename;
-                loop ~time ~time_dumped:time
-              )
-            else
-              loop ~time ~time_dumped
-        | Error e ->
-            let e =
-              match e with
-              | `Bad_format_of_msg_head    -> "Bad_format_of_msg_head"
-              | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
-            in
-            eprintf
-              "%s\n%!"
-              Msg.(to_string
-                { node
-                ; modul
-                ; content = Log
-                    { location = "main:loop"
-                    ; level    = `error
-                    ; msg      = sprintf "Parse error %s in %s" e line
-                    }
-                });
-            loop ~time ~time_dumped
-        )
-    )
-  in
-  loop ~time:Time.init ~time_dumped:Time.init
+let main ~stream ~interval ~dir =
+  let dumped = Time.init in
+  ignore (Msg_stream.fold stream ~init:dumped ~f:(
+    fun dumped ~state:(State.({node; modul; time = now; cache;})) ~msg:_ ->
+      let elapsed = Time.diff dumped now in
+      if (Time.Span.is_gt_or_eq elapsed interval)
+      then begin
+        Cache.dump_to_dir cache ~time:now ~node ~modul ~dir;
+        now
+      end else
+        dumped
+  ))
 
 let () =
+  let modul          = __MODULE__ in
+  let node           = Sys.argv.(1) in
+  let dump_interval  = Sys.argv.(2) |> Time.Span.of_string in
+  let dump_directory = Sys.argv.(3) in
   main
-    ~node:(Sys.argv.(1))
-    ~dump_interval:(Time.Span.of_string Sys.argv.(2))
-    ~dump_directory:(Sys.argv.(3))
-    ~cache:(Cache.create ())
+    ~stream:   (Msg_stream.init ~node ~modul)
+    ~interval: dump_interval
+    ~dir:      dump_directory
index bd01385..efc41ae 100644 (file)
@@ -1,4 +1,6 @@
 module Cache      = Khatus_cache
 module Msg        = Khatus_msg
 module Msg_parser = Khatus_msg_parser
+module Msg_stream = Khatus_msg_stream
+module State      = Khatus_state
 module Time       = Khatus_time
index 698d20b..6a70e23 100644 (file)
@@ -1,3 +1,5 @@
+open Printf
+
 module Hashtbl = MoreLabels.Hashtbl
 
 module Msg  = Khatus_msg
@@ -18,19 +20,7 @@ let update {values; mtimes} ~node ~modul ~key ~value:data ~time =
   Hashtbl.replace values ~key ~data;
   Hashtbl.replace mtimes ~key ~data:time
 
-let update_if_data t ~msg ~time =
-  match msg with
-  | Msg.({content = Data {key; value}; node; modul}) ->
-      update t ~node ~modul ~key ~value ~time
-  | {Msg.content = Msg.Alert      _; _}
-  | {Msg.content = Msg.Cache      _; _}
-  | {Msg.content = Msg.Error      _; _}
-  | {Msg.content = Msg.Log        _; _}
-  | {Msg.content = Msg.Status_bar _; _}
-  ->
-      ()
-
-let dump {values; mtimes} ~node ~modul ~oc =
+let dump_to_channel {values; mtimes} ~node ~modul ~oc =
   Hashtbl.iter values ~f:(fun ~key ~data:value ->
     let mtime =
       match Hashtbl.find_opt mtimes key with
@@ -50,3 +40,30 @@ let dump {values; mtimes} ~node ~modul ~oc =
       oc
       (msg ^ "\n")
   )
+
+let (/) = Filename.concat
+
+let mkdir_p dir =
+  match Sys.command("mkdir -p " ^ dir) with
+  | 0 -> ()
+  | n ->
+      failwith
+        (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n)
+
+let gzip path =
+  match Sys.command("gzip " ^ path) with
+  | 0 -> ()
+  | n ->
+      failwith
+        (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n)
+
+let dump_to_dir t ~time ~node ~modul ~dir =
+  (* TODO: Just log the errors and keep it moving, instead of failing. *)
+  mkdir_p dir;
+  let dump_filename = dir / "khatus-cache-dump.psv.gz" in
+  let tmp_filename = "khatus-cache-dump-" ^ (Time.to_string time) in
+  let oc = open_out tmp_filename in
+  dump_to_channel t ~node ~modul ~oc;
+  close_out oc;
+  gzip tmp_filename;
+  Sys.rename (tmp_filename ^ ".gz") dump_filename
index f6c951e..7a426f3 100644 (file)
@@ -11,6 +11,10 @@ val update
   -> time  : Khatus_time.t
   -> unit
 
-val update_if_data : t -> msg:Khatus_msg.t -> time:Khatus_time.t -> unit
-
-val dump : t -> node:string -> modul:string -> oc:out_channel -> unit
+val dump_to_dir
+  : t
+  -> time:Khatus_time.t
+  -> node:string
+  -> modul:string
+  -> dir:string
+  -> unit
index cd9fd86..1d39d69 100644 (file)
@@ -17,6 +17,9 @@ type content =
 type t =
   {node : string; modul : string; content : content}
 
+type 'a data_handler =
+  node:string -> modul:string -> key:string list -> value:string -> 'a
+
 let sep_1 = "|"
 let sep_2 = ":"
 
@@ -51,20 +54,14 @@ let to_string {node; modul; content} =
   | Status_bar text ->
       String.concat sep_1 [node; modul; "status_bar"; text]
 
-let next_time t ~node ~time:time0 =
+let handle_data t ~f ~otherwise =
   match t with
-  | { modul   = "khatus_sensor_datetime"
-    ; content = Data {key = ["epoch"]; value = time1}
-    ; node    = node'
-    } when node' = node ->
-      (* TODO: Going forawrd, perhaps throwing exceptions is the wrong way. *)
-      (* TODO: Should we check this one at msg parse time? *)
-      Time.of_string time1
-  | {content = Data       _; _}
+  | {content = Data {key; value}; node; modul} ->
+      f ~node ~modul ~key ~value
   | {content = Alert      _; _}
   | {content = Cache      _; _}
   | {content = Error      _; _}
   | {content = Log        _; _}
   | {content = Status_bar _; _}
   ->
-      time0
+      otherwise
index 7f89c2b..358d0ed 100644 (file)
@@ -15,6 +15,9 @@ type content =
 type t =
   {node : string; modul : string; content : content}
 
+type 'a data_handler =
+  (node:string -> modul:string -> key:string list -> value:string -> 'a)
+
 val to_string : t -> string
 
-val next_time : t -> node:string -> time:Khatus_time.t -> Khatus_time.t
+val handle_data : t -> f:'a data_handler -> otherwise:'a -> 'a
index f177b28..a4296f6 100644 (file)
@@ -1,8 +1,6 @@
-val parse_msg
-  : Lexing.lexbuf
-  ->
-    ( Khatus_msg.t
-    , [ `Bad_format_of_msg_head
-      | `Bad_format_of_msg_content
-      ]
-    ) result
+type error =
+  [ `Bad_format_of_msg_head
+  | `Bad_format_of_msg_content
+  ]
+
+val parse_msg : Lexing.lexbuf -> (Khatus_msg.t, error) result
index 2015f77..b7f6418 100644 (file)
@@ -1,6 +1,12 @@
 {
   module Msg  = Khatus_msg
   module Time = Khatus_time
+
+  type error =
+    [ `Bad_format_of_msg_head
+    | `Bad_format_of_msg_content
+    ]
+
   let sep_2    = ':'
 }
 
diff --git a/src/ocaml/lib/khatus_msg_stream.ml b/src/ocaml/lib/khatus_msg_stream.ml
new file mode 100644 (file)
index 0000000..3013d53
--- /dev/null
@@ -0,0 +1,61 @@
+open Printf
+
+module Msg        = Khatus_msg
+module Msg_parser = Khatus_msg_parser
+module State      = Khatus_state
+
+type t =
+  { state  : State.t
+  ; stream : Msg.t Stream.t
+  }
+
+let init ~node ~modul =
+  let line_stream =
+    Stream.from (fun _ ->
+      match read_line () with
+      | exception End_of_file ->
+          None
+      | line ->
+          Some line
+    )
+  in
+  let rec parse_next msg_count =
+    (match Stream.next line_stream with
+    | exception Stream.Failure ->
+        None
+    | line ->
+        (match (Msg_parser.parse_msg (Lexing.from_string line)) with
+        | Ok msg ->
+            Some msg
+        | Error e ->
+            let e =
+              match e with
+              | `Bad_format_of_msg_head    -> "Bad_format_of_msg_head"
+              | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
+            in
+            eprintf
+              "%s\n%!"
+              Msg.(to_string
+                { node
+                ; modul
+                ; content = Log
+                    { location = "khatus_msg_stream:fold"
+                    ; level    = `error
+                    ; msg      = sprintf "Parse error %s in %s" e line
+                    }
+                });
+            parse_next msg_count
+        )
+    )
+  in
+  { state  = State.init ~node ~modul
+  ; stream = Stream.from parse_next
+  }
+
+let rec fold ({state; stream} as t) ~f ~init =
+  match Stream.next stream with
+  | exception Stream.Failure ->
+      init
+  | msg ->
+      let state = State.update state ~msg in
+      fold {t with state} ~f ~init:(f init ~state ~msg)
diff --git a/src/ocaml/lib/khatus_msg_stream.mli b/src/ocaml/lib/khatus_msg_stream.mli
new file mode 100644 (file)
index 0000000..20b1677
--- /dev/null
@@ -0,0 +1,9 @@
+type t
+
+val init : node:string -> modul:string -> t
+
+val fold
+  : t
+  -> f:('a -> state:Khatus_state.t -> msg:Khatus_msg.t -> 'a)
+  -> init:'a
+  -> 'a
diff --git a/src/ocaml/lib/khatus_state.ml b/src/ocaml/lib/khatus_state.ml
new file mode 100644 (file)
index 0000000..9a77bc6
--- /dev/null
@@ -0,0 +1,31 @@
+module Cache = Khatus_cache
+module Msg   = Khatus_msg
+module Time  = Khatus_time
+
+type t =
+  { node  : string
+  ; modul : string
+  ; time  : Time.t
+  ; cache : Cache.t
+  }
+
+let init ~node ~modul =
+  { node
+  ; modul
+  ; time  = Time.init
+  ; cache = Cache.create ()
+  }
+
+(* TODO: Should probably wrap state update in result. *)
+let update ({node; modul = _; time; cache} as t) ~msg =
+  Msg.handle_data msg ~otherwise:t ~f:(fun ~node:src_node ~modul ~key ~value ->
+    let time =
+      match (modul, key) with
+      | ("khatus_sensor_datetime", ["epoch"]) when src_node = node ->
+          Time.of_string value  (* Raises if value is not a number *)
+      | (_, _) ->
+          time
+    in
+    Cache.update cache ~node:src_node ~modul ~key ~value ~time;
+    {t with time}
+  )
diff --git a/src/ocaml/lib/khatus_state.mli b/src/ocaml/lib/khatus_state.mli
new file mode 100644 (file)
index 0000000..c097758
--- /dev/null
@@ -0,0 +1,10 @@
+type t =
+  { node  : string
+  ; modul : string
+  ; time  : Khatus_time.t
+  ; cache : Khatus_cache.t
+  }
+
+val init : node:string -> modul:string -> t
+
+val update : t -> msg:Khatus_msg.t -> t
index cf6538a..9410417 100644 (file)
@@ -21,4 +21,5 @@ let to_string t =
   |> List.hd
 
 let of_string s =
+  (* TODO: Shall we validate time string format at msg parse time? *)
   float_of_string s
index 9f930af..b6b5441 100644 (file)
@@ -15,3 +15,4 @@ val diff : t -> t -> Span.t
 val to_string : t -> string
 
 val of_string : string -> t
+(** Raises if string is not a number. *)
This page took 0.059264 seconds and 4 git commands to generate.