Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DD_DOGSTATSD_URL #217

Merged
merged 3 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ public String getPipe() {
return pipe;
}

/**
* Return true if object is a NamedPipeSocketAddress referring to the same path.
*/
public boolean equals(Object object) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used implicitly in tests when we assertEquals two NamedPipeSocketAddresses

if (object instanceof NamedPipeSocketAddress) {
return pipe.equals(((NamedPipeSocketAddress)object).pipe);
}
return false;
}

/**
* A normalized version of the pipe name that includes the `\\.\pipe\` prefix
*/
Expand All @@ -24,4 +34,8 @@ static String normalizePipeName(String pipeName) {
return NAMED_PIPE_PREFIX + pipeName;
}
}

static boolean isNamedPipe(String address) {
return address.startsWith(NAMED_PIPE_PREFIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class NonBlockingStatsDClient implements StatsDClient {
public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID";
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;
public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED";
public static final String DD_DOGSTATSD_URL_ENV_VAR = "DD_DOGSTATSD_URL";

private static final long MIN_TIMESTAMP = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -207,18 +208,7 @@ protected NonBlockingStatsDClientBuilder resolve() {
}

int packetSize = maxPacketSizeBytes;
Callable<SocketAddress> lookup = addressLookup;

if (lookup == null) {
String namedPipeFromEnv = System.getenv(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR);
String resolvedNamedPipe = namedPipe == null ? namedPipeFromEnv : namedPipe;

if (resolvedNamedPipe == null) {
lookup = staticStatsDAddressResolution(hostname, port);
} else {
lookup = staticNamedPipeResolution(resolvedNamedPipe);
}
}
Callable<SocketAddress> lookup = getAddressLookup();

if (packetSize == 0) {
packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
Expand All @@ -230,7 +220,7 @@ protected NonBlockingStatsDClientBuilder resolve() {
if (telemetryHostname == null) {
telemetryLookup = lookup;
} else {
telemetryLookup = staticStatsDAddressResolution(telemetryHostname, telemetryPort);
telemetryLookup = staticAddress(telemetryHostname, telemetryPort);
}
}

Expand All @@ -241,6 +231,66 @@ protected NonBlockingStatsDClientBuilder resolve() {
return resolved;
}

private Callable<SocketAddress> getAddressLookup() {
// First, use explicit configuration on the builder.
if (addressLookup != null) {
return addressLookup;
}

if (namedPipe != null) {
return staticNamedPipeResolution(namedPipe);
}

if (hostname != null) {
return staticAddress(hostname, port);
}

// Next, try various environment variables.
String url = System.getenv(NonBlockingStatsDClient.DD_DOGSTATSD_URL_ENV_VAR);
if (url != null) {
return getAddressLookupFromUrl(url);
}

String namedPipeFromEnv = System.getenv(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR);
if (namedPipeFromEnv != null) {
return staticNamedPipeResolution(namedPipeFromEnv);
}

String hostFromEnv = getHostnameFromEnvVar();
int portFromEnv = getPortFromEnvVar(port);

return staticAddress(hostFromEnv, portFromEnv);
}

private Callable<SocketAddress> getAddressLookupFromUrl(String url) {
if (NamedPipeSocketAddress.isNamedPipe(url)) {
return staticNamedPipeResolution(url);
}

URI parsed;
try {
parsed = new URI(url);
} catch (Exception e) {
return null;
}

if (parsed.getScheme().equals("udp")) {
String uriHost = parsed.getHost();
int uriPort = parsed.getPort();
if (uriPort < 0) {
uriPort = port;
}
return staticAddress(uriHost, uriPort);
}

if (parsed.getScheme().equals("unix")) {
String uriPath = parsed.getPath();
return staticAddress(uriPath, 0);
}

return null;
}

/**
* Create dynamic lookup for the given host name and port.
*
Expand Down Expand Up @@ -288,20 +338,6 @@ public static Callable<SocketAddress> staticAddressResolution(final String hostn
};
}

protected static Callable<SocketAddress> staticStatsDAddressResolution(String hostname, int port)
throws StatsDClientException {
try {
if (hostname == null) {
hostname = getHostnameFromEnvVar();
port = getPortFromEnvVar(port);
}

return staticAddressResolution(hostname, port);
} catch (final Exception e) {
throw new StatsDClientException("Failed to lookup StatsD host", e);
}
}

protected static Callable<SocketAddress> staticNamedPipeResolution(String namedPipe) {
final NamedPipeSocketAddress socketAddress = new NamedPipeSocketAddress(namedPipe);
return new Callable<SocketAddress>() {
Expand All @@ -311,6 +347,14 @@ protected static Callable<SocketAddress> staticNamedPipeResolution(String namedP
};
}

private static Callable<SocketAddress> staticAddress(final String hostname, final int port) {
try {
return staticAddressResolution(hostname, port);
} catch (Exception e) {
throw new StatsDClientException("Failed to lookup StatsD host", e);
}
}

/**
* Retrieves host name from the environment variable "DD_AGENT_HOST".
*
Expand Down
133 changes: 133 additions & 0 deletions src/test/java/com/timgroup/statsd/BuilderAddressTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.timgroup.statsd;

import java.net.SocketAddress;
import java.net.InetSocketAddress;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import jnr.unixsocket.UnixSocketAddress;

import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.Rule;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.runners.MethodSorters;
import org.junit.function.ThrowingRunnable;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class BuilderAddressTest {
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();

final String url;
final String host;
final String port;
final String pipe;
final SocketAddress expected;

public BuilderAddressTest(String url, String host, String port, String pipe, SocketAddress expected) {
this.url = url;
this.host = host;
this.port = port;
this.pipe = pipe;
this.expected = expected;
}

static boolean isJnrAvailable() {
try {
Class.forName("jnr.unixsocket.UnixDatagramChannel");
return true;
} catch (ClassNotFoundException e) {
return false;
}
}

static final private int defaultPort = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT;

@Parameters
public static Collection<Object[]> parameters() {
ArrayList params = new ArrayList();

params.addAll(Arrays.asList(new Object[][]{
// DD_DOGSTATSD_URL
{ "udp://1.1.1.1", null, null, null, new InetSocketAddress("1.1.1.1", defaultPort) },
{ "udp://1.1.1.1:9999", null, null, null, new InetSocketAddress("1.1.1.1", 9999) },
{ "\\\\.\\pipe\\foo", null, null, null, new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },

// DD_AGENT_HOST
{ null, "1.1.1.1", null, null, new InetSocketAddress("1.1.1.1", defaultPort) },

// DD_AGENT_HOST, DD_DOGSTATSD_PORT
{ null, "1.1.1.1", "9999", null, new InetSocketAddress("1.1.1.1", 9999) },

{ null, null, null, "foo", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },

// DD_DOGSTATSD_URL overrides other env vars.
{ "udp://1.1.1.1", null, null, "foo", new InetSocketAddress("1.1.1.1", defaultPort) },
{ "udp://1.1.1.1:9999", null, null, "foo", new InetSocketAddress("1.1.1.1", 9999) },
{ "\\\\.\\pipe\\foo", null, null, "bar", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },
{ "\\\\.\\pipe\\foo", "1.1.1.1", null, null, new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },
{ "\\\\.\\pipe\\foo", "1.1.1.1", "9999", null, new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },

// DD_DOGSTATSD_NAMED_PIPE overrides DD_AGENT_HOST.
{ null, "1.1.1.1", null, "foo", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },
{ null, "1.1.1.1", "9999", "foo", new NamedPipeSocketAddress("\\\\.\\pipe\\foo") },
}));

if (isJnrAvailable()) {
params.addAll(Arrays.asList(new Object[][]{
{ "unix:///dsd.sock", null, null, null, new UnixSocketAddress("/dsd.sock") },
{ "unix://unused/dsd.sock", null, null, null, new UnixSocketAddress("/dsd.sock") },
{ "unix://unused:9999/dsd.sock", null, null, null, new UnixSocketAddress("/dsd.sock") },
{ null, "/dsd.sock", "0", null, new UnixSocketAddress("/dsd.sock") },
{ "unix:///dsd.sock", "1.1.1.1", "9999", null, new UnixSocketAddress("/dsd.sock") },
}));
}

return params;
}

@Before
public void set() {
set(NonBlockingStatsDClient.DD_DOGSTATSD_URL_ENV_VAR, url);
set(NonBlockingStatsDClient.DD_AGENT_HOST_ENV_VAR, host);
set(NonBlockingStatsDClient.DD_DOGSTATSD_PORT_ENV_VAR, port);
set(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR, pipe);
}

void set(String name, String val) {
if (val != null) {
environmentVariables.set(name, val);
} else {
environmentVariables.clear(name);
}
}

@Test(timeout = 5000L)
public void address_resolution() throws Exception {
NonBlockingStatsDClientBuilder b;

// Default configuration matches env vars
b = new NonBlockingStatsDClientBuilder().resolve();
assertEquals(expected, b.addressLookup.call());

// Explicit configuration is used regardless of environment variables.
b = new NonBlockingStatsDClientBuilder().hostname("2.2.2.2").resolve();
assertEquals(new InetSocketAddress("2.2.2.2", defaultPort), b.addressLookup.call());

b = new NonBlockingStatsDClientBuilder().hostname("2.2.2.2").port(2222).resolve();
assertEquals(new InetSocketAddress("2.2.2.2", 2222), b.addressLookup.call());

b = new NonBlockingStatsDClientBuilder().namedPipe("ook").resolve();
assertEquals(new NamedPipeSocketAddress("ook"), b.addressLookup.call());
}
}
10 changes: 10 additions & 0 deletions src/test/java/com/timgroup/statsd/NamedPipeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.instanceOf;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.logging.Logger;

import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Rule;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import static org.junit.Assert.assertEquals;

public class NamedPipeTest implements StatsDClientErrorHandler {
private static final Logger log = Logger.getLogger("NamedPipeTest");
Expand All @@ -21,6 +28,9 @@ public class NamedPipeTest implements StatsDClientErrorHandler {
private DummyStatsDServer server;
private volatile Exception lastException = new Exception();

@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();

public synchronized void handle(Exception exception) {
log.info("Got exception: " + exception.getMessage());
lastException = exception;
Expand Down
Loading