Update alignments.
[beam_stats.git] / src / beam_stats_consumer_graphite.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_consumer_graphite).
2
3-include("include/beam_stats.hrl").
f079a56c 4-include("beam_stats_logging.hrl").
caf75ed8
SK
5
6-behaviour(beam_stats_consumer).
7
8-export_type(
1b0b4721 9 [ option/0
caf75ed8
SK
10 ]).
11
12-export(
13 [ init/1
14 , consume/2
15 , terminate/1
16 ]).
17
caf75ed8 18-type option() ::
b2f78fc6 19 {consumption_interval , non_neg_integer()}
1b0b4721
SK
20 | {host , inet:ip_address() | inet:hostname()}
21 | {port , inet:port_number()}
22 | {timeout , timeout()}
caf75ed8
SK
23 .
24
25-record(state,
1b0b4721
SK
26 { sock = none :: hope_option:t(Socket :: port())
27 , host :: inet:ip_address() | inet:hostname()
28 , port :: inet:port_number()
29 , timeout :: timeout()
caf75ed8
SK
30 }).
31
32-type state() ::
33 #state{}.
34
39ff67a6
SK
35-define(DEFAULT_HOST , "localhost").
36-define(DEFAULT_PORT , 2003).
37-define(DEFAULT_TIMEOUT , 5000).
caf75ed8
SK
38
39-spec init([option()]) ->
76aefffb 40 {non_neg_integer(), state()}.
caf75ed8 41init(Options) ->
1b0b4721
SK
42 Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
43 ConsumptionInterval = Get(consumption_interval, 60000),
caf75ed8 44 State = #state
1b0b4721
SK
45 { sock = none
46 , host = Get(host , ?DEFAULT_HOST)
47 , port = Get(port , ?DEFAULT_PORT)
48 , timeout = Get(timeout , ?DEFAULT_TIMEOUT)
caf75ed8
SK
49 },
50 {ConsumptionInterval, State}.
51
52-spec consume(beam_stats_consumer:queue(), state()) ->
53 state().
54consume(Q, #state{}=State1) ->
cdcb989e 55 Payload = beam_stats_queue_to_iolists(Q),
caf75ed8
SK
56 State2 = try_to_connect_if_no_socket(State1),
57 try_to_send(State2, Payload).
58
59-spec terminate(state()) ->
60 {}.
61terminate(#state{sock=SockOpt}) ->
62 ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
63 {}.
64
65%% ============================================================================
66
cdcb989e 67-spec try_to_send(state(), iolist()) ->
caf75ed8
SK
68 state().
69try_to_send(#state{sock=none}=State, _) ->
f079a56c 70 ?log_error("Sending failed. No socket in state."),
caf75ed8
SK
71 % TODO: Maybe schedule retry?
72 State;
73try_to_send(#state{sock={some, Sock}}=State, Payload) ->
74 case gen_tcp:send(Sock, Payload)
75 of ok ->
76 State
77 ; {error, _}=Error ->
7ef3dd01 78 ?log_error("gen_tcp:send(~p, Payload) -> ~p", [Sock, Error]),
caf75ed8
SK
79 % TODO: Maybe schedule retry?
80 ok = gen_tcp:close(Sock),
81 State#state{sock=none}
82 end.
83
84-spec try_to_connect_if_no_socket(state()) ->
85 state().
86try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
87 State;
1b0b4721
SK
88try_to_connect_if_no_socket(
89 #state
90 { sock = none
91 , host = Host
92 , port = Port
93 , timeout = Timeout
94 }=State
95) ->
f079a56c
SK
96 Options = [binary, {active, false}],
97 case gen_tcp:connect(Host, Port, Options, Timeout)
caf75ed8
SK
98 of {ok, Sock} ->
99 State#state{sock = {some, Sock}}
100 ; {error, _}=Error ->
f079a56c
SK
101 ?log_error(
102 "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
103 [Host, Port, Options, Timeout, Error]
104 ),
caf75ed8
SK
105 State#state{sock = none}
106 end.
107
cdcb989e
PO
108-spec beam_stats_queue_to_iolists(beam_stats_consumer:queue()) ->
109 [iolist()].
110beam_stats_queue_to_iolists(Q) ->
111 [beam_stats_to_iolist(B) || B <- queue:to_list(Q)].
caf75ed8 112
cdcb989e
PO
113-spec beam_stats_to_iolist(beam_stats:t()) ->
114 [iolist()].
115beam_stats_to_iolist(#beam_stats{}=BeamStats) ->
8fe744e7 116 Msgs = beam_stats_msg_graphite:of_beam_stats(BeamStats),
cdcb989e 117 lists:map(fun beam_stats_msg_graphite:to_iolist/1, Msgs).
This page took 0.033359 seconds and 4 git commands to generate.