|
21 | 21 | import org.apache.commons.configuration2.SubsetConfiguration; |
22 | 22 | import org.apache.hadoop.classification.InterfaceAudience; |
23 | 23 | import org.apache.hadoop.classification.InterfaceStability; |
| 24 | +import org.apache.hadoop.classification.VisibleForTesting; |
24 | 25 | import org.apache.hadoop.metrics2.AbstractMetric; |
25 | 26 | import org.apache.hadoop.metrics2.MetricsException; |
26 | 27 | import org.apache.hadoop.metrics2.MetricsRecord; |
|
37 | 38 | import java.nio.charset.StandardCharsets; |
38 | 39 |
|
39 | 40 | /** |
40 | | - * A metrics sink that writes to a Graphite server |
| 41 | + * A metrics sink that writes to a Graphite server. |
41 | 42 | */ |
42 | 43 | @InterfaceAudience.Public |
43 | 44 | @InterfaceStability.Evolving |
44 | 45 | public class GraphiteSink implements MetricsSink, Closeable { |
45 | | - private static final Logger LOG = |
46 | | - LoggerFactory.getLogger(GraphiteSink.class); |
47 | | - private static final String SERVER_HOST_KEY = "server_host"; |
48 | | - private static final String SERVER_PORT_KEY = "server_port"; |
49 | | - private static final String METRICS_PREFIX = "metrics_prefix"; |
50 | | - private String metricsPrefix = null; |
51 | | - private Graphite graphite = null; |
52 | | - |
53 | | - @Override |
54 | | - public void init(SubsetConfiguration conf) { |
55 | | - // Get Graphite host configurations. |
56 | | - final String serverHost = conf.getString(SERVER_HOST_KEY); |
57 | | - final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); |
58 | | - |
59 | | - // Get Graphite metrics graph prefix. |
60 | | - metricsPrefix = conf.getString(METRICS_PREFIX); |
61 | | - if (metricsPrefix == null) |
62 | | - metricsPrefix = ""; |
63 | | - |
64 | | - graphite = new Graphite(serverHost, serverPort); |
65 | | - graphite.connect(); |
| 46 | + private static final Logger LOG = |
| 47 | + LoggerFactory.getLogger(GraphiteSink.class); |
| 48 | + private static final String SERVER_HOST_KEY = "server_host"; |
| 49 | + private static final String SERVER_PORT_KEY = "server_port"; |
| 50 | + private static final String METRICS_PREFIX = "metrics_prefix"; |
| 51 | + private String metricsPrefix = null; |
| 52 | + private Graphite graphite = null; |
| 53 | + |
| 54 | + @Override |
| 55 | + public void init(SubsetConfiguration conf) { |
| 56 | + // Get Graphite host configurations. |
| 57 | + final String serverHost = conf.getString(SERVER_HOST_KEY); |
| 58 | + final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); |
| 59 | + |
| 60 | + // Get Graphite metrics graph prefix. |
| 61 | + metricsPrefix = conf.getString(METRICS_PREFIX); |
| 62 | + if (metricsPrefix == null) { |
| 63 | + metricsPrefix = ""; |
66 | 64 | } |
67 | 65 |
|
68 | | - @Override |
69 | | - public void putMetrics(MetricsRecord record) { |
70 | | - StringBuilder lines = new StringBuilder(); |
71 | | - StringBuilder metricsPathPrefix = new StringBuilder(); |
72 | | - |
73 | | - // Configure the hierarchical place to display the graph. |
74 | | - metricsPathPrefix.append(metricsPrefix).append(".") |
75 | | - .append(record.context()).append(".").append(record.name()); |
76 | | - |
77 | | - for (MetricsTag tag : record.tags()) { |
78 | | - if (tag.value() != null) { |
79 | | - metricsPathPrefix.append(".") |
80 | | - .append(tag.name()) |
81 | | - .append("=") |
82 | | - .append(tag.value()); |
83 | | - } |
84 | | - } |
85 | | - |
86 | | - // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. |
87 | | - long timestamp = record.timestamp() / 1000L; |
| 66 | + graphite = new Graphite(serverHost, serverPort); |
| 67 | + graphite.connect(); |
| 68 | + } |
| 69 | + |
| 70 | + @Override |
| 71 | + public void putMetrics(MetricsRecord record) { |
| 72 | + StringBuilder lines = new StringBuilder(); |
| 73 | + StringBuilder metricsPathPrefix = new StringBuilder(); |
| 74 | + |
| 75 | + // Configure the hierarchical place to display the graph. |
| 76 | + metricsPathPrefix.append(metricsPrefix).append(".") |
| 77 | + .append(record.context()).append(".").append(record.name()); |
| 78 | + |
| 79 | + for (MetricsTag tag : record.tags()) { |
| 80 | + if (tag.value() != null) { |
| 81 | + metricsPathPrefix.append(".") |
| 82 | + .append(tag.name()) |
| 83 | + .append("=") |
| 84 | + .append(tag.value()); |
| 85 | + } |
| 86 | + } |
88 | 87 |
|
89 | | - // Collect datapoints. |
90 | | - for (AbstractMetric metric : record.metrics()) { |
91 | | - lines.append( |
92 | | - metricsPathPrefix.toString() + "." |
93 | | - + metric.name().replace(' ', '.')).append(" ") |
94 | | - .append(metric.value()).append(" ").append(timestamp) |
95 | | - .append("\n"); |
96 | | - } |
| 88 | + // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. |
| 89 | + long timestamp = record.timestamp() / 1000L; |
97 | 90 |
|
98 | | - try { |
99 | | - graphite.write(lines.toString()); |
100 | | - } catch (Exception e) { |
101 | | - LOG.warn("Error sending metrics to Graphite", e); |
102 | | - try { |
103 | | - graphite.close(); |
104 | | - } catch (Exception e1) { |
105 | | - throw new MetricsException("Error closing connection to Graphite", e1); |
106 | | - } |
107 | | - } |
| 91 | + // Collect datapoints. |
| 92 | + for (AbstractMetric metric : record.metrics()) { |
| 93 | + lines.append(metricsPathPrefix + "." + metric.name().replace(' ', '.')).append(" ") |
| 94 | + .append(metric.value()).append(" ").append(timestamp) |
| 95 | + .append("\n"); |
108 | 96 | } |
109 | 97 |
|
110 | | - @Override |
111 | | - public void flush() { |
| 98 | + try { |
| 99 | + graphite.write(lines.toString()); |
| 100 | + } catch (Exception e) { |
| 101 | + LOG.warn("Error sending metrics to Graphite.", e); |
112 | 102 | try { |
113 | | - graphite.flush(); |
114 | | - } catch (Exception e) { |
115 | | - LOG.warn("Error flushing metrics to Graphite", e); |
116 | | - try { |
117 | | - graphite.close(); |
118 | | - } catch (Exception e1) { |
119 | | - throw new MetricsException("Error closing connection to Graphite", e1); |
120 | | - } |
| 103 | + graphite.close(); |
| 104 | + } catch (Exception e1) { |
| 105 | + throw new MetricsException("Error closing connection to Graphite", e1); |
121 | 106 | } |
122 | 107 | } |
123 | | - |
124 | | - @Override |
125 | | - public void close() throws IOException { |
126 | | - graphite.close(); |
| 108 | + } |
| 109 | + |
| 110 | + @Override |
| 111 | + public void flush() { |
| 112 | + try { |
| 113 | + graphite.flush(); |
| 114 | + } catch (Exception e) { |
| 115 | + LOG.warn("Error flushing metrics to Graphite.", e); |
| 116 | + try { |
| 117 | + graphite.close(); |
| 118 | + } catch (Exception e1) { |
| 119 | + throw new MetricsException("Error closing connection to Graphite.", e1); |
| 120 | + } |
127 | 121 | } |
| 122 | + } |
128 | 123 |
|
129 | | - public static class Graphite { |
130 | | - private final static int MAX_CONNECTION_FAILURES = 5; |
| 124 | + @Override |
| 125 | + public void close() throws IOException { |
| 126 | + graphite.close(); |
| 127 | + } |
131 | 128 |
|
132 | | - private String serverHost; |
133 | | - private int serverPort; |
134 | | - private Writer writer = null; |
135 | | - private Socket socket = null; |
136 | | - private int connectionFailures = 0; |
| 129 | + public static class Graphite { |
| 130 | + private final static int MAX_CONNECTION_FAILURES = 5; |
137 | 131 |
|
138 | | - public Graphite(String serverHost, int serverPort) { |
139 | | - this.serverHost = serverHost; |
140 | | - this.serverPort = serverPort; |
141 | | - } |
| 132 | + private String serverHost; |
| 133 | + private int serverPort; |
| 134 | + private Writer writer = null; |
| 135 | + private Socket socket = null; |
| 136 | + private int connectionFailures = 0; |
142 | 137 |
|
143 | | - public void connect() { |
144 | | - if (isConnected()) { |
145 | | - throw new MetricsException("Already connected to Graphite"); |
146 | | - } |
147 | | - if (tooManyConnectionFailures()) { |
148 | | - // return silently (there was ERROR in logs when we reached limit for the first time) |
149 | | - return; |
150 | | - } |
151 | | - try { |
| 138 | + public Graphite(String serverHost, int serverPort) { |
| 139 | + this.serverHost = serverHost; |
| 140 | + this.serverPort = serverPort; |
| 141 | + } |
| 142 | + |
| 143 | + public void connect() { |
| 144 | + if (isConnected()) { |
| 145 | + throw new MetricsException("Already connected to Graphite"); |
| 146 | + } |
| 147 | + if (tooManyConnectionFailures()) { |
| 148 | + // return silently (there was ERROR in logs when we reached limit for the first time) |
| 149 | + return; |
| 150 | + } |
| 151 | + try { |
152 | 152 | // Open a connection to Graphite server. |
153 | | - socket = new Socket(serverHost, serverPort); |
| 153 | + socket = new Socket(serverHost, serverPort); |
154 | 154 | writer = new OutputStreamWriter(socket.getOutputStream(), |
155 | 155 | StandardCharsets.UTF_8); |
156 | | - } catch (Exception e) { |
157 | | - connectionFailures++; |
158 | | - if (tooManyConnectionFailures()) { |
159 | | - // first time when connection limit reached, report to logs |
160 | | - LOG.error("Too many connection failures, would not try to connect again."); |
161 | | - } |
162 | | - throw new MetricsException("Error creating connection, " |
163 | | - + serverHost + ":" + serverPort, e); |
| 156 | + } catch (Exception e) { |
| 157 | + connectionFailures++; |
| 158 | + if (tooManyConnectionFailures()) { |
| 159 | + // first time when connection limit reached, report to logs |
| 160 | + LOG.error("Too many connection failures, would not try to connect again."); |
164 | 161 | } |
| 162 | + throw new MetricsException("Error creating connection, " + |
| 163 | + serverHost + ":" + serverPort, e); |
165 | 164 | } |
| 165 | + } |
166 | 166 |
|
167 | | - public void write(String msg) throws IOException { |
168 | | - if (!isConnected()) { |
169 | | - connect(); |
170 | | - } |
171 | | - if (isConnected()) { |
172 | | - writer.write(msg); |
173 | | - } |
| 167 | + public void write(String msg) throws IOException { |
| 168 | + if (!isConnected()) { |
| 169 | + connect(); |
174 | 170 | } |
175 | | - |
176 | | - public void flush() throws IOException { |
177 | | - if (isConnected()) { |
178 | | - writer.flush(); |
179 | | - } |
| 171 | + if (isConnected()) { |
| 172 | + writer.write(msg); |
180 | 173 | } |
| 174 | + } |
181 | 175 |
|
182 | | - public boolean isConnected() { |
183 | | - return socket != null && socket.isConnected() && !socket.isClosed(); |
| 176 | + public void flush() throws IOException { |
| 177 | + if (isConnected()) { |
| 178 | + writer.flush(); |
184 | 179 | } |
| 180 | + } |
185 | 181 |
|
186 | | - public void close() throws IOException { |
187 | | - try { |
188 | | - if (writer != null) { |
189 | | - writer.close(); |
190 | | - } |
191 | | - } catch (IOException ex) { |
192 | | - if (socket != null) { |
193 | | - socket.close(); |
194 | | - } |
195 | | - } finally { |
196 | | - socket = null; |
197 | | - writer = null; |
198 | | - } |
199 | | - } |
| 182 | + public boolean isConnected() { |
| 183 | + return socket != null && socket.isConnected() && !socket.isClosed(); |
| 184 | + } |
200 | 185 |
|
201 | | - private boolean tooManyConnectionFailures() { |
202 | | - return connectionFailures > MAX_CONNECTION_FAILURES; |
| 186 | + public void close() throws IOException { |
| 187 | + try { |
| 188 | + if (writer != null) { |
| 189 | + writer.close(); |
| 190 | + } |
| 191 | + } catch (IOException ex) { |
| 192 | + if (socket != null) { |
| 193 | + socket.close(); |
| 194 | + } |
| 195 | + } finally { |
| 196 | + socket = null; |
| 197 | + writer = null; |
203 | 198 | } |
| 199 | + } |
204 | 200 |
|
| 201 | + private boolean tooManyConnectionFailures() { |
| 202 | + return connectionFailures > MAX_CONNECTION_FAILURES; |
205 | 203 | } |
| 204 | + } |
206 | 205 |
|
| 206 | + @VisibleForTesting |
| 207 | + void setGraphite(Graphite graphite) { |
| 208 | + this.graphite = graphite; |
| 209 | + } |
207 | 210 | } |
0 commit comments