1 -module(beam_stats_consumer_graphite).
3 -include("include/beam_stats.hrl").
4 -include("beam_stats_logging.hrl").
6 -behaviour(beam_stats_consumer).
19 {consumption_interval , non_neg_integer()}
20 | {host , inet:ip_address() | inet:hostname()}
21 | {port , inet:port_number()}
22 | {timeout , timeout()}
26 { sock = none :: hope_option:t(Socket :: port())
27 , host :: inet:ip_address() | inet:hostname()
28 , port :: inet:port_number()
29 , timeout :: timeout()
%% Fallback connection settings. init/1 passes these as the defaults of its
%% option lookups for the host, port and timeout options respectively.
%% NOTE(review): 2003 is presumably Graphite's plaintext listener port and
%% 5000 presumably milliseconds -- confirm.
35 -define(DEFAULT_HOST , "localhost").
36 -define(DEFAULT_PORT , 2003).
37 -define(DEFAULT_TIMEOUT , 5000).
%% Builds the consumer's initial state from an option list.
%%
%% Each option is looked up with hope_kv_list:get/3, which is given a default:
%%   consumption_interval -> 60000
%%   host                 -> ?DEFAULT_HOST
%%   port                 -> ?DEFAULT_PORT
%%   timeout              -> ?DEFAULT_TIMEOUT
%%
%% Returns {ConsumptionInterval, State}, matching the spec below.
%% NOTE(review): the function head and the opening of the record construction
%% are not visible in this view; the description above covers only the lines
%% shown -- confirm against the full definition.
39 -spec init([option()]) ->
40 {non_neg_integer(), state()}.
42 Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
43 ConsumptionInterval = Get(consumption_interval, 60000),
46 , host = Get(host , ?DEFAULT_HOST)
47 , port = Get(port , ?DEFAULT_PORT)
48 , timeout = Get(timeout , ?DEFAULT_TIMEOUT)
50 {ConsumptionInterval, State}.
52 -spec consume(beam_stats_consumer:queue(), state()) ->
%% Turns the queued stats into a payload, gives the state a chance to
%% (re)connect via try_to_connect_if_no_socket/1, and then passes the
%% resulting state and the payload to try_to_send/2, whose result is returned.
consume(Queue, State0 = #state{}) ->
    IoLists = beam_stats_queue_to_iolists(Queue),
    try_to_send(try_to_connect_if_no_socket(State0), IoLists).
59 -spec terminate(state()) ->
%% Hands the optional socket from the state to hope_option:iter/2 together
%% with a function that closes it via gen_tcp:close/1.
%% NOTE(review): presumably hope_option:iter/2 does nothing for `none` --
%% confirm against hope_option.
terminate(#state{sock = MaybeSock}) ->
    CloseSock = fun (Sock) -> gen_tcp:close(Sock) end,
    hope_option:iter(MaybeSock, CloseSock).
64 %% ============================================================================
%% Attempts to send a payload over the socket held in the state.
%%
%% First clause (sock = none): logs "Sending failed. No socket in state.".
%% Second clause (sock = {some, Sock}): calls gen_tcp:send/2 with the payload.
%% In the failure branch visible here the error is logged, the socket is
%% closed (asserting that gen_tcp:close/1 returns ok) and the state is
%% returned with sock reset to none.
%% NOTE(review): the return value of the first clause and the success branch
%% of the case expression are not visible in this view -- confirm against the
%% full definition.
66 -spec try_to_send(state(), iolist()) ->
68 try_to_send(#state{sock=none}=State, _) ->
69 ?log_error("Sending failed. No socket in state."),
70 % TODO: Maybe schedule retry?
72 try_to_send(#state{sock={some, Sock}}=State, Payload) ->
73 case gen_tcp:send(Sock, Payload)
77 ?log_error("gen_tcp:send(~p, Payload) -> ~p", [Sock, Error]),
78 % TODO: Maybe schedule retry?
79 ok = gen_tcp:close(Sock),
80 State#state{sock=none}
%% Opens a TCP connection when the state does not already hold a socket.
%%
%% First clause matches a state whose sock is {some, _}.
%% Second clause calls gen_tcp:connect/4 with the options
%% [binary, {active, false}]. One visible branch stores the new socket as
%% {some, Sock}; the other leaves sock as none, next to a format string and
%% argument list describing the failed connect call.
%% NOTE(review): the body of the first clause, the head pattern of the second
%% clause (where Host, Port and Timeout are presumably bound from the state),
%% the case patterns and the call that consumes the format string are not
%% visible in this view -- confirm against the full definition.
83 -spec try_to_connect_if_no_socket(state()) ->
85 try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
87 try_to_connect_if_no_socket(
95 Options = [binary, {active, false}],
96 case gen_tcp:connect(Host, Port, Options, Timeout)
98 State#state{sock = {some, Sock}}
101 "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
102 [Host, Port, Options, Timeout, Error]
104 State#state{sock = none}
107 -spec beam_stats_queue_to_iolists(beam_stats_consumer:queue()) ->
%% Converts every element of the queue, in queue:to_list/1 order, with
%% beam_stats_to_iolist/1 and returns the results as a list.
beam_stats_queue_to_iolists(Queue) ->
    lists:map(fun beam_stats_to_iolist/1, queue:to_list(Queue)).
112 -spec beam_stats_to_iolist(beam_stats:t()) ->
%% Expands one #beam_stats{} record into messages with
%% beam_stats_msg_graphite:of_beam_stats/1 and renders each message with
%% beam_stats_msg_graphite:to_iolist/1, preserving message order.
beam_stats_to_iolist(Stats = #beam_stats{}) ->
    [beam_stats_msg_graphite:to_iolist(Msg)
     || Msg <- beam_stats_msg_graphite:of_beam_stats(Stats)].