diff --git a/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java b/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java index d4b4c4fa..d95287d6 100644 --- a/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java +++ b/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java @@ -14,6 +14,16 @@ public String getPipe() { return pipe; } + /** + * Return true if object is a NamedPipeSocketAddress referring to the same path. + */ + public boolean equals(Object object) { + if (object instanceof NamedPipeSocketAddress) { + return pipe.equals(((NamedPipeSocketAddress)object).pipe); + } + return false; + } + /** * A normalized version of the pipe name that includes the `\\.\pipe\` prefix */ @@ -24,4 +34,8 @@ static String normalizePipeName(String pipeName) { return NAMED_PIPE_PREFIX + pipeName; } } + + static boolean isNamedPipe(String address) { + return address.startsWith(NAMED_PIPE_PREFIX); + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index f11f8df8..812f0063 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -62,6 +62,7 @@ public class NonBlockingStatsDClient implements StatsDClient { public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID"; private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ; public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED"; + public static final String DD_DOGSTATSD_URL_ENV_VAR = "DD_DOGSTATSD_URL"; private static final long MIN_TIMESTAMP = 1; diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 74ef6b0a..91479603 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -5,6 +5,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.URI; import java.net.UnknownHostException; import java.util.concurrent.Callable; import java.util.concurrent.ThreadFactory; @@ -207,18 +208,7 @@ protected NonBlockingStatsDClientBuilder resolve() { } int packetSize = maxPacketSizeBytes; - Callable lookup = addressLookup; - - if (lookup == null) { - String namedPipeFromEnv = System.getenv(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR); - String resolvedNamedPipe = namedPipe == null ? namedPipeFromEnv : namedPipe; - - if (resolvedNamedPipe == null) { - lookup = staticStatsDAddressResolution(hostname, port); - } else { - lookup = staticNamedPipeResolution(resolvedNamedPipe); - } - } + Callable lookup = getAddressLookup(); if (packetSize == 0) { packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : @@ -230,7 +220,7 @@ protected NonBlockingStatsDClientBuilder resolve() { if (telemetryHostname == null) { telemetryLookup = lookup; } else { - telemetryLookup = staticStatsDAddressResolution(telemetryHostname, telemetryPort); + telemetryLookup = staticAddress(telemetryHostname, telemetryPort); } } @@ -241,6 +231,66 @@ protected NonBlockingStatsDClientBuilder resolve() { return resolved; } + private Callable getAddressLookup() { + // First, use explicit configuration on the builder. + if (addressLookup != null) { + return addressLookup; + } + + if (namedPipe != null) { + return staticNamedPipeResolution(namedPipe); + } + + if (hostname != null) { + return staticAddress(hostname, port); + } + + // Next, try various environment variables. + String url = System.getenv(NonBlockingStatsDClient.DD_DOGSTATSD_URL_ENV_VAR); + if (url != null) { + return getAddressLookupFromUrl(url); + } + + String namedPipeFromEnv = System.getenv(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR); + if (namedPipeFromEnv != null) { + return staticNamedPipeResolution(namedPipeFromEnv); + } + + String hostFromEnv = getHostnameFromEnvVar(); + int portFromEnv = getPortFromEnvVar(port); + + return staticAddress(hostFromEnv, portFromEnv); + } + + private Callable getAddressLookupFromUrl(String url) { + if (NamedPipeSocketAddress.isNamedPipe(url)) { + return staticNamedPipeResolution(url); + } + + URI parsed; + try { + parsed = new URI(url); + } catch (Exception e) { + return null; + } + + if (parsed.getScheme().equals("udp")) { + String uriHost = parsed.getHost(); + int uriPort = parsed.getPort(); + if (uriPort < 0) { + uriPort = port; + } + return staticAddress(uriHost, uriPort); + } + + if (parsed.getScheme().equals("unix")) { + String uriPath = parsed.getPath(); + return staticAddress(uriPath, 0); + } + + return null; + } + /** * Create dynamic lookup for the given host name and port. * @@ -288,20 +338,6 @@ public static Callable staticAddressResolution(final String hostn }; } - protected static Callable staticStatsDAddressResolution(String hostname, int port) - throws StatsDClientException { - try { - if (hostname == null) { - hostname = getHostnameFromEnvVar(); - port = getPortFromEnvVar(port); - } - - return staticAddressResolution(hostname, port); - } catch (final Exception e) { - throw new StatsDClientException("Failed to lookup StatsD host", e); - } - } - protected static Callable staticNamedPipeResolution(String namedPipe) { final NamedPipeSocketAddress socketAddress = new NamedPipeSocketAddress(namedPipe); return new Callable() { @@ -311,6 +347,14 @@ protected static Callable staticNamedPipeResolution(String namedP }; } + private static Callable staticAddress(final String hostname, final int port) { + try { + return staticAddressResolution(hostname, port); + } catch (Exception e) { + throw new StatsDClientException("Failed to lookup StatsD host", e); + } + } + /** * Retrieves host name from the environment variable "DD_AGENT_HOST". * diff --git a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java new file mode 100644 index 00000000..569ac837 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java @@ -0,0 +1,133 @@ +package com.timgroup.statsd; + +import java.net.SocketAddress; +import java.net.InetSocketAddress; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; + +import jnr.unixsocket.UnixSocketAddress; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.Rule; +import org.junit.contrib.java.lang.system.EnvironmentVariables; +import org.junit.runners.MethodSorters; +import org.junit.function.ThrowingRunnable; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class BuilderAddressTest { + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + final String url; + final String host; + final String port; + final String pipe; + final SocketAddress expected; + + public BuilderAddressTest(String url, String host, String port, String pipe, SocketAddress expected) { + this.url = url; + this.host = host; + this.port = port; + this.pipe = pipe; + this.expected = expected; + } + + static boolean isJnrAvailable() { + try { + Class.forName("jnr.unixsocket.UnixDatagramChannel"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + + static final private int defaultPort = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT; + + @Parameters + public static Collection parameters() { + ArrayList params = new ArrayList(); + + params.addAll(Arrays.asList(new Object[][]{ + // DD_DOGSTATSD_URL + { "udp://1.1.1.1", null, null, null, new InetSocketAddress("1.1.1.1", defaultPort) }, + { "udp://1.1.1.1:9999", null, null, null, new InetSocketAddress("1.1.1.1", 9999) }, + { "\\\\.\\pipe\\foo", null, null, null, new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + + // DD_AGENT_HOST + { null, "1.1.1.1", null, null, new InetSocketAddress("1.1.1.1", defaultPort) }, + + // DD_AGENT_HOST, DD_DOGSTATSD_PORT + { null, "1.1.1.1", "9999", null, new InetSocketAddress("1.1.1.1", 9999) }, + + { null, null, null, "foo", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + + // DD_DOGSTATSD_URL overrides other env vars. + { "udp://1.1.1.1", null, null, "foo", new InetSocketAddress("1.1.1.1", defaultPort) }, + { "udp://1.1.1.1:9999", null, null, "foo", new InetSocketAddress("1.1.1.1", 9999) }, + { "\\\\.\\pipe\\foo", null, null, "bar", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + { "\\\\.\\pipe\\foo", "1.1.1.1", null, null, new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + { "\\\\.\\pipe\\foo", "1.1.1.1", "9999", null, new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + + // DD_DOGSTATSD_NAMED_PIPE overrides DD_AGENT_HOST. + { null, "1.1.1.1", null, "foo", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + { null, "1.1.1.1", "9999", "foo", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") }, + })); + + if (isJnrAvailable()) { + params.addAll(Arrays.asList(new Object[][]{ + { "unix:///dsd.sock", null, null, null, new UnixSocketAddress("/dsd.sock") }, + { "unix://unused/dsd.sock", null, null, null, new UnixSocketAddress("/dsd.sock") }, + { "unix://unused:9999/dsd.sock", null, null, null, new UnixSocketAddress("/dsd.sock") }, + { null, "/dsd.sock", "0", null, new UnixSocketAddress("/dsd.sock") }, + { "unix:///dsd.sock", "1.1.1.1", "9999", null, new UnixSocketAddress("/dsd.sock") }, + })); + } + + return params; + } + + @Before + public void set() { + set(NonBlockingStatsDClient.DD_DOGSTATSD_URL_ENV_VAR, url); + set(NonBlockingStatsDClient.DD_AGENT_HOST_ENV_VAR, host); + set(NonBlockingStatsDClient.DD_DOGSTATSD_PORT_ENV_VAR, port); + set(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR, pipe); + } + + void set(String name, String val) { + if (val != null) { + environmentVariables.set(name, val); + } else { + environmentVariables.clear(name); + } + } + + @Test(timeout = 5000L) + public void address_resolution() throws Exception { + NonBlockingStatsDClientBuilder b; + + // Default configuration matches env vars + b = new NonBlockingStatsDClientBuilder().resolve(); + assertEquals(expected, b.addressLookup.call()); + + // Explicit configuration is used regardless of environment variables. + b = new NonBlockingStatsDClientBuilder().hostname("2.2.2.2").resolve(); + assertEquals(new InetSocketAddress("2.2.2.2", defaultPort), b.addressLookup.call()); + + b = new NonBlockingStatsDClientBuilder().hostname("2.2.2.2").port(2222).resolve(); + assertEquals(new InetSocketAddress("2.2.2.2", 2222), b.addressLookup.call()); + + b = new NonBlockingStatsDClientBuilder().namedPipe("ook").resolve(); + assertEquals(new NamedPipeSocketAddress("ook"), b.addressLookup.call()); + } +} diff --git a/src/test/java/com/timgroup/statsd/NamedPipeTest.java b/src/test/java/com/timgroup/statsd/NamedPipeTest.java index 97b723e1..94dc1896 100644 --- a/src/test/java/com/timgroup/statsd/NamedPipeTest.java +++ b/src/test/java/com/timgroup/statsd/NamedPipeTest.java @@ -3,15 +3,22 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.instanceOf; + import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Random; import java.util.logging.Logger; + import org.junit.After; import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.Rule; +import org.junit.contrib.java.lang.system.EnvironmentVariables; +import static org.junit.Assert.assertEquals; public class NamedPipeTest implements StatsDClientErrorHandler { private static final Logger log = Logger.getLogger("NamedPipeTest"); @@ -21,6 +28,9 @@ public class NamedPipeTest implements StatsDClientErrorHandler { private DummyStatsDServer server; private volatile Exception lastException = new Exception(); + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + public synchronized void handle(Exception exception) { log.info("Got exception: " + exception.getMessage()); lastException = exception; diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 461b7a59..510bc0cb 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1,5 +1,6 @@ package com.timgroup.statsd; + import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -24,14 +25,17 @@ import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; import java.text.NumberFormat; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.comparesEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; + +import org.junit.function.ThrowingRunnable; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class NonBlockingStatsDClientTest { @@ -1679,27 +1683,8 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD private CountDownLatch lock; - SlowStatsDNonBlockingStatsDClient(final String prefix, final int queueSize, - String[] constantTags, final StatsDClientErrorHandler errorHandler, - Callable addressLookup, final int timeout, final int bufferSize, - final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, - final int senderWorkers, boolean blocking) throws StatsDClientException { - - super(new NonBlockingStatsDClientBuilder() - .prefix(prefix) - .queueSize(queueSize) - .constantTags(constantTags) - .errorHandler(errorHandler) - .addressLookup(addressLookup) - .timeout(timeout) - .entityID(entityID) - .bufferPoolSize(poolSize) - .blocking(blocking) - .senderWorkers(senderWorkers) - .processorWorkers(processorWorkers) - .maxPacketSizeBytes(maxPacketSizeBytes) - .originDetectionEnabled(false) - .resolve()); + SlowStatsDNonBlockingStatsDClient(NonBlockingStatsDClientBuilder builder) throws StatsDClientException { + super(builder); lock = new CountDownLatch(1); } @@ -1720,21 +1705,7 @@ private static class SlowStatsDNonBlockingStatsDClientBuilder extends NonBlockin @Override public SlowStatsDNonBlockingStatsDClient build() throws StatsDClientException { - int packetSize = maxPacketSizeBytes; - if (packetSize == 0) { - packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : - NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; - } - - if (addressLookup != null) { - return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, - addressLookup, timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, - processorWorkers, senderWorkers, blocking); - } else { - return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, - staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize, - entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking); - } + return new SlowStatsDNonBlockingStatsDClient(resolve()); } } @@ -1874,4 +1845,14 @@ public void handle(Exception ex) { assertEquals(0, errors.size()); } + + @Test(timeout = 5000L) + public void address_resolution_empty() throws Exception { + assertThrows(StatsDClientException.class, new ThrowingRunnable() { + @Override + public void run() { + new NonBlockingStatsDClientBuilder().resolve(); + } + }); + } }