Skip to content

Commit

Permalink
add UDS support
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Jan 8, 2018
1 parent 14e253f commit 8d083c4
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 31 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
<scope>test</scope>
<version>4.12</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-unixsocket</artifactId>
<version>0.18</version>
</dependency>
</dependencies>

<distributionManagement>
Expand Down
49 changes: 30 additions & 19 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
Expand All @@ -19,7 +20,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixDatagramChannel;


/**
Expand Down Expand Up @@ -288,7 +290,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final
* if the client could not be started
*/
public NonBlockingStatsDClient(final String prefix, final int queueSize, String[] constantTags, final StatsDClientErrorHandler errorHandler,
final Callable<InetSocketAddress> addressLookup) throws StatsDClientException {
final Callable<SocketAddress> addressLookup) throws StatsDClientException {
if((prefix != null) && (!prefix.isEmpty())) {
this.prefix = String.format("%s.", prefix);
} else {
Expand All @@ -313,7 +315,12 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String
}

try {
clientChannel = DatagramChannel.open();
final SocketAddress address = addressLookup.call();
if (address instanceof UnixSocketAddress) {
clientChannel = UnixDatagramChannel.open();
} else{
clientChannel = DatagramChannel.open();
}
} catch (final Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
Expand Down Expand Up @@ -883,10 +890,11 @@ private boolean isInvalidSample(double sampleRate) {

private class QueueConsumer implements Runnable {
private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);
private final Callable<SocketAddress> addressLookup;

private final Callable<InetSocketAddress> addressLookup;

QueueConsumer(final Callable<InetSocketAddress> addressLookup) {

QueueConsumer(final Callable<SocketAddress> addressLookup) {
this.addressLookup = addressLookup;
}

Expand All @@ -895,7 +903,7 @@ private class QueueConsumer implements Runnable {
try {
final String message = queue.poll(1, TimeUnit.SECONDS);
if(null != message) {
final InetSocketAddress address = addressLookup.call();
final SocketAddress address = addressLookup.call();
final byte[] data = message.getBytes(MESSAGE_CHARSET);
if(sendBuffer.remaining() < (data.length + 1)) {
blockingSend(address);
Expand All @@ -914,7 +922,7 @@ private class QueueConsumer implements Runnable {
}
}

private void blockingSend(final InetSocketAddress address) throws IOException {
private void blockingSend(final SocketAddress address) throws IOException {
final int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();

Expand All @@ -926,10 +934,9 @@ private void blockingSend(final InetSocketAddress address) throws IOException {
handler.handle(
new IOException(
String.format(
"Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes",
"Could not send entirely stat %s to %s. Only sent %d bytes out of %d bytes",
sendBuffer.toString(),
address.getHostName(),
address.getPort(),
address.toString(),
sentBytes,
sizeOfBuffer)));
}
Expand All @@ -943,10 +950,14 @@ private void blockingSend(final InetSocketAddress address) throws IOException {
* @param port the port of the targeted StatsD server
* @return a function to perform the lookup
*/
public static Callable<InetSocketAddress> volatileAddressResolution(final String hostname, final int port) {
return new Callable<InetSocketAddress>() {
@Override public InetSocketAddress call() throws UnknownHostException {
return new InetSocketAddress(InetAddress.getByName(hostname), port);
public static Callable<SocketAddress> volatileAddressResolution(final String hostname, final int port) {
return new Callable<SocketAddress>() {
@Override public SocketAddress call() throws UnknownHostException {
if (port == 0) { // Hostname is a file path to the socket
return new UnixSocketAddress(hostname);
} else {
return new InetSocketAddress(InetAddress.getByName(hostname), port);
}
}
};
}
Expand All @@ -959,16 +970,16 @@ public static Callable<InetSocketAddress> volatileAddressResolution(final String
* @return a function that cached the result of the lookup
* @throws Exception if the lookup fails, i.e. {@link UnknownHostException}
*/
public static Callable<InetSocketAddress> staticAddressResolution(final String hostname, final int port) throws Exception {
final InetSocketAddress address = volatileAddressResolution(hostname, port).call();
return new Callable<InetSocketAddress>() {
@Override public InetSocketAddress call() {
public static Callable<SocketAddress> staticAddressResolution(final String hostname, final int port) throws Exception {
final SocketAddress address = volatileAddressResolution(hostname, port).call();
return new Callable<SocketAddress>() {
@Override public SocketAddress call() {
return address;
}
};
}

private static Callable<InetSocketAddress> staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException {
private static Callable<SocketAddress> staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException {
try {
return staticAddressResolution(hostname, port);
} catch (final Exception e) {
Expand Down
36 changes: 26 additions & 10 deletions src/test/java/com/timgroup/statsd/DummyStatsDServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,43 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
import jnr.unixsocket.UnixDatagramChannel;
import jnr.unixsocket.UnixSocketAddress;
import java.nio.charset.StandardCharsets;


final class DummyStatsDServer {
private final List<String> messagesReceived = new ArrayList<String>();
private final DatagramSocket server;
private final DatagramChannel server;

public DummyStatsDServer(int port) throws SocketException {
server = new DatagramSocket(port);
public DummyStatsDServer(int port) throws IOException {
server = DatagramChannel.open();
server.bind(new InetSocketAddress(port));
this.listen();
}

public DummyStatsDServer(String socketPath) throws IOException {
server = UnixDatagramChannel.open();
server.bind(new UnixSocketAddress(socketPath));
this.listen();
}

private void listen() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while(!server.isClosed()) {
final ByteBuffer packet = ByteBuffer.allocate(1500);
while(server.isOpen()) {
try {
final DatagramPacket packet = new DatagramPacket(new byte[1500], 1500);
packet.clear();
server.receive(packet);
for(String msg : new String(packet.getData(), NonBlockingStatsDClient.MESSAGE_CHARSET).split("\n")) {
packet.flip();
for(String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) {
messagesReceived.add(msg.trim());
}
} catch (IOException e) {
Expand All @@ -47,7 +63,7 @@ public List<String> messagesReceived() {
return new ArrayList<String>(messagesReceived);
}

public void close() {
public void close() throws IOException {
server.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.timgroup.statsd;


import java.io.IOException;
import java.net.SocketException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
Expand All @@ -22,7 +23,7 @@ public final class NonBlockingStatsDClientPerfTest {
private static DummyStatsDServer server;

@BeforeClass
public static void start() throws SocketException {
public static void start() throws IOException {
server = new DummyStatsDServer(STATSD_SERVER_PORT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
import java.net.SocketException;
import java.util.Locale;

Expand All @@ -19,7 +20,7 @@ public class NonBlockingStatsDClientTest {
private static DummyStatsDServer server;

@BeforeClass
public static void start() throws SocketException {
public static void start() throws IOException {
server = new DummyStatsDServer(STATSD_SERVER_PORT);
}

Expand Down
52 changes: 52 additions & 0 deletions src/test/java/com/timgroup/statsd/UnixSocketTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.timgroup.statsd;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.File;
import java.nio.file.Files;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

public class UnixSocketTest {
private static File tmpFolder;
private static NonBlockingStatsDClient client;
private static DummyStatsDServer server;

@BeforeClass
public static void start() throws IOException {
tmpFolder = Files.createTempDirectory(System.getProperty("java-dsd-test")).toFile();
tmpFolder.deleteOnExit();
File socketFile = new File(tmpFolder, "socket.sock");
socketFile.deleteOnExit();

server = new DummyStatsDServer(socketFile.toString());
client = new NonBlockingStatsDClient("my.prefix", socketFile.toString(), 0);
}

@AfterClass
public static void stop() throws Exception {
client.stop();
server.close();
}

@After
public void clear() {
server.clear();
}

@Test(timeout = 5000L)
public void
sends_to_statsd() throws Exception {
for(long i = 0; i < 5 ; i++) {
client.gauge("mycount", i);
server.waitForMessage();
String expected = String.format("my.prefix.mycount:%d|g", i);
assertThat(server.messagesReceived(), contains(expected));
server.clear();
}
}
}

0 comments on commit 8d083c4

Please sign in to comment.