feat: use new msg abstractions in StatsD consumer.
[beam_stats.git] / src / beam_stats_msg_graphite.erl
CommitLineData
a9ed8751
SK
1-module(beam_stats_msg_graphite).
2
3-include("include/beam_stats.hrl").
6e1a5b00 4-include("include/beam_stats_ets_table.hrl").
a9ed8751 5-include("include/beam_stats_msg_graphite.hrl").
4bb8ddfe
SK
6-include("include/beam_stats_process.hrl").
7-include("include/beam_stats_process_ancestry.hrl").
8-include("include/beam_stats_processes.hrl").
a9ed8751
SK
9
10-export_type(
11 [ t/0
12 ]).
13
14-export(
15 [ of_beam_stats/1
ece99ea3 16 , of_beam_stats/2
67948b18 17 , to_bin/1
9b9299d9 18 , path_to_bin/1
ece99ea3
SK
19 , add_path_prefix/2
20 , node_id_to_bin/1
a9ed8751
SK
21 ]).
22
23-define(T, #?MODULE).
24
25-type t() ::
26 ?T{}.
27
67948b18
SK
28%% ============================================================================
29%% API
30%% ============================================================================
31
a9ed8751
SK
%% Convert a beam_stats sample into a list of Graphite messages, using the
%% sample's own node_id (converted by node_id_to_bin/1) as the leading path
%% component.
-spec of_beam_stats(beam_stats:t()) ->
    [t()].
of_beam_stats(#beam_stats{node_id=NodeID}=BeamStats) ->
    of_beam_stats(BeamStats, node_id_to_bin(NodeID)).
37
%% Convert a beam_stats sample into a list of Graphite messages whose paths
%% all start with the given NodeID binary. The node_id field of the sample
%% itself is not used here.
%%
%% Resulting order: io bytes in/out, context switches, reductions, run queue,
%% followed by the memory, ETS and process messages.
-spec of_beam_stats(beam_stats:t(), binary()) ->
    [t()].
of_beam_stats(#beam_stats{}=BeamStats, <<NodeID/binary>>) ->
    % TODO: Handle the rest of data points
    #beam_stats
    { timestamp        = Timestamp
    , memory           = Memory
    , io_bytes_in      = IOBytesIn
    , io_bytes_out     = IOBytesOut
    , context_switches = ContextSwitches
    , reductions       = Reductions
    , run_queue        = RunQueue
    , ets              = ETS
    , processes        = Processes
    } = BeamStats,
    % Single-valued metrics: path (relative to the node ID) and value.
    Scalars =
        [ {[<<"io">>, <<"bytes_in">>] , IOBytesIn}
        , {[<<"io">>, <<"bytes_out">>], IOBytesOut}
        , {[<<"context_switches">>]   , ContextSwitches}
        , {[<<"reductions">>]         , Reductions}
        , {[<<"run_queue">>]          , RunQueue}
        ],
    ScalarMsgs =
        [cons([NodeID | Path], Value, Timestamp) || {Path, Value} <- Scalars],
    lists:append(
        [ ScalarMsgs
        , of_memory(Memory, NodeID, Timestamp)
        , of_ets(ETS, NodeID, Timestamp)
        , of_processes(Processes, NodeID, Timestamp)
        ]).
a9ed8751 66
67948b18
SK
%% Render a message as a single binary of the form
%% <<"dot.joined.path Value Seconds">>: the path joined by path_to_bin/1, the
%% integer value, and the timestamp reduced to whole seconds by
%% timestamp_to_integer/1, separated by single spaces.
-spec to_bin(t()) ->
    binary().
to_bin(?T{path=Path, value=Value, timestamp=Timestamp}) ->
    PathBin    = path_to_bin(Path),
    ValueBin   = integer_to_binary(Value),
    SecondsBin = integer_to_binary(timestamp_to_integer(Timestamp)),
    iolist_to_binary([PathBin, " ", ValueBin, " ", SecondsBin]).
81
ece99ea3
SK
%% Return the message with Prefix inserted as the new first path component.
%% Only a binary prefix is accepted.
-spec add_path_prefix(t(), binary()) ->
    t().
add_path_prefix(?T{path=Components}=Msg, Prefix) when is_binary(Prefix) ->
    Msg?T{path = [Prefix | Components]}.
86
9b9299d9
SK
%% Join the path components into one binary, separated by dots.
-spec path_to_bin([binary()]) ->
    binary().
path_to_bin(Components) ->
    Separator = <<".">>,
    bin_join(Components, Separator).
91
ece99ea3
SK
%% Convert a node name atom to a binary in which every "@" and "." has been
%% replaced by "_", so the result contains no dot characters.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    ReplaceOpts = [global, {return, binary}],
    re:replace(atom_to_binary(NodeID, utf8), "[\@\.]", "_", ReplaceOpts).
97
67948b18
SK
98%% ============================================================================
99%% Helpers
100%% ============================================================================
101
%% Join a list of binaries into one binary, placing Sep between consecutive
%% elements. An empty list yields <<>>; a single element is returned as is.
-spec bin_join([binary()], binary()) ->
    binary().
bin_join([], <<_/binary>>) ->
    <<>>;
bin_join([<<First/binary>> | Rest], <<Sep/binary>>) ->
    % Left fold that keeps appending to the accumulator.
    Append =
        fun (<<Next/binary>>, Acc) ->
            <<Acc/binary, Sep/binary, Next/binary>>
        end,
    lists:foldl(Append, First, Rest).
109
%% Collapse a {MegaSecs, Secs, MicroSecs} timestamp into whole seconds.
%% The third element is discarded.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({MegaSecs, Secs, _MicroSecs}) ->
    (MegaSecs * 1000000) + Secs.
114
a9ed8751
SK
%% Build one message per {Key, Value} memory component, with path
%% [NodeID, <<"memory">>, KeyBin].
%%
%% The key is converted with utf8, like the other atom conversions in this
%% module. With latin1, atom_to_binary/2 raises badarg for an atom containing
%% a code point above 255; for pure-ASCII keys the output is unchanged.
-spec of_memory([{atom(), non_neg_integer()}], binary(), erlang:timestamp()) ->
    [t()].
of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
    ComponentToMessage =
        fun ({Key, Value}) ->
            KeyBin = atom_to_binary(Key, utf8),
            cons([NodeID, <<"memory">>, KeyBin], Value, Timestamp)
        end,
    lists:map(ComponentToMessage, Memory).
124
6e1a5b00
SK
%% Build the messages for every ETS table in PerTableStats and return them as
%% one flat list, in table order.
%%
%% PerTableStats is a list of per-table records: each element is handed to
%% of_ets_table/3, so the spec takes a list rather than a single table.
-spec of_ets([beam_stats_ets_table:t()], binary(), erlang:timestamp()) ->
    [t()].
of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
    OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
    lists:flatmap(OfEtsTable, PerTableStats).
131
%% Build the two messages (size and memory) for a single ETS table. Paths are
%% [NodeID, <<"ets_table">>, <<"size">> | <<"memory">>, NameBin, IDBin].
%%
%% The table name is converted with utf8, like the other atom conversions in
%% this module. With latin1, atom_to_binary/2 raises badarg for a name
%% containing a code point above 255; for pure-ASCII names the output is
%% unchanged.
-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
    [t()].
of_ets_table(#beam_stats_ets_table
    { id     = ID
    , name   = Name
    , size   = Size
    , memory = Memory
    },
    <<NodeID/binary>>,
    Timestamp
) ->
    IDBin     = beam_stats_ets_table:id_to_bin(ID),
    NameBin   = atom_to_binary(Name, utf8),
    NameAndID = [NameBin, IDBin],
    [ cons([NodeID, <<"ets_table">>, <<"size">>   | NameAndID], Size  , Timestamp)
    , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
    ].
149
4bb8ddfe
SK
%% Build the messages for the process statistics: first the eight
%% processes_count_* totals, then the per-process messages produced by
%% of_process/3 for each entry of individual_stats.
-spec of_processes(beam_stats_processes:t(), binary(), erlang:timestamp()) ->
    [t()].
of_processes(#beam_stats_processes{}=Stats, <<NodeID/binary>>, Timestamp) ->
    #beam_stats_processes
    { individual_stats         = Processes
    , count_all                = CountAll
    , count_exiting            = CountExiting
    , count_garbage_collecting = CountGarbageCollecting
    , count_registered         = CountRegistered
    , count_runnable           = CountRunnable
    , count_running            = CountRunning
    , count_suspended          = CountSuspended
    , count_waiting            = CountWaiting
    } = Stats,
    Counts =
        [ {<<"processes_count_all">>               , CountAll}
        , {<<"processes_count_exiting">>           , CountExiting}
        , {<<"processes_count_garbage_collecting">>, CountGarbageCollecting}
        , {<<"processes_count_registered">>        , CountRegistered}
        , {<<"processes_count_runnable">>          , CountRunnable}
        , {<<"processes_count_running">>           , CountRunning}
        , {<<"processes_count_suspended">>         , CountSuspended}
        , {<<"processes_count_waiting">>           , CountWaiting}
        ],
    CountMsgs =
        [cons([NodeID, Name], Count, Timestamp) || {Name, Count} <- Counts],
    PerProcessMsgs =
        lists:flatmap(
            fun (P) -> of_process(P, NodeID, Timestamp) end,
            Processes
        ),
    CountMsgs ++ PerProcessMsgs.
182
%% Build the four messages (memory, total heap size, stack size, message queue
%% length) for a single process. Each path has the form
%% [NodeID, MetricName, OriginBin, PidBin], where OriginBin comes from the
%% process' best known origin and PidBin from pid_to_bin/1.
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
    [t()].
of_process(
    #beam_stats_process
    { pid               = Pid
    , memory            = Memory
    , total_heap_size   = TotalHeapSize
    , stack_size        = StackSize
    , message_queue_len = MsgQueueLen
    }=Process,
    <<NodeID/binary>>,
    Timestamp
) ->
    Origin    = beam_stats_process:get_best_known_origin(Process),
    OriginBin = proc_origin_to_bin(Origin),
    PathTail  = [OriginBin, pid_to_bin(Pid)],
    Metrics =
        [ {<<"process_memory">>           , Memory}
        , {<<"process_total_heap_size">>  , TotalHeapSize}
        , {<<"process_stack_size">>       , StackSize}
        , {<<"process_message_queue_len">>, MsgQueueLen}
        ],
    [ cons([NodeID, Name | PathTail], Value, Timestamp)
    || {Name, Value} <- Metrics
    ].
207
%% Render a process origin as a binary.
%%
%% A registered name is rendered as the name itself. An ancestry is rendered
%% as <<"RawInitCall--OTPInitCall--Ancestors">>; the OTP initial call and the
%% ancestors are optional values (handled through hope_option) and fall back
%% to <<"NONE">> when hope_option:get/2 returns the default.
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
    binary().
proc_origin_to_bin({registered_name, Name}) ->
    atom_to_binary(Name, utf8);
proc_origin_to_bin({ancestry, Ancestry}) ->
    #beam_stats_process_ancestry
    { raw_initial_call = RawCall
    , otp_initial_call = OTPCallOpt
    , otp_ancestors    = AncestorsOpt
    } = Ancestry,
    Default = <<"NONE">>,
    OTPCallBin =
        hope_option:get(
            hope_option:map(OTPCallOpt, fun mfa_to_bin/1),
            Default
        ),
    AncestorsBin =
        hope_option:get(
            hope_option:map(AncestorsOpt, fun ancestors_to_bin/1),
            Default
        ),
    RawCallBin = mfa_to_bin(RawCall),
    iolist_to_binary([RawCallBin, "--", OTPCallBin, "--", AncestorsBin]).
230
%% Render a list of ancestors (atoms or pids) as one binary, joined by "-".
%% An empty list yields <<>>. No separator is emitted before a tail that
%% renders as <<>>.
-spec ancestors_to_bin([atom() | pid()]) ->
    binary().
ancestors_to_bin(Ancestors) ->
    Prepend =
        fun (Ancestor, <<>>) ->
                ancestor_to_bin(Ancestor)
        ;   (Ancestor, <<TailBin/binary>>) ->
                HeadBin = ancestor_to_bin(Ancestor),
                <<HeadBin/binary, "-", TailBin/binary>>
        end,
    lists:foldr(Prepend, <<>>, Ancestors).
241
%% Render a single ancestor: an atom becomes its utf8 name, a pid is rendered
%% by pid_to_bin/1.
-spec ancestor_to_bin(atom() | pid()) ->
    binary().
ancestor_to_bin(Name) when is_atom(Name) ->
    atom_to_binary(Name, utf8);
ancestor_to_bin(Pid) when is_pid(Pid) ->
    pid_to_bin(Pid).
246
%% Render a pid as a binary with the dots replaced by "_" and the surrounding
%% "<" and ">" removed, e.g. a pid printed as "<0.1.2>" becomes <<"0_1_2">>.
-spec pid_to_bin(pid()) ->
    binary().
pid_to_bin(Pid) ->
    ReplaceOpts = [global, {return, binary}],
    Underscored = re:replace(erlang:pid_to_list(Pid), "[\.]", "_", ReplaceOpts),
    re:replace(Underscored, "[><]", "", ReplaceOpts).
251
%% Render {Module, Function, Arity} as <<"Module-Function-Arity">>.
-spec mfa_to_bin(mfa()) ->
    binary().
mfa_to_bin({Module, Function, Arity}) ->
    iolist_to_binary(
        [ atom_to_binary(Module, utf8)
        , "-"
        , atom_to_binary(Function, utf8)
        , "-"
        , erlang:integer_to_binary(Arity)
        ]).
259
a37cd038
SK
%% Construct a message record from its path components, value and timestamp.
-spec cons([binary()], integer(), erlang:timestamp()) ->
    t().
cons(Path, Value, Timestamp) ->
    ?T{path = Path, value = Value, timestamp = Timestamp}.
This page took 0.039187 seconds and 4 git commands to generate.