1 -module(beam_stats_consumer_csv).
3 -include("include/beam_stats.hrl").
5 -behaviour(beam_stats_consumer).
18 {path , file:filename()}
19 | {consumption_interval , erlang:time()}
23 { path :: file:filename()
24 , file = none :: hope_option:t(file:io_device())
30 -spec init([option()]) ->
31 {erlang:time(), state()}.
33 ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
34 {some, Path} = hope_kv_list:get(Options, path),
39 {ConsumptionInterval, State}.
41 -spec consume(beam_stats_consumer:queue(), state()) ->
%% Consumer callback: serialise the queued stats, make sure an output file
%% is open (attempting to open one if the state holds none), then hand the
%% serialised payload to the writer. The result of try_to_write/2 is
%% returned as-is.
consume(Queue, State0 = #state{}) ->
    %% Serialise first so that a failure here happens before any attempt
    %% to touch the file.
    Bin = beam_stats_queue_to_binary(Queue),
    try_to_write(try_to_open_if_no_file(State0), Bin).
48 -spec terminate(state()) ->
50 terminate(#state{file=FileOpt}) ->
51 ok = hope_option:iter(FileOpt, fun file:close/1),
54 %% ============================================================================
56 -spec try_to_write(state(), binary()) ->
58 try_to_write(#state{file=none, path=Path}=State, _) ->
59 io:format("error: file closed: ~s~n", [Path]),
61 try_to_write(#state{file={some, File}}=State, Payload) ->
62 case file:write(File, Payload)
66 io:format("error: file:write/2 failed: ~p~n", [Error]),
67 % TODO: Maybe schedule retry?
68 ok = file:close(File),
69 State#state{file=none}
72 -spec try_to_open_if_no_file(state()) ->
74 try_to_open_if_no_file(#state{file={some, _}}=State) ->
76 try_to_open_if_no_file(#state{file=none, path=Path}=State) ->
77 case file:open(Path, [append])
79 State#state{file = {some, File}}
81 io:format("error: file:open/2 failed: ~p~n", [Error]),
82 State#state{file = none}
85 -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
%% Serialise every element of the queue, in queue order, with
%% beam_stats_to_bin/1 and concatenate the results into a single binary.
beam_stats_queue_to_binary(Queue) ->
    Entries = queue:to_list(Queue),
    Chunks = lists:map(fun beam_stats_to_bin/1, Entries),
    iolist_to_binary(Chunks).
92 -spec beam_stats_to_bin(beam_stats:t()) ->
94 beam_stats_to_bin(#beam_stats
95 { timestamp = Timestamp
100 <<TimestampBin/binary>> = timestamp_to_bin(Timestamp),
101 <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
102 PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
103 MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
104 MemoryBins = lists:map(PairToBin, MemoryBinPairs),
108 iolist_to_binary(AllBins).
110 -spec timestamp_to_bin(erlang:timestamp()) ->
112 timestamp_to_bin(Timestamp) ->
113 TimestampFloat = timestamp_to_float(Timestamp),
114 {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(Timestamp),
115 SecondsFloat = Sec + (TimestampFloat - trunc(TimestampFloat)),
116 Fmt2Digits = "~2.10.0b",
117 FmtDate = string:join(["~b" , Fmt2Digits, Fmt2Digits], "-"),
118 FmtTime = string:join([Fmt2Digits, Fmt2Digits, "~9..0f" ], ":"),
120 Fmt = FmtDate ++ Separator ++ FmtTime,
121 IOList = io_lib:format(Fmt, [Year, Month, Day, Hour, Min, SecondsFloat]),
122 iolist_to_binary(IOList).
124 -spec timestamp_to_float(erlang:timestamp()) ->
%% Collapse a {MegaSeconds, Seconds, MicroSeconds} triple into a single
%% float number of seconds. The total is accumulated as an integer count of
%% microseconds and divided exactly once at the end.
timestamp_to_float({Mega, Secs, Micro}) ->
    MicrosPerSecond = 1000000,
    WholeSeconds = Mega * MicrosPerSecond + Secs,
    (WholeSeconds * MicrosPerSecond + Micro) / MicrosPerSecond.
132 -spec make_pair_to_bin(binary(), binary()) ->
133 fun(({binary(), binary()}) -> binary()).
134 make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
135 fun ({<<K/binary>>, <<V/binary>>}) ->
136 << TimestampBin/binary
147 -spec node_id_to_bin(node()) ->
%% Render a node name (an atom) as a UTF-8 encoded binary.
node_id_to_bin(Node) ->
    erlang:atom_to_binary(Node, utf8).
%% Convert an {Atom, Integer} pair into a {KeyBinary, ValueBinary} pair.
%%
%% The key is the UTF-8 encoding of the atom's text and the value is the
%% base-10 text of the integer (with a leading "-" for negative numbers).
%%
%% UTF-8 is used, rather than latin1, for two reasons:
%%   * node_id_to_bin/1 in this module already emits UTF-8, so keys and
%%     node IDs end up in the same encoding;
%%   * atom_to_binary/2 with latin1 fails with badarg when the atom
%%     contains a code point above 255.
%% For plain ASCII atoms the resulting bytes are identical in both encodings.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.