From 8d083c4282b3b486e8d09cebde01109efcc71ac2 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 8 Jan 2018 11:00:01 +0100 Subject: [PATCH] add UDS support --- pom.xml | 5 ++ .../statsd/NonBlockingStatsDClient.java | 49 ++++++++++------- .../timgroup/statsd/DummyStatsDServer.java | 36 +++++++++---- .../NonBlockingStatsDClientPerfTest.java | 3 +- .../statsd/NonBlockingStatsDClientTest.java | 3 +- .../com/timgroup/statsd/UnixSocketTest.java | 52 +++++++++++++++++++ 6 files changed, 117 insertions(+), 31 deletions(-) create mode 100644 src/test/java/com/timgroup/statsd/UnixSocketTest.java diff --git a/pom.xml b/pom.xml index c145dd0e..64ddf1af 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,11 @@ test 4.12 + + com.github.jnr + jnr-unixsocket + 0.18 + diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index de8c9dea..ac5265f9 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.net.InetAddress; +import java.net.SocketAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -19,7 +20,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - +import jnr.unixsocket.UnixSocketAddress; +import jnr.unixsocket.UnixDatagramChannel; /** @@ -288,7 +290,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final * if the client could not be started */ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[] constantTags, final StatsDClientErrorHandler errorHandler, - final Callable addressLookup) throws StatsDClientException { + final Callable addressLookup) throws StatsDClientException { if((prefix != null) && (!prefix.isEmpty())) { this.prefix = String.format("%s.", prefix); } else { @@ -313,7 +315,12 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String } try { - clientChannel = DatagramChannel.open(); + final SocketAddress address = addressLookup.call(); + if (address instanceof UnixSocketAddress) { + clientChannel = UnixDatagramChannel.open(); + } else{ + clientChannel = DatagramChannel.open(); + } } catch (final Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); } @@ -883,10 +890,11 @@ private boolean isInvalidSample(double sampleRate) { private class QueueConsumer implements Runnable { private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES); + private final Callable addressLookup; - private final Callable addressLookup; - QueueConsumer(final Callable addressLookup) { + + QueueConsumer(final Callable addressLookup) { this.addressLookup = addressLookup; } @@ -895,7 +903,7 @@ private class QueueConsumer implements Runnable { try { final String message = queue.poll(1, TimeUnit.SECONDS); if(null != message) { - final InetSocketAddress address = addressLookup.call(); + final SocketAddress address = addressLookup.call(); final byte[] data = message.getBytes(MESSAGE_CHARSET); if(sendBuffer.remaining() < (data.length + 1)) { blockingSend(address); @@ -914,7 +922,7 @@ private class QueueConsumer implements Runnable { } } - private void blockingSend(final InetSocketAddress address) throws IOException { + private void blockingSend(final SocketAddress address) throws IOException { final int sizeOfBuffer = sendBuffer.position(); sendBuffer.flip(); @@ -926,10 +934,9 @@ private void blockingSend(final InetSocketAddress address) throws IOException { handler.handle( new IOException( String.format( - "Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes", + "Could not send entirely stat %s to %s. Only sent %d bytes out of %d bytes", sendBuffer.toString(), - address.getHostName(), - address.getPort(), + address.toString(), sentBytes, sizeOfBuffer))); } @@ -943,10 +950,14 @@ private void blockingSend(final InetSocketAddress address) throws IOException { * @param port the port of the targeted StatsD server * @return a function to perform the lookup */ - public static Callable volatileAddressResolution(final String hostname, final int port) { - return new Callable() { - @Override public InetSocketAddress call() throws UnknownHostException { - return new InetSocketAddress(InetAddress.getByName(hostname), port); + public static Callable volatileAddressResolution(final String hostname, final int port) { + return new Callable() { + @Override public SocketAddress call() throws UnknownHostException { + if (port == 0) { // Hostname is a file path to the socket + return new UnixSocketAddress(hostname); + } else { + return new InetSocketAddress(InetAddress.getByName(hostname), port); + } } }; } @@ -959,16 +970,16 @@ public static Callable volatileAddressResolution(final String * @return a function that cached the result of the lookup * @throws Exception if the lookup fails, i.e. {@link UnknownHostException} */ - public static Callable staticAddressResolution(final String hostname, final int port) throws Exception { - final InetSocketAddress address = volatileAddressResolution(hostname, port).call(); - return new Callable() { - @Override public InetSocketAddress call() { + public static Callable staticAddressResolution(final String hostname, final int port) throws Exception { + final SocketAddress address = volatileAddressResolution(hostname, port).call(); + return new Callable() { + @Override public SocketAddress call() { return address; } }; } - private static Callable staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException { + private static Callable staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException { try { return staticAddressResolution(hostname, port); } catch (final Exception e) { diff --git a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java index b9387b4c..3d837a78 100644 --- a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java @@ -2,27 +2,43 @@ package com.timgroup.statsd; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.SocketException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.util.ArrayList; import java.util.List; +import jnr.unixsocket.UnixDatagramChannel; +import jnr.unixsocket.UnixSocketAddress; +import java.nio.charset.StandardCharsets; final class DummyStatsDServer { private final List messagesReceived = new ArrayList(); - private final DatagramSocket server; + private final DatagramChannel server; - public DummyStatsDServer(int port) throws SocketException { - server = new DatagramSocket(port); + public DummyStatsDServer(int port) throws IOException { + server = DatagramChannel.open(); + server.bind(new InetSocketAddress(port)); + this.listen(); + } + + public DummyStatsDServer(String socketPath) throws IOException { + server = UnixDatagramChannel.open(); + server.bind(new UnixSocketAddress(socketPath)); + this.listen(); + } + + private void listen() { Thread thread = new Thread(new Runnable() { @Override public void run() { - while(!server.isClosed()) { + final ByteBuffer packet = ByteBuffer.allocate(1500); + while(server.isOpen()) { try { - final DatagramPacket packet = new DatagramPacket(new byte[1500], 1500); + packet.clear(); server.receive(packet); - for(String msg : new String(packet.getData(), NonBlockingStatsDClient.MESSAGE_CHARSET).split("\n")) { + packet.flip(); + for(String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) { messagesReceived.add(msg.trim()); } } catch (IOException e) { @@ -47,7 +63,7 @@ public List messagesReceived() { return new ArrayList(messagesReceived); } - public void close() { + public void close() throws IOException { server.close(); } diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java index f58218a5..4e46ad1e 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java @@ -1,6 +1,7 @@ package com.timgroup.statsd; +import java.io.IOException; import java.net.SocketException; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -22,7 +23,7 @@ public final class NonBlockingStatsDClientPerfTest { private static DummyStatsDServer server; @BeforeClass - public static void start() throws SocketException { + public static void start() throws IOException { server = new DummyStatsDServer(STATSD_SERVER_PORT); } diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index bf4c09e8..654c83cb 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -5,6 +5,7 @@ import org.junit.After; import org.junit.Test; +import java.io.IOException; import java.net.SocketException; import java.util.Locale; @@ -19,7 +20,7 @@ public class NonBlockingStatsDClientTest { private static DummyStatsDServer server; @BeforeClass - public static void start() throws SocketException { + public static void start() throws IOException { server = new DummyStatsDServer(STATSD_SERVER_PORT); } diff --git a/src/test/java/com/timgroup/statsd/UnixSocketTest.java b/src/test/java/com/timgroup/statsd/UnixSocketTest.java new file mode 100644 index 00000000..9369e272 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/UnixSocketTest.java @@ -0,0 +1,52 @@ +package com.timgroup.statsd; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import java.io.IOException; +import java.io.File; +import java.nio.file.Files; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; + +public class UnixSocketTest { + private static File tmpFolder; + private static NonBlockingStatsDClient client; + private static DummyStatsDServer server; + + @BeforeClass + public static void start() throws IOException { + tmpFolder = Files.createTempDirectory(System.getProperty("java-dsd-test")).toFile(); + tmpFolder.deleteOnExit(); + File socketFile = new File(tmpFolder, "socket.sock"); + socketFile.deleteOnExit(); + + server = new DummyStatsDServer(socketFile.toString()); + client = new NonBlockingStatsDClient("my.prefix", socketFile.toString(), 0); + } + + @AfterClass + public static void stop() throws Exception { + client.stop(); + server.close(); + } + + @After + public void clear() { + server.clear(); + } + + @Test(timeout = 5000L) + public void + sends_to_statsd() throws Exception { + for(long i = 0; i < 5 ; i++) { + client.gauge("mycount", i); + server.waitForMessage(); + String expected = String.format("my.prefix.mycount:%d|g", i); + assertThat(server.messagesReceived(), contains(expected)); + server.clear(); + } + } +}