Include the metric type in its name.
[beam_stats.git] / src / beam_stats_consumer_csv.erl
1 -module(beam_stats_consumer_csv).
2
3 -include("include/beam_stats.hrl").
4 -include("beam_stats_logging.hrl").
5
6 -behaviour(beam_stats_consumer).
7
8 -export_type(
9 [ option/0
10 ]).
11
12 -export(
13 [ init/1
14 , consume/2
15 , terminate/1
16 ]).
17
%% Options understood by init/1:
%%   consumption_interval - optional; init/1 falls back to 60000 when absent.
%%   path                 - file the consumer appends its output to.
-type option() ::
    {consumption_interval, non_neg_integer()}
    | {path, file:filename()}.
22
%% Consumer state.
%%   path - file name handed to file:open/2.
%%   file - `none' until the file has been opened successfully, then
%%          `{some, IoDevice}'; reset to `none' after a failed write.
-record(state, {
    path :: file:filename(),
    file = none :: hope_option:t(file:io_device())
}).

-type state() :: #state{}.
30
%% @doc Build the initial consumer state from the option list.
%%
%% `consumption_interval' is looked up with a default of 60000. The `path'
%% lookup must yield `{some, Path}'; anything else fails the match and
%% crashes the caller. No file is opened here: the state starts out with
%% `file = none'.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    Interval = hope_kv_list:get(Options, consumption_interval, 60000),
    {some, FilePath} = hope_kv_list:get(Options, path),
    {Interval, #state{path = FilePath, file = none}}.
41
%% @doc Serialize every queued stats sample and append the result to the file.
%%
%% The payload is rendered first; only then is the file opened (if it is not
%% open already) and the write attempted. The returned state reflects
%% whether a file handle is currently held.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Queue, State = #state{}) ->
    Bin = beam_stats_queue_to_binary(Queue),
    try_to_write(try_to_open_if_no_file(State), Bin).
48
%% @doc Release the file handle, if one is held, via file:close/1.
%%
%% Crashes with `badmatch' if hope_option:iter/2 does not return `ok'.
-spec terminate(state()) -> {}.
terminate(#state{file = MaybeFile}) ->
    CloseFile = fun file:close/1,
    ok = hope_option:iter(MaybeFile, CloseFile),
    {}.
54
55 %% ============================================================================
56
%% Write Payload to the file held in the state.
%%
%% - With no file in the state, the payload is dropped and an error is logged.
%% - On a successful write the state is returned unchanged.
%% - On a failed write the error is logged, the handle is closed and the
%%   state is reset to `file = none' so that the next consume/2 call tries
%%   to reopen the file.
-spec try_to_write(state(), binary()) -> state().
try_to_write(#state{file = none, path = Path} = State, _Payload) ->
    ?log_error("Writing to file (~p) failed: no file in state.", [Path]),
    State;
try_to_write(#state{file = {some, File}} = State, Payload) ->
    case file:write(File, Payload) of
        ok ->
            State;
        {error, _} = WriteError ->
            ?log_error("file:write(~p, ~p) -> ~p", [File, Payload, WriteError]),
            % TODO: Maybe schedule retry?
            ok = close_after_failed_write(File),
            State#state{file = none}
    end.

%% Close a handle whose write just failed. file:close/1 may itself return
%% `{error, _}' (e.g. when the io server is already gone); that must not
%% crash the consumer, so the failure is only logged.
-spec close_after_failed_write(file:io_device()) -> ok.
close_after_failed_write(File) ->
    case file:close(File) of
        ok ->
            ok;
        {error, _} = CloseError ->
            ?log_error("file:close(~p) -> ~p", [File, CloseError]),
            ok
    end.
72
%% Make sure the state holds an open file handle if at all possible.
%%
%% When no handle is held, the file at `path' is opened in `append' mode.
%% A failed open is logged and leaves `file = none'. A state that already
%% holds a handle is returned untouched.
-spec try_to_open_if_no_file(state()) -> state().
try_to_open_if_no_file(#state{file = none, path = Path} = State) ->
    Modes = [append],
    case file:open(Path, Modes) of
        {error, _} = Failure ->
            ?log_error("file:open(~p, ~p) -> ~p", [Path, Modes, Failure]),
            State#state{file = none};
        {ok, IoDevice} ->
            State#state{file = {some, IoDevice}}
    end;
try_to_open_if_no_file(#state{file = {some, _}} = State) ->
    State.
86
%% Render every sample in the queue, in queue order, and concatenate the
%% results into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary().
beam_stats_queue_to_binary(StatsQueue) ->
    Samples = queue:to_list(StatsQueue),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Samples)).
92
93
%% Render one stats sample as a binary of newline-terminated lines.
%%
%% Only the `memory' field is emitted: each {Key, Value} entry becomes one
%% line tagged with the sample timestamp, the node id and the type
%% <<"memory">> (see make_pair_to_bin/3 for the line layout).
-spec beam_stats_to_bin(beam_stats:t()) -> binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    TsBin = timestamp_to_bin(Timestamp),
    NodeBin = node_id_to_bin(NodeID),
    ToLine = make_pair_to_bin(NodeBin, TsBin, <<"memory">>),
    MemoryPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
    MemoryLines = [ToLine(Pair) || Pair <- MemoryPairs],
    iolist_to_binary([MemoryLines]).
111
%% Format a timestamp as local date and time, e.g.
%% <<"2014-08-20 13:05:07.123456">>.
%%
%% The date and the whole seconds come from calendar:now_to_local_time/1;
%% the fractional seconds are the fractional part of timestamp_to_float/1.
%% The seconds field is zero-padded to 9 characters.
-spec timestamp_to_bin(erlang:timestamp()) -> binary().
timestamp_to_bin(Timestamp) ->
    Total = timestamp_to_float(Timestamp),
    {{Y, Mo, D}, {H, Mi, S}} = calendar:now_to_local_time(Timestamp),
    Fraction = Total - trunc(Total),
    Format = "~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~9..0f",
    iolist_to_binary(io_lib:format(Format, [Y, Mo, D, H, Mi, S + Fraction])).
125
%% Convert a {MegaSecs, Secs, MicroSecs} timestamp to seconds as a float.
%%
%% The total is accumulated as an integer number of microseconds and only
%% divided at the end, so a single float operation is involved.
-spec timestamp_to_float(erlang:timestamp()) -> float().
timestamp_to_float({Mega, Secs, Micro}) ->
    Scale = 1000000,
    WholeSeconds = Mega * Scale + Secs,
    (WholeSeconds * Scale + Micro) / Scale.
133
%% Build a formatter that turns a {Key, Value} pair of binaries into one
%% output line:
%%
%%   Timestamp|NodeID|Type|Key|Value\n
%%
%% All three arguments, and both elements of each pair, must be binaries;
%% anything else raises `function_clause'.
-spec make_pair_to_bin(binary(), binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(NodeID, TimestampBin, Type)
  when is_binary(NodeID), is_binary(TimestampBin), is_binary(Type) ->
    % The leading three fields are the same for every pair: build them once.
    Prefix = <<TimestampBin/binary, "|", NodeID/binary, "|", Type/binary, "|">>,
    fun({K, V}) when is_binary(K), is_binary(V) ->
        <<Prefix/binary, K/binary, "|", V/binary, "\n">>
    end.
150
%% Render a node name (an atom) as a UTF-8 encoded binary.
-spec node_id_to_bin(node()) -> binary().
node_id_to_bin(NodeID) ->
    erlang:atom_to_binary(NodeID, utf8).
155
%% Convert an {Atom, Integer} pair into a pair of binaries.
%%
%% The key is encoded as UTF-8, the same encoding node_id_to_bin/1 uses, so
%% that every field of an output line shares one encoding and atoms with
%% code points above 255 do not raise `badarg'. The value is rendered in
%% decimal.
-spec atom_int_to_bin_bin({atom(), integer()}) -> {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.
This page took 0.054758 seconds and 4 git commands to generate.