Skip to content

Commit

Permalink
Merge branch '1.5.x' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Aug 6, 2020
2 parents 9c167c1 + a0b4525 commit a5a9608
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import io.micrometer.core.lang.Nullable;
import io.micrometer.statsd.internal.*;
import io.netty.util.AttributeKey;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class StatsdMeterRegistry extends MeterRegistry {
private final AtomicBoolean started = new AtomicBoolean(false);
DirectProcessor<String> processor = DirectProcessor.create();
FluxSink<String> fluxSink = new NoopFluxSink();
Disposable.Swap client = Disposables.swap();
Disposable.Swap statsdConnection = Disposables.swap();
private Disposable.Swap meterPoller = Disposables.swap();

@Nullable
Expand All @@ -87,6 +88,8 @@ public class StatsdMeterRegistry extends MeterRegistry {
@Nullable
private Consumer<String> lineSink;

private static final AttributeKey<Boolean> CONNECTION_DISPOSED = AttributeKey.valueOf("doOnDisconnectCalled");

public StatsdMeterRegistry(StatsdConfig config, Clock clock) {
this(config, HierarchicalNameMapper.DEFAULT, clock);
}
Expand Down Expand Up @@ -230,7 +233,12 @@ private void prepareUdpClient(Publisher<String> publisher) {
.neverComplete()
.retryWhen(Retry.indefinitely().filter(throwable -> throwable instanceof PortUnreachableException))
)
.doOnDisconnected(connection -> connectAndSubscribe(udpClientReference.get()));
.doOnDisconnected(connection -> {
Boolean connectionDisposed = connection.channel().attr(CONNECTION_DISPOSED).getAndSet(Boolean.TRUE);
if (connectionDisposed == null || !connectionDisposed) {
connectAndSubscribe(udpClientReference.get());
}
});
udpClientReference.set(udpClient);
connectAndSubscribe(udpClient);
}
Expand All @@ -243,7 +251,12 @@ private void prepareTcpClient(Publisher<String> publisher) {
.handle((in, out) -> out
.sendString(publisher)
.neverComplete())
.doOnDisconnected(connection -> connectAndSubscribe(tcpClientReference.get()));
.doOnDisconnected(connection -> {
Boolean connectionDisposed = connection.channel().attr(CONNECTION_DISPOSED).getAndSet(Boolean.TRUE);
if (connectionDisposed == null || !connectionDisposed) {
connectAndSubscribe(tcpClientReference.get());
}
});
tcpClientReference.set(tcpClient);
connectAndSubscribe(tcpClient);
}
Expand All @@ -266,11 +279,11 @@ private void connectAndSubscribe(UdpClient udpClient) {
}));
}

