Commit | Line | Data |
---|---|---|
c6a7396e SK |
1 | open Printf |
2 | open Khatus | |
3 | ||
4 | let modul = __MODULE__ | |
5 | ||
6 | let (/) = Filename.concat | |
7 | ||
8 | let mkdir_p dir = | |
9 | match Sys.command("mkdir -p " ^ dir) with | |
10 | | 0 -> () | |
11 | | n -> | |
12 | failwith | |
13 | (sprintf "Failed to create directory: %S. mkdir status: %d\n" dir n) | |
14 | ||
15 | let gzip path = | |
16 | match Sys.command("gzip " ^ path) with | |
17 | | 0 -> () | |
18 | | n -> | |
19 | failwith | |
20 | (sprintf "Failed to gzip path: %S. gzip status: %d\n" path n) | |
21 | ||
22 | let main ~node ~cache ~dump_interval:interval ~dump_directory = | |
23 | mkdir_p dump_directory; | |
24 | let dump_filename = dump_directory / "khatus-cache-dump.psv.gz" in | |
25 | let rec loop ~time ~time_dumped = | |
26 | (match read_line () with | |
27 | | exception End_of_file -> | |
28 | () | |
29 | | line -> | |
30 | (match Msg_parser.parse_msg (Lexing.from_string line) with | |
31 | | Ok msg -> | |
32 | let time = Msg.next_time msg ~node ~time in | |
33 | Cache.update_if_data cache ~msg ~time; | |
34 | if (Time.Span.is_gt_or_eq (Time.diff time_dumped time) interval) | |
35 | then | |
36 | ( | |
37 | let (tmp_filename, oc) = | |
38 | Filename.open_temp_file "khatus-cache" "dump" | |
39 | in | |
40 | Cache.dump cache ~node ~modul ~oc; | |
41 | close_out oc; | |
42 | gzip tmp_filename; | |
43 | Sys.rename (tmp_filename ^ ".gz") dump_filename; | |
44 | loop ~time ~time_dumped:time | |
45 | ) | |
46 | else | |
47 | loop ~time ~time_dumped | |
48 | | Error e -> | |
49 | let e = | |
50 | match e with | |
51 | | `Bad_format_of_msg_head -> "Bad_format_of_msg_head" | |
52 | | `Bad_format_of_msg_content -> "Bad_format_of_msg_content" | |
53 | in | |
54 | eprintf | |
55 | "%s\n%!" | |
56 | Msg.(to_string | |
57 | { node | |
58 | ; modul | |
59 | ; content = Log | |
60 | { location = "main:loop" | |
61 | ; level = `error | |
62 | ; msg = sprintf "Parse error %s in %s" e line | |
63 | } | |
64 | }); | |
65 | loop ~time ~time_dumped | |
66 | ) | |
67 | ) | |
68 | in | |
69 | loop ~time:Time.init ~time_dumped:Time.init | |
70 | ||
71 | let () = | |
72 | main | |
73 | ~node:(Sys.argv.(1)) | |
74 | ~dump_interval:(Time.Span.of_string Sys.argv.(2)) | |
75 | ~dump_directory:(Sys.argv.(3)) | |
76 | ~cache:(Cache.create ()) |