feat: handle processes in graphite_msg
[beam_stats.git] / src / beam_stats_msg_graphite.erl
CommitLineData
a9ed8751
SK
1-module(beam_stats_msg_graphite).
2
3-include("include/beam_stats.hrl").
6e1a5b00 4-include("include/beam_stats_ets_table.hrl").
a9ed8751 5-include("include/beam_stats_msg_graphite.hrl").
4bb8ddfe
SK
6-include("include/beam_stats_process.hrl").
7-include("include/beam_stats_process_ancestry.hrl").
8-include("include/beam_stats_processes.hrl").
a9ed8751
SK
9
10-export_type(
11 [ t/0
12 ]).
13
14-export(
15 [ of_beam_stats/1
16 %, to_bin/1
17 ]).
18
19-define(T, #?MODULE).
20
21-type t() ::
22 ?T{}.
23
%% Converts one beam_stats sample into a list of messages.
%% The sample's node id is rendered to a binary once, here, and handed to
%% of_beam_stats/2 together with the untouched sample.
-spec of_beam_stats(beam_stats:t()) ->
    [t()].
of_beam_stats(#beam_stats{node_id = Node} = Stats) ->
    of_beam_stats(Stats, node_id_to_bin(Node)).
29
%% Builds every message for one sample. The result lists the five top-level
%% counters first, then the memory, ETS and process messages, in that order.
%% Each message carries the sample's timestamp and a path whose first
%% component is the given node id binary.
%%
%% The node_id field of the record is not read here; the already-rendered
%% binary passed as the second argument is used instead.
-spec of_beam_stats(beam_stats:t(), binary()) ->
    [t()].
of_beam_stats(
    #beam_stats
    { timestamp        = Ts
    , memory           = Memory
    % TODO: Handle the rest of data points
    , io_bytes_in      = BytesIn
    , io_bytes_out     = BytesOut
    , context_switches = CtxSwitches
    , reductions       = Reductions
    , run_queue        = RunQueue
    , ets              = ETS
    , processes        = Processes
    },
    <<N/binary>>
) ->
    TopLevelMsgs =
        [ cons([N, <<"io">>, <<"bytes_in">> ], BytesIn    , Ts)
        , cons([N, <<"io">>, <<"bytes_out">>], BytesOut   , Ts)
        , cons([N, <<"context_switches">>   ], CtxSwitches, Ts)
        , cons([N, <<"reductions">>         ], Reductions , Ts)
        , cons([N, <<"run_queue">>          ], RunQueue   , Ts)
        ],
    MemoryMsgs  = of_memory(Memory, N, Ts),
    EtsMsgs     = of_ets(ETS, N, Ts),
    ProcessMsgs = of_processes(Processes, N, Ts),
    lists:append([TopLevelMsgs, MemoryMsgs, EtsMsgs, ProcessMsgs]).
a9ed8751
SK
58
%% Produces one message per {Key, Value} pair of the memory list.
%% Each path is [NodeID, <<"memory">>, Key rendered as a binary]; the order of
%% the input list is kept.
-spec of_memory([{atom(), non_neg_integer()}], binary(), erlang:timestamp()) ->
    [t()].
of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
    ToMsg =
        fun ({Component, Amount}) ->
            Path = [NodeID, <<"memory">>, atom_to_binary(Component, latin1)],
            cons(Path, Amount, Timestamp)
        end,
    [ToMsg(Pair) || Pair <- Memory].
68
6e1a5b00
SK
%% Produces the messages for all ETS tables in the sample.
%% Takes a LIST of per-table stats (the spec previously declared a single
%% table, which contradicted the body), converts each table with
%% of_ets_table/3 and concatenates the per-table results in input order.
-spec of_ets([beam_stats_ets_table:t()], binary(), erlang:timestamp()) ->
    [t()].
of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
    OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
    % flatmap == append(map(...)): one pass, no intermediate nested list.
    lists:flatmap(OfEtsTable, PerTableStats).
75
%% Builds the two messages (size and memory) for a single ETS table.
%% Both paths are [NodeID, <<"ets_table">>, Metric, Name, ID], where ID is the
%% table id as rendered by beam_stats_ets_table:id_to_bin/1.
-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
    [t()].
of_ets_table(#beam_stats_ets_table
    { id     = ID
    , name   = Name
    , size   = Size
    , memory = Memory
    },
    <<NodeID/binary>>,
    Timestamp
) ->
    IDBin     = beam_stats_ets_table:id_to_bin(ID),
    % utf8 rather than latin1: with latin1, atom_to_binary/2 raises badarg for
    % an atom containing a character above 255. utf8 is also the encoding the
    % other atom-to-binary conversions in this module (registered names, node
    % id, MFAs) use.
    NameBin   = atom_to_binary(Name, utf8),
    NameAndID = [NameBin, IDBin],
    [ cons([NodeID, <<"ets_table">>, <<"size">>   | NameAndID], Size  , Timestamp)
    , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
    ].
93
4bb8ddfe
SK
%% Produces the messages describing the node's processes: first the eight
%% aggregate count messages (path [NodeID, <<"processes_count_...">>]), then
%% the messages of every individual process, in the order of individual_stats.
-spec of_processes(beam_stats_processes:t(), binary(), erlang:timestamp()) ->
    [t()].
of_processes(
    #beam_stats_processes
    { individual_stats         = Procs
    , count_all                = All
    , count_exiting            = Exiting
    , count_garbage_collecting = GarbageCollecting
    , count_registered         = Registered
    , count_runnable           = Runnable
    , count_running            = Running
    , count_suspended          = Suspended
    , count_waiting            = Waiting
    },
    <<NodeID/binary>>,
    Timestamp
) ->
    PerProcessMsgs =
        lists:flatmap(
            fun (Proc) -> of_process(Proc, NodeID, Timestamp) end,
            Procs
        ),
    % Metric name / value table; list order is the order of the output.
    Counts =
        [ {<<"processes_count_all">>               , All}
        , {<<"processes_count_exiting">>           , Exiting}
        , {<<"processes_count_garbage_collecting">>, GarbageCollecting}
        , {<<"processes_count_registered">>        , Registered}
        , {<<"processes_count_runnable">>          , Runnable}
        , {<<"processes_count_running">>           , Running}
        , {<<"processes_count_suspended">>         , Suspended}
        , {<<"processes_count_waiting">>           , Waiting}
        ],
    CountMsgs =
        [cons([NodeID, Metric], Count, Timestamp) || {Metric, Count} <- Counts],
    CountMsgs ++ PerProcessMsgs.
126
%% Builds the four messages (memory, total heap size, stack size, message
%% queue length) for a single process.
%%
%% Every path is the flat list [NodeID, Metric, Origin, Pid], where Origin is
%% the process' best known origin rendered by proc_origin_to_bin/1 and Pid is
%% rendered by pid_to_bin/1.
%%
%% The origin and pid are appended as separate components ("| OriginAndPid").
%% They used to be inserted as one nested list element, which produced
%% [binary(), binary(), [binary()]] - not the [binary()] that cons/3 declares,
%% and unlike the flat paths built by of_ets_table/3.
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
    [t()].
of_process(
    #beam_stats_process
    { pid               = Pid
    , memory            = Memory
    , total_heap_size   = TotalHeapSize
    , stack_size        = StackSize
    , message_queue_len = MsgQueueLen
    }=Process,
    <<NodeID/binary>>,
    Timestamp
) ->
    Origin       = beam_stats_process:get_best_known_origin(Process),
    OriginBin    = proc_origin_to_bin(Origin),
    PidBin       = pid_to_bin(Pid),
    OriginAndPid = [OriginBin, PidBin],
    Ts = Timestamp,
    N  = NodeID,
    [ cons([N, <<"process_memory">>            | OriginAndPid], Memory       , Ts)
    , cons([N, <<"process_total_heap_size">>   | OriginAndPid], TotalHeapSize, Ts)
    , cons([N, <<"process_stack_size">>        | OriginAndPid], StackSize    , Ts)
    , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen  , Ts)
    ].
151
%% Renders a process' best known origin as a binary.
%%
%% - {registered_name, Name}: the name itself.
%% - {ancestry, Ancestry}: three parts joined by "--":
%%     raw initial call, OTP initial call, OTP ancestors.
%%   The two OTP parts are optional values (handled through hope_option);
%%   when absent they are rendered as <<"NONE">>.
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
    binary().
proc_origin_to_bin({registered_name, RegName}) ->
    atom_to_binary(RegName, utf8);
proc_origin_to_bin({ancestry, Ancestry}) ->
    #beam_stats_process_ancestry
    { raw_initial_call = RawCall
    , otp_initial_call = OTPCallOpt
    , otp_ancestors    = AncestorsOpt
    } = Ancestry,
    % Render an optional value with ToBin, falling back to the placeholder.
    OrNone =
        fun (Opt, ToBin) ->
            hope_option:get(hope_option:map(Opt, ToBin), <<"NONE">>)
        end,
    OTPCallBin   = OrNone(OTPCallOpt  , fun mfa_to_bin/1),
    AncestorsBin = OrNone(AncestorsOpt, fun ancestors_to_bin/1),
    RawCallBin   = mfa_to_bin(RawCall),
    <<RawCallBin/binary, "--", OTPCallBin/binary, "--", AncestorsBin/binary>>.
174
%% Renders a list of ancestors as one binary, elements separated by "-".
%% An empty list yields <<>>. While joining from the right, an element whose
%% already-joined tail is empty is emitted without a trailing separator.
-spec ancestors_to_bin([atom() | pid()]) ->
    binary().
ancestors_to_bin(Ancestors) ->
    % Convert left to right first, then join from the right.
    Bins = lists:map(fun ancestor_to_bin/1, Ancestors),
    Join =
        fun (Bin, <<>>) ->
                Bin
        ;   (Bin, <<Joined/binary>>) ->
                <<Bin/binary, "-", Joined/binary>>
        end,
    lists:foldr(Join, <<>>, Bins).
185
%% Renders a single ancestor, which is either a pid or an atom, as a binary.
%% Pids go through pid_to_bin/1; atoms are rendered as their UTF-8 text.
-spec ancestor_to_bin(atom() | pid()) ->
    binary().
ancestor_to_bin(Ancestor) when is_pid(Ancestor) ->
    pid_to_bin(Ancestor);
ancestor_to_bin(Ancestor) when is_atom(Ancestor) ->
    atom_to_binary(Ancestor, utf8).
190
%% Renders a pid as a binary: takes the text from erlang:pid_to_list/1,
%% replaces every "." with "_" and removes the "<" and ">" characters.
-spec pid_to_bin(pid()) ->
    binary().
pid_to_bin(Pid) ->
    Opts = [global, {return, binary}],
    Underscored = re:replace(erlang:pid_to_list(Pid), "[\.]", "_", Opts),
    re:replace(Underscored, "[><]", "", Opts).
195
%% Renders {Module, Function, Arity} as <<"Module-Function-Arity">>.
-spec mfa_to_bin(mfa()) ->
    binary().
mfa_to_bin({M, F, A}) ->
    [MBin, FBin, ABin] =
        [ atom_to_binary(M, utf8)
        , atom_to_binary(F, utf8)
        , erlang:integer_to_binary(A)
        ],
    <<MBin/binary, $-, FBin/binary, $-, ABin/binary>>.
203
a37cd038
SK
%% Constructs a single message record from a path, a value and a timestamp.
%% Nothing is converted or validated here; the three arguments are stored in
%% the record's path, value and timestamp fields as given.
-spec cons([binary()], integer(), erlang:timestamp()) ->
    t().
cons(PathComponents, MetricValue, Ts) ->
    ?T{path = PathComponents, value = MetricValue, timestamp = Ts}.
212
a9ed8751
SK
%% Renders a node name as a binary in which every "@" and "." has been
%% replaced by "_".
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    ReplaceOpts = [global, {return, binary}],
    re:replace(atom_to_binary(NodeID, utf8), "[\@\.]", "_", ReplaceOpts).
This page took 0.033998 seconds and 4 git commands to generate.