Skip to content

Commit

Permalink
Avoid duplicate publish on StepMeterRegistry when closed within first…
Browse files Browse the repository at this point in the history
… step. (#4485)

Do not perform a publish for the previous step if there has not been a previous step yet. This is achieved by internally tracking when polling of meters happens (to rollover values) and it will skip doing the publish for the previous step if they have never been rolled over.

Resolves gh-4357
  • Loading branch information
lenin-jaganathan authored Dec 11, 2023
1 parent 16f59e9 commit 2735d26
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public class OtlpMeterRegistry extends PushMeterRegistry {

private long deltaAggregationTimeUnixNano = 0L;

// Time when the last scheduled rollOver has started. Applicable only for delta
// flavour.
private long lastMeterRolloverStartTime = -1;

@Nullable
private ScheduledExecutorService meterPollingService;

Expand Down Expand Up @@ -244,8 +248,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
public void close() {
stop();
if (config.enabled() && isDelta() && !isClosed()) {
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
// Data was not published for the current step. So, we should flush that
if (shouldPublishDataForLastStep() && !isPublishing()) {
// Data was not published for the last step. So, we should flush that
// first.
try {
publish();
Expand All @@ -264,9 +268,13 @@ else if (isPublishing()) {
super.close();
}

private boolean isDataPublishedForCurrentStep() {
return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime()
/ config.step().toMillis());
private boolean shouldPublishDataForLastStep() {
if (lastMeterRolloverStartTime < 0)
return false;

final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis();
final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis();
return lastPublishedStep < lastPolledStep;
}

// Either we do this or make StepMeter public
Expand Down Expand Up @@ -343,6 +351,7 @@ private Metric writeSum(Meter meter, DoubleSupplier count) {
*/
// VisibleForTesting
void pollMetersToRollover() {
this.lastMeterRolloverStartTime = clock.wallTime();
this.getMeters()
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::takeSnapshot, DistributionSummary::takeSnapshot,
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,20 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
}

@Test
@Issue("#4357")
void publishOnceWhenClosedWithinFirstStep() {
// Set the initial clock time to a valid time.
MockClock mockClock = new MockClock();
mockClock.add(otlpConfig().step().multipliedBy(5));

TestOtlpMeterRegistry stepMeterRegistry = new TestOtlpMeterRegistry(otlpConfig(), mockClock);

assertThat(stepMeterRegistry.publishCount.get()).isZero();
stepMeterRegistry.close();
assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1);
}

private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) {
assertThat(snapshot.count()).isZero();
assertThat(snapshot.total()).isZero();
Expand Down Expand Up @@ -771,6 +785,8 @@ private void assertHistogramContains(HistogramSnapshot snapshot, double total, d

private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

private final AtomicInteger publishCount = new AtomicInteger();

Deque<Double> publishedCounterCounts = new ArrayDeque<>();

Deque<Long> publishedTimerCounts = new ArrayDeque<>();
Expand All @@ -795,18 +811,24 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry {

Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();

private long lastScheduledPublishStartTime = 0L;
private long lastScheduledPublishStartTime;

AtomicBoolean isPublishing = new AtomicBoolean(false);

CompletableFuture<Void> scheduledPublishingFuture = CompletableFuture.completedFuture(null);

TestOtlpMeterRegistry() {
super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
this(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock);
}

TestOtlpMeterRegistry(OtlpConfig otlpConfig, Clock clock) {
super(otlpConfig, clock);
this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime();
}

@Override
protected void publish() {
publishCount.incrementAndGet();
forEachMeter(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary,
null, null, this::publishFunctionCounter, this::publishFunctionTimer, null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry {
@Nullable
private ScheduledExecutorService meterPollingService;

// Time when the last scheduled rollOver has started.
private long lastMeterRolloverStartTime = -1;

public StepMeterRegistry(StepRegistryConfig config, Clock clock) {
super(config, clock);
this.config = config;
Expand Down Expand Up @@ -139,9 +142,9 @@ public void close() {
stop();

if (config.enabled() && !isClosed()) {
if (!isDataPublishedForCurrentStep() && !isPublishing()) {
// Data was not published for the current step. So, we should flush that
// first.
if (shouldPublishDataForLastStep() && !isPublishing()) {
// Data was not published for the last completed step. So, we should flush
// that first.
try {
publish();
}
Expand All @@ -159,9 +162,13 @@ else if (isPublishing()) {
super.close();
}

private boolean isDataPublishedForCurrentStep() {
return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime()
/ config.step().toMillis());
private boolean shouldPublishDataForLastStep() {
if (lastMeterRolloverStartTime < 0)
return false;

final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis();
final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis();
return lastPublishedStep < lastPolledStep;
}

/**
Expand All @@ -181,6 +188,7 @@ private void closingRolloverStepMeters() {
*/
// VisibleForTesting
void pollMetersToRollover() {
this.lastMeterRolloverStartTime = clock.wallTime();
this.getMeters()
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count,
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
*/
class StepMeterRegistryTest {

private final AtomicInteger publishes = new AtomicInteger();

private final MockClock clock = new MockClock();

private final StepRegistryConfig config = new StepRegistryConfig() {
Expand Down Expand Up @@ -98,9 +96,9 @@ void serviceLevelObjectivesOnlyNoPercentileHistogram() {
@Issue("#484")
@Test
void publishOneLastTimeOnClose() {
assertThat(publishes.get()).isEqualTo(0);
assertThat(registry.publishCount.get()).isZero();
registry.close();
assertThat(publishes.get()).isEqualTo(1);
assertThat(registry.publishCount.get()).isEqualTo(1);
}

@Issue("#1993")
Expand Down Expand Up @@ -425,7 +423,7 @@ void scheduledRollOver() {
}

@Test
@Issue("3914")
@Issue("#3914")
void publishShouldNotHappenWhenRegistryIsDisabled() {
StepRegistryConfig disabledStepRegistryConfig = new StepRegistryConfig() {
@Override
Expand All @@ -449,27 +447,26 @@ public String get(String key) {
Counter.builder("publish_disabled_counter").register(disabledStepMeterRegistry).increment();

clock.add(config.step());
assertThat(publishes.get()).isZero();
assertThat(disabledStepMeterRegistry.publishCount.get()).isZero();
disabledStepMeterRegistry.close();
assertThat(publishes.get()).isZero();
assertThat(disabledStepMeterRegistry.publishCount.get()).isZero();
}

@Test
@Issue("3914")
@Issue("#3914")
void publishShouldNotHappenWhenRegistryIsClosed() {
Counter.builder("my.counter").register(registry).increment();

clock.add(config.step());
assertThat(publishes.get()).isZero();
assertThat(registry.publishCount.get()).isZero();
registry.close();
assertThat(publishes.get()).isEqualTo(2);
assertThat(registry.publishedCounterCounts).hasSize(2);
assertThat(registry.publishedCounterCounts.getFirst()).isOne();
assertThat(registry.publishedCounterCounts.getLast()).isZero();
assertThat(registry.publishCount.get()).isEqualTo(1);
assertThat(registry.publishedCounterCounts).hasSize(1);

clock.add(config.step());
registry.close();
assertThat(publishes.get()).isEqualTo(2);
assertThat(registry.publishCount.get()).isEqualTo(1);
assertThat(registry.publishedCounterCounts).hasSize(1);
}

@Test
Expand Down Expand Up @@ -557,8 +554,23 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24);
}

@Test
@Issue("#4357")
void publishOnceWhenClosedWithinFirstStep() {
// Set the initial clock time to a valid time.
MockClock mockClock = new MockClock();
mockClock.add(config.step().multipliedBy(5));

MyStepMeterRegistry stepMeterRegistry = new MyStepMeterRegistry(config, mockClock);
assertThat(stepMeterRegistry.publishCount.get()).isZero();
stepMeterRegistry.close();
assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1);
}

private class MyStepMeterRegistry extends StepMeterRegistry {

private final AtomicInteger publishCount = new AtomicInteger();

Deque<Double> publishedCounterCounts = new ArrayDeque<>();

Deque<Long> publishedTimerCounts = new ArrayDeque<>();
Expand All @@ -575,7 +587,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry {

Deque<Double> publishedFunctionTimerTotals = new ArrayDeque<>();

private long lastScheduledPublishStartTime = 0L;
private long lastScheduledPublishStartTime;

@Nullable
Runnable prePublishAction;
Expand All @@ -590,6 +602,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry {

MyStepMeterRegistry(StepRegistryConfig config, Clock clock) {
super(config, clock);
this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime();
}

void setPrePublishAction(Runnable prePublishAction) {
Expand All @@ -601,7 +614,7 @@ protected void publish() {
if (prePublishAction != null) {
prePublishAction.run();
}
publishes.incrementAndGet();
publishCount.incrementAndGet();
getMeters().stream()
.map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary,
null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null))
Expand Down

0 comments on commit 2735d26

Please sign in to comment.