private void retryReplaceClient(Mono<? extends Connection> connection) {
connection
private void retryReplaceClient(Mono<? extends Connection> connectMono) {
connectMono
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofMinutes(1)))
.subscribe(client -> {
this.client.update(client);
.subscribe(connection -> {
this.statsdConnection.replace(connection);

// now that we're connected, start polling gauges and other pollable meter types
startPolling();
Expand All @@ -285,8 +298,8 @@ private void startPolling() {

public void stop() {
if (started.compareAndSet(true, false)) {
if (client.get() != null) {
client.get().dispose();
if (statsdConnection.get() != null) {
statsdConnection.get().dispose();
}
if (meterPoller.get() != null) {
meterPoller.get().dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class StatsdMeterRegistryPublishTest {
StatsdMeterRegistry meterRegistry;
DisposableChannel server;
CountDownLatch serverLatch;
AtomicInteger serverMetricReadCount = new AtomicInteger();

volatile boolean bound = false;

Expand Down Expand Up @@ -99,7 +100,7 @@ void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol)
counter.increment(1);
assertThat(serverLatch.await(5, TimeUnit.SECONDS)).isTrue();

Disposable firstClient = meterRegistry.client.get();
Disposable firstClient = meterRegistry.statsdConnection.get();

server.disposeNow();
serverLatch = new CountDownLatch(3);
Expand All @@ -119,7 +120,7 @@ void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol)
Counter.builder("another.counter").register(meterRegistry).increment();

if (protocol == StatsdProtocol.TCP) {
await().until(() -> meterRegistry.client.get() != firstClient);
await().until(() -> meterRegistry.statsdConnection.get() != firstClient);
}

counter.increment(5);
Expand Down Expand Up @@ -190,7 +191,7 @@ void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol proto
server = startServer(protocol, port);
if (protocol == StatsdProtocol.TCP) {
// client is null until TcpClient first connects
await().until(() -> meterRegistry.client.get() != null);
await().until(() -> meterRegistry.statsdConnection.get() != null);
// TcpClient may take some time to reconnect to the server
await().until(() -> !clientIsDisposed());
}
Expand Down Expand Up @@ -226,10 +227,37 @@ void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws I
assertThat(serverLatch.await(1, TimeUnit.SECONDS)).isFalse();
}

@ParameterizedTest
@EnumSource(StatsdProtocol.class)
@Issue("#2177")
void whenSendError_reconnectsAndWritesNewMetrics(StatsdProtocol protocol) throws InterruptedException {
serverLatch = new CountDownLatch(3);
server = startServer(protocol, 0);
final int port = server.address().getPort();
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
((Connection) meterRegistry.statsdConnection.get()).addHandler("writeFailure", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
throw new RuntimeException("write error for testing purposes");
}
});
Counter counter = Counter.builder("my.counter").register(meterRegistry);
// write will cause error
counter.increment();
// wait for reconnect
await().until(() -> !clientIsDisposed());
// remove write exception handler
((Connection) meterRegistry.statsdConnection.get()).removeHandler("writeFailure");
IntStream.range(1, 4).forEach(counter::increment);
assertThat(serverLatch.await(3, TimeUnit.SECONDS)).isTrue();
await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)).until(() -> serverMetricReadCount.get() == 3);
}

private void trackWritesForUdpClient(StatsdProtocol protocol, AtomicInteger writeCount) {
if (protocol == StatsdProtocol.UDP) {
await().until(() -> meterRegistry.client.get() != null);
((Connection) meterRegistry.client.get())
await().until(() -> meterRegistry.statsdConnection.get() != null);
((Connection) meterRegistry.statsdConnection.get())
.addHandler(new LoggingHandler("udpclient", LogLevel.INFO))
.addHandler(new ChannelOutboundHandlerAdapter() {
@Override
Expand All @@ -247,7 +275,7 @@ private void startRegistryAndWaitForClient() {
}

private boolean clientIsDisposed() {
return meterRegistry.client.get().isDisposed();
return meterRegistry.statsdConnection.get().isDisposed();
}

private DisposableChannel startServer(StatsdProtocol protocol, int port) {
Expand All @@ -259,11 +287,12 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) {
in.receive().asString()
.flatMap(packet -> {
serverLatch.countDown();
serverMetricReadCount.getAndIncrement();
return Flux.never();
}))
.doOnBound((server) -> bound = true)
.doOnUnbound((server) -> bound = false)
.wiretap(true)
.wiretap("udpserver", LogLevel.INFO)
.bindNow(Duration.ofSeconds(2));
} else if (protocol == StatsdProtocol.TCP) {
AtomicReference<DisposableChannel> channel = new AtomicReference<>();
Expand All @@ -273,14 +302,19 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) {
.handle((in, out) ->
in.receive().asString()
.flatMap(packet -> {
IntStream.range(0, packet.split("my.counter").length - 1).forEach(i -> serverLatch.countDown());
IntStream.range(0, packet.split("my.counter").length - 1).forEach(i -> {
serverLatch.countDown();
serverMetricReadCount.getAndIncrement();
});
in.withConnection(channel::set);
return Flux.never();
}))
.doOnBound((server) -> bound = true)
.doOnUnbound((server) -> {
bound = false;
channel.get().dispose();
if (channel.get() != null) {
channel.get().dispose();
}
})
.wiretap("tcpserver", LogLevel.INFO)
.bindNow(Duration.ofSeconds(5));
Expand Down

0 comments on commit a5a9608

Please sign in to comment.