Refactor cache dumper
[khatus.git] / src / ocaml / exe / khatus_cache_dumper.ml
index e51243e..1ae1e84 100644 (file)
@@ -1,76 +1,24 @@
-open Printf
 open Khatus
 
-let modul = __MODULE__
-
-let (/) = Filename.concat
-
-let mkdir_p dir =
-  match Sys.command("mkdir -p " ^ dir) with
-  | 0 -> ()
-  | n ->
-      failwith
-        (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n)
-
-let gzip path =
-  match Sys.command("gzip " ^ path) with
-  | 0 -> ()
-  | n ->
-      failwith
-        (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n)
-
-let main ~node ~cache ~dump_interval:interval ~dump_directory =
-  mkdir_p dump_directory;
-  let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in
-  let rec loop ~time ~time_dumped =
-    (match read_line () with
-    | exception End_of_file ->
-        ()
-    | line ->
-        (match Msg_parser.parse_msg (Lexing.from_string line) with
-        | Ok msg ->
-            let time = Msg.next_time msg ~node ~time in
-            Cache.update_if_data cache ~msg ~time;
-            if (Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval)
-            then
-              (
-                let (tmp_filename, oc) =
-                  Filename.open_temp_file "khatus-cache" "dump"
-                in
-                Cache.dump cache ~node ~modul ~oc;
-                close_out oc;
-                gzip tmp_filename;
-                Sys.rename (tmp_filename ^ ".gz") dump_filename;
-                loop ~time ~time_dumped:time
-              )
-            else
-              loop ~time ~time_dumped
-        | Error e ->
-            let e =
-              match e with
-              | `Bad_format_of_msg_head    -> "Bad_format_of_msg_head"
-              | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
-            in
-            eprintf
-              "%s\n%!"
-              Msg.(to_string
-                { node
-                ; modul
-                ; content = Log
-                    { location = "main:loop"
-                    ; level    = `error
-                    ; msg      = sprintf "Parse error %s in %s" e line
-                    }
-                });
-            loop ~time ~time_dumped
-        )
-    )
-  in
-  loop ~time:Time.init ~time_dumped:Time.init
+let main ~stream ~interval ~dir =
+  let dumped = Time.init in
+  ignore (Msg_stream.fold stream ~init:dumped ~f:(
+    fun dumped ~state:(State.({node; modul; time = now; cache;})) ~msg:_ ->
+      let elapsed = Time.diff dumped now in
+      if (Time.Span.is_gt_or_eq elapsed interval)
+      then begin
+        Cache.dump_to_dir cache ~time:now ~node ~modul ~dir;
+        now
+      end else
+        dumped
+  ))
 
 let () =
+  let modul          = __MODULE__ in
+  let node           = Sys.argv.(1) in
+  let dump_interval  = Sys.argv.(2) |> Time.Span.of_string in
+  let dump_directory = Sys.argv.(3) in
   main
-    ~node:(Sys.argv.(1))
-    ~dump_interval:(Time.Span.of_string Sys.argv.(2))
-    ~dump_directory:(Sys.argv.(3))
-    ~cache:(Cache.create ())
+    ~stream:   (Msg_stream.init ~node ~modul)
+    ~interval: dump_interval
+    ~dir:      dump_directory
This page took 0.020809 seconds and 4 git commands to generate.