Skip to content

Commit

Permalink
Fix library cleanup by making the Telemetry TimerTask a daemon and ma…
Browse files Browse the repository at this point in the history
…king the StatsDProcessor use daemon threads in its executor (DataDog#117)
  • Loading branch information
blevz authored and David Byron committed Jan 11, 2021
1 parent 8294ecf commit 215448e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,6 @@ protected boolean send(final Message message) {
return false;
}

boolean isShutdown() {
return shutdown;
}

void shutdown() {
shutdown = true;
}
Expand Down
25 changes: 23 additions & 2 deletions src/main/java/com/timgroup/statsd/StatsDProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class StatsDProcessor implements Runnable {
Expand Down Expand Up @@ -73,7 +74,17 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
this.workers = workers;
this.qcapacity = queueSize;

this.executor = Executors.newFixedThreadPool(workers);
this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable runnable) {
final Thread result = delegate.newThread(runnable);
result.setName("StatsD-Processor-" + result.getName());
result.setDaemon(true);
return result;
}
});

this.bufferPool = new BufferPool(poolSize, maxPacketSizeBytes, true);
this.outboundQueue = new ArrayBlockingQueue<ByteBuffer>(poolSize);
this.endSignal = new CountDownLatch(workers);
Expand All @@ -86,7 +97,16 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
this.workers = processor.workers;
this.qcapacity = processor.getQcapacity();

this.executor = Executors.newFixedThreadPool(this.workers);
this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable runnable) {
final Thread result = delegate.newThread(runnable);
result.setName("StatsD-Processor-" + result.getName());
result.setDaemon(true);
return result;
}
});
this.bufferPool = new BufferPool(processor.bufferPool);
this.outboundQueue = new ArrayBlockingQueue<ByteBuffer>(this.bufferPool.getSize());
this.endSignal = new CountDownLatch(this.workers);
Expand Down Expand Up @@ -127,6 +147,7 @@ public void run() {
}

boolean isShutdown() {
executor.shutdown();
return shutdown;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/timgroup/statsd/Telemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public final void writeTo(StringBuilder builder) {
*/
public void start(final long flushInterval) {
// flush the telemetry at regualar interval
this.timer = new Timer();
this.timer = new Timer(true);
this.timer.scheduleAtFixedRate(new TelemetryTask(this), flushInterval, flushInterval);
}

Expand Down

0 comments on commit 215448e

Please sign in to comment.