Skip to content

Commit

Permalink
Add builder method
Browse files Browse the repository at this point in the history
  • Loading branch information
iksaif committed Jan 17, 2024
1 parent 95843b6 commit d50bb85
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.timgroup.statsd;

import jnr.constants.platform.Sock;
import jnr.unixsocket.UnixSocketAddress;

import java.net.InetAddress;
Expand Down Expand Up @@ -132,6 +133,25 @@ public NonBlockingStatsDClientBuilder namedPipe(String val) {
return this;
}

private Callable<SocketAddress> socketLookup(final String path, final UnixSocketAddressWithTransport.TransportType transport) {
return new Callable<SocketAddress>() {
@Override
public SocketAddress call() throws Exception {
return new UnixSocketAddressWithTransport(new UnixSocketAddress(path), transport);
}
};
}

public NonBlockingStatsDClientBuilder socket(final String path, final UnixSocketAddressWithTransport.TransportType transport) {
addressLookup = socketLookup(path, transport);
return this;
}

public NonBlockingStatsDClientBuilder telemetrySocket(final String path, final UnixSocketAddressWithTransport.TransportType transport) {
telemetryAddressLookup = socketLookup(path, transport);
return this;
}

public NonBlockingStatsDClientBuilder prefix(String val) {
prefix = val;
return this;
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,13 @@ private void connect() throws IOException {
// We'd have better timeout support if we used Java 16's native Unix domain socket support (JEP 380)
delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout);
}
delegate.connect(address);
while (!delegate.finishConnect()) {
// wait for connection to be established
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for connection", e);
}
if (!delegate.connect(address)) {
if (connectionTimeout > 0 && System.nanoTime() > deadline) {
throw new IOException("Connection timed out");
}
if (!delegate.finishConnect()) {
throw new IOException("Connection failed");
}
}

if (timeout > 0) {
Expand Down
11 changes: 2 additions & 9 deletions src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,8 @@ public void start() throws IOException {

server = new UnixStreamSocketDummyStatsDServer(socketFile.toString());

Callable<SocketAddress> addressLookup = new Callable<SocketAddress>() {
@Override
public SocketAddress call() throws Exception {
return new UnixSocketAddressWithTransport(new UnixSocketAddress(socketFile.getPath()), UnixSocketAddressWithTransport.TransportType.UDS_STREAM);
}
};

client = new NonBlockingStatsDClientBuilder().prefix("my.prefix")
.addressLookup(addressLookup)
.socket(socketFile.getPath(), UnixSocketAddressWithTransport.TransportType.UDS_STREAM)
.port(0)
.queueSize(1)
.timeout(500) // non-zero timeout to ensure exception triggered if socket buffer full.
Expand All @@ -71,7 +64,7 @@ public SocketAddress call() throws Exception {
.build();

clientAggregate = new NonBlockingStatsDClientBuilder().prefix("my.prefix")
.addressLookup(addressLookup)
.socket(socketFile.getPath(), UnixSocketAddressWithTransport.TransportType.UDS_STREAM)
.port(0)
.queueSize(1)
.timeout(500) // non-zero timeout to ensure exception triggered if socket buffer full.
Expand Down

0 comments on commit d50bb85

Please sign in to comment.