Upgrade hope to 4.0.0, which uses empty tuple as unit
[beam_stats.git] / src / beam_stats_consumer_csv.erl
1 -module(beam_stats_consumer_csv).
2
3 -include("include/beam_stats.hrl").
4 -include("beam_stats_logging.hrl").
5
6 -behaviour(beam_stats_consumer).
7
8 -export_type(
9 [ option/0
10 ]).
11
12 -export(
13 [ init/1
14 , consume/2
15 , terminate/1
16 ]).
17
%% Options accepted by init/1:
%%   consumption_interval - interval handed back to the caller of init/1
%%   path                 - name of the output file
-type option() ::
    {consumption_interval, non_neg_integer()}
    | {path, file:filename()}.
22
%% Consumer state.
%%   path - name of the output file.
%%   file - `{some, IoDevice}' while a handle is open, `none' otherwise.
-record(state, {
    path :: file:filename(),
    file = none :: hope_option:t(file:io_device())
}).

-type state() :: #state{}.
30
%% Build the initial consumer state from the option list.
%%
%% `consumption_interval' is looked up with 60000 passed as the fallback
%% value. `path' must be present: anything other than `{some, Path}' fails
%% the match and crashes. No file is opened here; the returned state starts
%% with `file = none'.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    Interval = hope_kv_list:get(Options, consumption_interval, 60000),
    {some, Path} = hope_kv_list:get(Options, path),
    {Interval, #state{path = Path, file = none}}.
41
%% Serialize every sample in the queue and write the result to the file.
%%
%% The payload is rendered first; then a file handle is opened if the state
%% does not hold one, and finally the write is attempted. The returned state
%% reflects whether a handle is still held afterwards.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Queue, #state{} = State0) ->
    Payload = beam_stats_queue_to_binary(Queue),
    try_to_write(try_to_open_if_no_file(State0), Payload).
48
%% Close the file handle held in the state, if there is one.
%% The result is whatever hope_option:iter/2 returns (spec'd here as `{}').
-spec terminate(state()) -> {}.
terminate(#state{file = MaybeFile}) ->
    Close = fun file:close/1,
    hope_option:iter(MaybeFile, Close).
53
54 %% ============================================================================
55
%% Attempt to write Payload to the file held in the state.
%%
%% With an open handle: on success the state is returned untouched; on a
%% write error the error is logged, the handle is closed (a failing close
%% crashes via the `ok' match) and the state is returned with `file = none'.
%% Without a handle: an error is logged and the state is returned unchanged;
%% the payload is dropped.
-spec try_to_write(state(), binary()) -> state().
try_to_write(#state{file = {some, IoDevice}} = State, Payload) ->
    WriteResult = file:write(IoDevice, Payload),
    case WriteResult of
        {error, _} = Error ->
            ?log_error("file:write(~p, ~p) -> ~p", [IoDevice, Payload, Error]),
            % TODO: Maybe schedule retry?
            ok = file:close(IoDevice),
            State#state{file = none};
        ok ->
            State
    end;
try_to_write(#state{file = none, path = Path} = State, _Payload) ->
    ?log_error("Writing to file (~p) failed: no file in state.", [Path]),
    State.
71
%% Make sure the state holds a file handle, opening one when it does not.
%%
%% An existing handle is kept as is. Otherwise the file at `path' is opened
%% in append mode; on failure the error is logged and the state keeps
%% `file = none'.
-spec try_to_open_if_no_file(state()) -> state().
try_to_open_if_no_file(#state{file = none, path = Path} = State) ->
    Modes = [append],
    case file:open(Path, Modes) of
        {error, _} = Error ->
            ?log_error("file:open(~p, ~p) -> ~p", [Path, Modes, Error]),
            State#state{file = none};
        {ok, IoDevice} ->
            State#state{file = {some, IoDevice}}
    end;
try_to_open_if_no_file(#state{file = {some, _}} = State) ->
    State.
85
%% Render every sample in the queue, in queue order, and concatenate the
%% results into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary().
beam_stats_queue_to_binary(Queue) ->
    Samples = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Samples)).
91
92
%% Render one beam_stats sample as a binary made of one line per entry in
%% its `memory' field. Every line carries the sample's timestamp and node id
%% and is tagged with the type `memory'. Other fields of the record are not
%% emitted.
-spec beam_stats_to_bin(beam_stats:t()) -> binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    <<TimestampBin/binary>> = timestamp_to_bin(Timestamp),
    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
    ToMemoryLine = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>),
    % Two passes: convert all pairs to binaries first, then format the lines.
    KeyValueBins = [atom_int_to_bin_bin(Pair) || Pair <- Memory],
    MemoryLines = [ToMemoryLine(KeyValue) || KeyValue <- KeyValueBins],
    iolist_to_binary([MemoryLines]).
110
%% Format a timestamp as `YYYY-MM-DD HH:MM:SS.ffffff' in the node's local
%% time (as given by calendar:now_to_local_time/1).
%%
%% The fractional seconds are recovered from the float form of the timestamp
%% and added to the whole seconds of the calendar time.
-spec timestamp_to_bin(erlang:timestamp()) -> binary().
timestamp_to_bin(Timestamp) ->
    AsFloat = timestamp_to_float(Timestamp),
    {{Year, Month, Day}, {Hour, Minute, Second}} =
        calendar:now_to_local_time(Timestamp),
    Fraction = AsFloat - trunc(AsFloat),
    Seconds = Second + Fraction,
    % "~2.10.0b": base-10 integer, width 2, zero padded.
    % "~9..0f":   float, width 9, default precision, zero padded.
    Format = "~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~9..0f",
    Formatted = io_lib:format(Format, [Year, Month, Day, Hour, Minute, Seconds]),
    iolist_to_binary(Formatted).
124
%% Convert a `{MegaSecs, Secs, MicroSecs}' timestamp to seconds as a float.
%%
%% The total is accumulated in integer microseconds and divided once at the
%% end, so the only floating-point rounding happens in that final division.
-spec timestamp_to_float(erlang:timestamp()) -> float().
timestamp_to_float({Mega, Secs, Micro}) ->
    Million = 1000000,
    WholeSeconds = Mega * Million + Secs,
    (WholeSeconds * Million + Micro) / Million.
132
%% Build a formatter for one output line.
%%
%% The returned fun takes a `{Key, Value}' pair of binaries and yields
%% `Timestamp|NodeID|Type|Key|Value' followed by a newline. All arguments,
%% here and in the returned fun, must be binaries; anything else fails the
%% clause match.
-spec make_pair_to_bin(binary(), binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>, <<Type/binary>>) ->
    Prefix = [TimestampBin, $|, NodeID, $|, Type, $|],
    fun({<<Key/binary>>, <<Value/binary>>}) ->
        iolist_to_binary([Prefix, Key, $|, Value, $\n])
    end.
149
%% Render a node name (an atom) as a UTF-8 encoded binary.
-spec node_id_to_bin(node()) -> binary().
node_id_to_bin(Node) ->
    erlang:atom_to_binary(Node, utf8).
154
%% Convert an `{Atom, Integer}' pair into a pair of binaries.
%%
%% The key is encoded as UTF-8, the same encoding node_id_to_bin/1 uses, so
%% all text written to one output line shares a single encoding. (The
%% previous `latin1' encoding produced different bytes for code points
%% 128..255 and raised `badarg' for atoms containing code points above 255.)
%% For plain ASCII keys the result is unchanged. The value is rendered in
%% base 10.
-spec atom_int_to_bin_bin({atom(), integer()}) -> {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.
This page took 0.050359 seconds and 4 git commands to generate.