| 1 | open Printf |
| 2 | open Khatus |
| 3 | |
(** Name of this compilation unit, as supplied by the compiler. It is passed
    as [~modul] to [Cache.dump] and used as the [modul] field of the log
    messages emitted below. *)
let modul : string = __MODULE__
| 5 | |
(** [dir / name] joins two path components with [Filename.concat].
    Note that this shadows integer division for the rest of the file. *)
let ( / ) dir name = Filename.concat dir name
| 7 | |
(** [mkdir_p dir] creates [dir] and any missing parent directories by
    running the external [mkdir -p] command.

    The path is passed through [Filename.quote] so that spaces and shell
    metacharacters in [dir] are treated as part of the path rather than
    being interpreted by the shell.

    @raise Failure if the command exits with a non-zero status. *)
let mkdir_p dir =
  match Sys.command ("mkdir -p " ^ Filename.quote dir) with
  | 0 -> ()
  | n ->
    failwith
      (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n)
| 14 | |
(** [gzip path] compresses the file at [path] by running the external
    [gzip] command.

    The path is passed through [Filename.quote] so that spaces and shell
    metacharacters in [path] are treated as part of the path rather than
    being interpreted by the shell.

    @raise Failure if the command exits with a non-zero status. *)
let gzip path =
  match Sys.command ("gzip " ^ Filename.quote path) with
  | 0 -> ()
  | n ->
    failwith
      (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n)
| 21 | |
(** [main ~node ~cache ~dump_interval ~dump_directory] reads messages from
    standard input, one per line, until end of input.

    Each line is parsed with [Msg_parser.parse_msg]:
    - on success, the current time is advanced with [Msg.next_time], the
      message is handed to [Cache.update_if_data], and the cache is dumped
      to [dump_directory/khatus-cache-dump.psv.gz] whenever the dump
      condition on [dump_interval] holds;
    - on a parse error, a [Log] message is written to standard error and
      processing continues with the next line.

    [dump_directory] is created up front if it does not exist.

    @raise Failure if creating the directory or gzipping a dump fails.
    @raise Sys_error if the temporary dump file cannot be created or
    renamed. *)
let main ~node ~cache ~dump_interval:interval ~dump_directory =
  mkdir_p dump_directory;
  let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in
  (* Write the dump to a temporary file, compress it, then rename it over
     the published dump file. The temporary file is created inside
     [dump_directory] (not the system temp directory) so that the final
     [Sys.rename] stays on a single filesystem; a rename across
     filesystems would fail. *)
  let dump_cache () =
    let (tmp_filename, oc) =
      Filename.open_temp_file
        ~temp_dir:dump_directory
        "khatus-cache"
        "dump"
    in
    Cache.dump cache ~node ~modul ~oc;
    close_out oc;
    gzip tmp_filename;
    Sys.rename (tmp_filename ^ ".gz") dump_filename
  in
  (* Textual tag for each parser error, used in the log message. *)
  let string_of_parse_error = function
    | `Bad_format_of_msg_head -> "Bad_format_of_msg_head"
    | `Bad_format_of_msg_content -> "Bad_format_of_msg_content"
  in
  let rec loop ~time ~time_dumped =
    match read_line () with
    | exception End_of_file ->
      ()
    | line ->
      (match Msg_parser.parse_msg (Lexing.from_string line) with
       | Ok msg ->
         let time = Msg.next_time msg ~node ~time in
         Cache.update_if_data cache ~msg ~time;
         (* NOTE(review): the argument order of [Time.diff] is kept exactly
            as found; its sign convention is not visible from this file -
            confirm that this measures time elapsed since the last dump. *)
         if Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval
         then begin
           dump_cache ();
           loop ~time ~time_dumped:time
         end
         else
           loop ~time ~time_dumped
       | Error e ->
         let e = string_of_parse_error e in
         eprintf
           "%s\n%!"
           Msg.(to_string
                  { node
                  ; modul
                  ; content = Log
                      { location = "main:loop"
                      ; level = `error
                      ; msg = sprintf "Parse error %s in %s" e line
                      }
                  });
         loop ~time ~time_dumped)
  in
  loop ~time:Time.init ~time_dumped:Time.init
| 70 | |
(* Program entry point.

   Expects three positional command-line arguments: the node name, the
   dump interval (parsed with [Time.Span.of_string]) and the dump
   directory. When fewer are given, a usage line is printed to standard
   error and the process exits with status 2, instead of failing with an
   out-of-bounds array access on [Sys.argv]. *)
let () =
  if Array.length Sys.argv < 4 then begin
    eprintf
      "Usage: %s NODE DUMP_INTERVAL DUMP_DIRECTORY\n%!"
      Sys.argv.(0);
    exit 2
  end;
  main
    ~node:(Sys.argv.(1))
    ~dump_interval:(Time.Span.of_string Sys.argv.(2))
    ~dump_directory:(Sys.argv.(3))
    ~cache:(Cache.create ())