1 -module(beam_stats_consumer_csv).
3 -include("include/beam_stats.hrl").
4 -include("beam_stats_logging.hrl").
6 -behaviour(beam_stats_consumer).
19 {consumption_interval , non_neg_integer()}
20 | {path , file:filename()}
24 { path :: file:filename()
25 , file = none :: hope_option:t(file:io_device())
31 -spec init([option()]) ->
32 {non_neg_integer(), state()}.
34 ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
35 {some, Path} = hope_kv_list:get(Options, path),
40 {ConsumptionInterval, State}.
42 -spec consume(beam_stats_consumer:queue(), state()) ->
%% Consumer callback: renders every queued stats sample into one binary
%% payload, makes sure an output file is open (opening one if the state has
%% none), and passes the payload on to try_to_write/2, whose result is
%% returned unchanged.
consume(Queue, #state{} = State0) ->
    %% The payload is built before the file is touched, so a failure while
    %% rendering never leaves a freshly opened file behind.
    Payload = beam_stats_queue_to_binary(Queue),
    try_to_write(try_to_open_if_no_file(State0), Payload).
49 -spec terminate(state()) ->
%% Consumer callback: hands the optional file handle held in the state to
%% file:close/1 via hope_option:iter/2.
%% NOTE(review): hope_option:iter/2 is presumed to invoke the fun only for a
%% {some, File} value and do nothing for none -- confirm against hope_option.
terminate(#state{file = MaybeFile}) ->
    CloseFile = fun file:close/1,
    hope_option:iter(MaybeFile, CloseFile).
54 %% ============================================================================
56 -spec try_to_write(state(), binary()) ->
58 try_to_write(#state{file=none, path=Path}=State, _) ->
59 ?log_error("Writing to file (~p) failed: no file in state.", [Path]),
61 try_to_write(#state{file={some, File}}=State, Payload) ->
62 case file:write(File, Payload)
66 ?log_error("file:write(~p, ~p) -> ~p", [File, Payload, Error]),
67 % TODO: Maybe schedule retry?
68 ok = file:close(File),
69 State#state{file=none}
72 -spec try_to_open_if_no_file(state()) ->
74 try_to_open_if_no_file(#state{file={some, _}}=State) ->
76 try_to_open_if_no_file(#state{file=none, path=Path}=State) ->
78 case file:open(Path, Options)
80 State#state{file = {some, File}}
82 ?log_error("file:open(~p, ~p) -> ~p", [Path, Options, Error]),
83 State#state{file = none}
86 -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
%% Renders every stats sample in the queue (front to back, as returned by
%% queue:to_list/1) with beam_stats_to_bin/1 and concatenates the results
%% into a single binary.
beam_stats_queue_to_binary(BEAMStatsQ) ->
    Samples = queue:to_list(BEAMStatsQ),
    Rendered = lists:map(fun beam_stats_to_bin/1, Samples),
    iolist_to_binary(Rendered).
93 -spec beam_stats_to_bin(beam_stats:t()) ->
95 beam_stats_to_bin(#beam_stats
96 { timestamp = Timestamp
101 <<TimestampBin/binary>> = timestamp_to_bin(Timestamp),
102 <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
103 MemoryPairToBin = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>),
104 MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
105 MemoryBins = lists:map(MemoryPairToBin, MemoryBinPairs),
109 iolist_to_binary(AllBins).
111 -spec timestamp_to_bin(erlang:timestamp()) ->
113 timestamp_to_bin(Timestamp) ->
114 TimestampFloat = timestamp_to_float(Timestamp),
115 {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(Timestamp),
116 SecondsFloat = Sec + (TimestampFloat - trunc(TimestampFloat)),
117 Fmt2Digits = "~2.10.0b",
118 FmtDate = string:join(["~b" , Fmt2Digits, Fmt2Digits], "-"),
119 FmtTime = string:join([Fmt2Digits, Fmt2Digits, "~9..0f" ], ":"),
121 Fmt = FmtDate ++ Separator ++ FmtTime,
122 IOList = io_lib:format(Fmt, [Year, Month, Day, Hour, Min, SecondsFloat]),
123 iolist_to_binary(IOList).
125 -spec timestamp_to_float(erlang:timestamp()) ->
%% Converts a {MegaSecs, Secs, MicroSecs} timestamp triple into a float
%% number of seconds. The total is first accumulated as an integer count of
%% microseconds and divided once at the end, so only a single floating-point
%% operation is involved.
timestamp_to_float({MegaSecs, Secs, MicroSecs}) ->
    Million = 1000000,
    WholeSecs = MegaSecs * Million + Secs,
    TotalMicros = WholeSecs * Million + MicroSecs,
    TotalMicros / Million.
133 -spec make_pair_to_bin(binary(), binary(), binary()) ->
134 fun(({binary(), binary()}) -> binary()).
135 make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>, <<Type/binary>>) ->
136 fun ({<<K/binary>>, <<V/binary>>}) ->
137 << TimestampBin/binary
150 -spec node_id_to_bin(node()) ->
%% Renders a node name atom as a UTF-8 encoded binary.
node_id_to_bin(NodeID) ->
    Encoding = utf8,
    erlang:atom_to_binary(NodeID, Encoding).
%% Converts an {Atom, Integer} pair (for example one entry of the memory
%% list) into a pair of binaries: the atom's name and the integer's decimal
%% representation.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    %% Encode as utf8 rather than latin1: latin1 raises badarg for atoms that
    %% contain code points above 255, and would emit bytes in a different
    %% encoding from the UTF-8 node id produced by node_id_to_bin/1. For
    %% ASCII-only atoms the output is byte-for-byte the same as before.
    {atom_to_binary(K, utf8), integer_to_binary(V)}.