X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Focaml%2Fexe%2Fkhatus_cache_dumper.ml;h=1ae1e84d832133ebda0cbfb6fecf7252e9d663c3;hb=81336934f5dcb1895c050d71805ec37e56c400ad;hp=e51243eac3c84e0d52bcef48df9413bfc844175e;hpb=c6a7396ebc93cec32a1465d878ac9d36465dcb19;p=khatus.git diff --git a/src/ocaml/exe/khatus_cache_dumper.ml b/src/ocaml/exe/khatus_cache_dumper.ml index e51243e..1ae1e84 100644 --- a/src/ocaml/exe/khatus_cache_dumper.ml +++ b/src/ocaml/exe/khatus_cache_dumper.ml @@ -1,76 +1,24 @@ -open Printf open Khatus -let modul = __MODULE__ - -let (/) = Filename.concat - -let mkdir_p dir = - match Sys.command("mkdir -p " ^ dir) with - | 0 -> () - | n -> - failwith - (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n) - -let gzip path = - match Sys.command("gzip " ^ path) with - | 0 -> () - | n -> - failwith - (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n) - -let main ~node ~cache ~dump_interval:interval ~dump_directory = - mkdir_p dump_directory; - let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in - let rec loop ~time ~time_dumped = - (match read_line () with - | exception End_of_file -> - () - | line -> - (match Msg_parser.parse_msg (Lexing.from_string line) with - | Ok msg -> - let time = Msg.next_time msg ~node ~time in - Cache.update_if_data cache ~msg ~time; - if (Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval) - then - ( - let (tmp_filename, oc) = - Filename.open_temp_file "khatus-cache" "dump" - in - Cache.dump cache ~node ~modul ~oc; - close_out oc; - gzip tmp_filename; - Sys.rename (tmp_filename ^ ".gz") dump_filename; - loop ~time ~time_dumped:time - ) - else - loop ~time ~time_dumped - | Error e -> - let e = - match e with - | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" - | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" - in - eprintf - "%s\n%!" - Msg.(to_string - { node - ; modul - ; content = Log - { location = "main:loop" - ; level = `error - ; msg = sprintf "Parse error %s in %s" e line - } - }); - loop ~time ~time_dumped - ) - ) - in - loop ~time:Time.init ~time_dumped:Time.init +let main ~stream ~interval ~dir = + let dumped = Time.init in + ignore (Msg_stream.fold stream ~init:dumped ~f:( + fun dumped ~state:(State.({node; modul; time = now; cache;})) ~msg:_ -> + let elapsed = Time.diff dumped now in + if (Time.Span.is_gt_or_eq elapsed interval) + then begin + Cache.dump_to_dir cache ~time:now ~node ~modul ~dir; + now + end else + dumped + )) let () = + let modul = __MODULE__ in + let node = Sys.argv.(1) in + let dump_interval = Sys.argv.(2) |> Time.Span.of_string in + let dump_directory = Sys.argv.(3) in main - ~node:(Sys.argv.(1)) - ~dump_interval:(Time.Span.of_string Sys.argv.(2)) - ~dump_directory:(Sys.argv.(3)) - ~cache:(Cache.create ()) + ~stream: (Msg_stream.init ~node ~modul) + ~interval: dump_interval + ~dir: dump_directory