X-Git-Url: https://git.xandkar.net/?p=beam_stats.git;a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=edc5e5bcd8093927526b8ca3437c4e2b3ae09931;hp=06945e1867dd14c245e9898c27c0436ea7572d5f;hb=a3fdbec97580ab23bdb68e61d4321d08b1c599da;hpb=ece99ea366e3bc8720528aaf5e1e1563e85b561c diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 06945e1..edc5e5b 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -41,8 +41,6 @@ -type state() :: #state{}. --define(PATH_PREFIX, <<"beam_stats">>). - %% ============================================================================ %% Consumer implementation %% ============================================================================ @@ -81,20 +79,19 @@ consume( -spec terminate(state()) -> {}. terminate(#state{sock=SockOpt}) -> - ok = hope_option:iter(SockOpt, fun gen_udp:close/1), - {}. + hope_option:iter(SockOpt, fun gen_udp:close/1). %% ============================================================================ %% Transport %% ============================================================================ --spec try_to_connect_and_send(binary(), state()) -> +-spec try_to_connect_and_send(iolist(), state()) -> state(). -try_to_connect_and_send(<>, #state{}=State1) -> +try_to_connect_and_send(Payload, #state{}=State1) -> State2 = try_to_connect_if_no_socket(State1), try_to_send(State2, Payload). --spec try_to_send(state(), binary()) -> +-spec try_to_send(state(), iolist()) -> state(). try_to_send(#state{sock=none}=State, _) -> ?log_error("Sending failed. No socket in state."), @@ -113,7 +110,7 @@ try_to_send( State ; {error, _}=Error -> ?log_error( - "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", + "gen_udp:send(~p, ~p, ~p, Payload) -> ~p", [Sock, DstHost, DstPort, Error] ), % TODO: Do something with unsent messages? @@ -143,21 +140,17 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> non_neg_integer(), hope_option:t(binary()) ) -> - [binary()]. + [iolist()]. beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> - MsgBins = lists:append([beam_stats_to_bins(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), - MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), - lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). + MsgIOLists = lists:append([beam_stats_to_iolists(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), + hope_list:divide(MsgIOLists, NumMsgsPerPacket). --spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> - [binary()]. -beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> +-spec beam_stats_to_iolists(beam_stats:t(), hope_option:t(binary())) -> + [iolist()]. +beam_stats_to_iolists(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), - GraphiteMsgAddPrefix = - fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end, - MsgsGraphite1 = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), - MsgsGraphite2 = lists:map(GraphiteMsgAddPrefix, MsgsGraphite1), + MsgsGraphite = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), MsgsStatsD = - lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite2), - lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD). + lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite), + lists:map(fun beam_stats_msg_statsd_gauge:to_iolist/1, MsgsStatsD).