Flatten Graphite consumer options.
[beam_stats.git] / src / beam_stats_consumer_csv.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_consumer_csv).
2
3-include("include/beam_stats.hrl").
4
5-behaviour(beam_stats_consumer).
6
7-export_type(
8 [ option/0
9 ]).
10
11-export(
12 [ init/1
13 , consume/2
14 , terminate/1
15 ]).
16
%% Options understood by init/1.
%%
%%   consumption_interval - optional; init/1 falls back to 60000 when it is
%%                          absent (unit is not stated in this module;
%%                          presumably milliseconds - TODO confirm).
%%   path                 - required; init/1 crashes on a missing entry.
%%                          This is the file the consumer appends to.
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {path, file:filename()}.
21
%% Consumer state threaded through init/1, consume/2 and terminate/1.
%%
%%   path - destination file, taken from the `path' option.
%%   file - `none' while no file is open, `{some, IoDevice}' once
%%          try_to_open_if_no_file/1 has opened it; try_to_write/2 resets it
%%          to `none' after a failed write.
-record(state, {
    path :: file:filename(),
    file = none :: hope_option:t(file:io_device())
}).

-type state() :: #state{}.
29
%% @doc Build the initial consumer state from the option list.
%%
%% `consumption_interval' is optional and defaults to 60000. `path' is
%% mandatory: a missing entry fails the `{some, Path}' match and crashes.
%% The file is not opened here; the state starts out with `file = none'.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    Interval = hope_kv_list:get(Options, consumption_interval, 60000),
    {some, Path} = hope_kv_list:get(Options, path),
    {Interval, #state{path = Path, file = none}}.
40
%% @doc Render the queued stats and append them to the output file.
%%
%% The payload is rendered first; then the file is opened if the state holds
%% none, and finally the write is attempted. The returned state reflects
%% whatever the open/write helpers did to the `file' field.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Queue, #state{} = State) ->
    Rendered = beam_stats_queue_to_binary(Queue),
    try_to_write(try_to_open_if_no_file(State), Rendered).
47
%% @doc Shut the consumer down, handing file:close/1 to hope_option:iter/2
%% for the optional file handle. Always returns the empty tuple.
-spec terminate(state()) -> {}.
terminate(#state{file = MaybeFile}) ->
    Close = fun file:close/1,
    ok = hope_option:iter(MaybeFile, Close),
    {}.
53
54%% ============================================================================
55
%% @doc Write Payload to the open file, if there is one.
%%
%% With no open file the payload is dropped and an error line is printed.
%% When file:write/2 fails, the error is printed, the handle is closed and
%% the state is returned with `file = none' so that the next consume/2 call
%% reopens the file.
-spec try_to_write(state(), binary()) -> state().
try_to_write(#state{file = none, path = Path} = State, _) ->
    io:format("error: file closed: ~s~n", [Path]),
    State;
try_to_write(#state{file = {some, File}} = State, Payload) ->
    case file:write(File, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            io:format("error: file:write/2 failed: ~p~n", [Error]),
            % TODO: Maybe schedule retry?
            ok = close_after_failed_write(File),
            State#state{file = none}
    end.

%% Close a handle whose write just failed. file:close/1 can itself return
%% {error, _} here (for example when the io device is already gone), so the
%% result is reported instead of asserted: crashing on it would defeat the
%% recovery path above.
-spec close_after_failed_write(file:io_device()) -> ok.
close_after_failed_write(File) ->
    case file:close(File) of
        ok ->
            ok;
        {error, _} = CloseError ->
            io:format("error: file:close/1 failed: ~p~n", [CloseError])
    end.
71
%% @doc Make sure the state carries an open file handle when possible.
%%
%% With no handle, the configured path is opened in append mode. On failure
%% the error is printed and the state is returned unchanged (still
%% `file = none'). A state that already holds a handle is returned as is.
-spec try_to_open_if_no_file(state()) -> state().
try_to_open_if_no_file(#state{file = none, path = Path} = State) ->
    case file:open(Path, [append]) of
        {error, _} = Failure ->
            io:format("error: file:open/2 failed: ~p~n", [Failure]),
            State;
        {ok, IoDevice} ->
            State#state{file = {some, IoDevice}}
    end;
try_to_open_if_no_file(#state{file = {some, _}} = State) ->
    State.
84
%% @doc Render every stats entry in the queue, in queue order, and
%% concatenate the results into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary().
beam_stats_queue_to_binary(Queue) ->
    Entries = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Entries)).
90
91
%% @doc Render one stats record as a binary holding one row per memory entry.
%%
%% Every row shares the record's timestamp and node id; see
%% make_pair_to_bin/2 for the row layout. Only the `timestamp', `node_id'
%% and `memory' fields of the record are read.
-spec beam_stats_to_bin(beam_stats:t()) -> binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    <<TimestampBin/binary>> = timestamp_to_bin(Timestamp),
    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
    RowToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
    Rows = lists:map(
        fun (Entry) -> RowToBin(atom_int_to_bin_bin(Entry)) end,
        Memory
    ),
    iolist_to_binary(Rows).
109
%% @doc Format a timestamp as `YYYY-MM-DD HH:MM:SS.ffffff' in local time.
%%
%% The calendar fields come from calendar:now_to_local_time/1. The fractional
%% part of the seconds is recovered from the float form of the timestamp and
%% added to the whole seconds before formatting. Month, day, hour and minute
%% are zero-padded to two digits; seconds are zero-padded to nine characters.
-spec timestamp_to_bin(erlang:timestamp()) -> binary().
timestamp_to_bin(Timestamp) ->
    Epoch = timestamp_to_float(Timestamp),
    {{Year, Month, Day}, {Hour, Minute, WholeSeconds}} =
        calendar:now_to_local_time(Timestamp),
    Seconds = WholeSeconds + (Epoch - trunc(Epoch)),
    Format = "~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~9..0f",
    Fields = [Year, Month, Day, Hour, Minute, Seconds],
    iolist_to_binary(io_lib:format(Format, Fields)).
123
%% @doc Convert a `{MegaSecs, Secs, MicroSecs}' timestamp into seconds as a
%% float. The total is accumulated in integer microseconds first and divided
%% once at the end.
-spec timestamp_to_float(erlang:timestamp()) -> float().
timestamp_to_float({Mega, Secs, Micro}) ->
    WholeSeconds = Mega * 1000000 + Secs,
    (WholeSeconds * 1000000 + Micro) / 1000000.
131
%% @doc Return a fun that renders one `{Key, Value}' pair of binaries as the
%% row `Timestamp|NodeID|Key|Value' followed by a newline.
%%
%% Note the argument order: the node id comes first and the timestamp second,
%% although the timestamp is written first in the row. Non-binary arguments
%% do not match, here or in the returned fun.
-spec make_pair_to_bin(binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(NodeID, TimestampBin) when is_binary(NodeID), is_binary(TimestampBin) ->
    % The timestamp/node prefix is the same for every row, so build it once.
    RowPrefix = <<TimestampBin/binary, $|, NodeID/binary, $|>>,
    fun ({Key, Value}) when is_binary(Key), is_binary(Value) ->
        <<RowPrefix/binary, Key/binary, $|, Value/binary, $\n>>
    end.
146
%% @doc Render a node name atom as a UTF-8 encoded binary.
-spec node_id_to_bin(node()) -> binary().
node_id_to_bin(Node) ->
    erlang:atom_to_binary(Node, utf8).
151
%% @doc Convert an `{Atom, Integer}' pair into a pair of binaries.
%%
%% The key is encoded as UTF-8, the same encoding node_id_to_bin/1 uses, so
%% every field of an output row shares one encoding. For ASCII keys the bytes
%% are the same as with latin1; unlike latin1, keys containing code points
%% above 255 are encoded instead of raising badarg. The value is rendered in
%% decimal.
-spec atom_int_to_bin_bin({atom(), integer()}) -> {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.
This page took 0.039955 seconds and 4 git commands to generate.