Skip to content

Commit

Permalink
Start publish at randomized time in next step (#3750)
Browse files Browse the repository at this point in the history
This avoids different instances calling publish at the same time. It will avoid scheduling publishing at the last 20% of a step interval to avoid publishing spilling over into a subsequent step.
An additional background thread is added for StepMeterRegistry that will poll all meters at the start of the step, separate from when publishing happens. This is so the values published are the ones recorded in the previous step. We call rollCount for step meters as early as possible in the step to minimize the number of recordings that will be reported for the wrong step. This is not perfect, but it is no worse than the status quo.

Resolves gh-2818
  • Loading branch information
shakuzen authored Apr 11, 2023
1 parent 923717e commit c167402
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.util.TimeUtils;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand All @@ -32,6 +33,10 @@ public abstract class PushMeterRegistry extends MeterRegistry {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PushMeterRegistry.class);

// Schedule publishing in the beginning X percent of the step to avoid spill-over into
// the next step.
private static final double PERCENT_RANGE_OF_RANDOM_PUBLISHING_OFFSET = 0.8;

private final PushRegistryConfig config;

private final AtomicBoolean publishing = new AtomicBoolean(false);
Expand Down Expand Up @@ -97,9 +102,8 @@ public void start(ThreadFactory threadFactory) {
+ TimeUtils.format(config.step()));

scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
// time publication to happen just after StepValue finishes the step
long stepMillis = config.step().toMillis();
long initialDelayMillis = stepMillis - (clock.wallTime() % stepMillis) + 1;
long initialDelayMillis = calculateInitialDelay();
scheduledExecutorService.scheduleAtFixedRate(this::publishSafely, initialDelayMillis, stepMillis,
TimeUnit.MILLISECONDS);
}
Expand All @@ -121,4 +125,16 @@ public void close() {
super.close();
}

// VisibleForTesting
long calculateInitialDelay() {
long stepMillis = config.step().toMillis();
Random random = new Random();
// in range of [0, X% of step - 2)
long randomOffsetWithinStep = Math.max(0,
(long) (stepMillis * random.nextDouble() * PERCENT_RANGE_OF_RANDOM_PUBLISHING_OFFSET) - 2);
long offsetToStartOfNextStep = stepMillis - (clock.wallTime() % stepMillis);
// at least 2ms into step, so it is after StepMeterRegistry's meterPollingService
return offsetToStartOfNextStep + 2 + randomOffsetWithinStep;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.micrometer.core.instrument.internal.DefaultMeter;
import io.micrometer.core.instrument.push.PushMeterRegistry;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
Expand All @@ -39,6 +42,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry {

private final StepRegistryConfig config;

@Nullable
private ScheduledExecutorService meterPollingService;

public StepMeterRegistry(StepRegistryConfig config, Clock clock) {
super(config, clock);
this.config = config;
Expand Down Expand Up @@ -104,6 +110,26 @@ protected DistributionStatisticConfig defaultHistogramConfig() {
.merge(DistributionStatisticConfig.DEFAULT);
}

@Override
public void start(ThreadFactory threadFactory) {
super.start(threadFactory);

if (config.enabled()) {
this.meterPollingService = Executors.newSingleThreadScheduledExecutor(threadFactory);

this.meterPollingService.scheduleAtFixedRate(this::pollMetersToRollover, getInitialDelay(),
config.step().toMillis(), TimeUnit.MILLISECONDS);
}
}

@Override
public void stop() {
super.stop();
if (this.meterPollingService != null) {
this.meterPollingService.shutdown();
}
}

@Override
public void close() {
stop();
Expand All @@ -116,4 +142,22 @@ public void close() {
super.close();
}

/**
* This will poll the values from meters, which will cause a roll over for Step-meters
* if past the step boundary. This gives some control over when roll over happens
* separate from when publishing happens.
*/
// VisibleForTesting
void pollMetersToRollover() {
this.getMeters()
.forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count,
meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null));
}

private long getInitialDelay() {
long stepMillis = config.step().toMillis();
// schedule one millisecond into the next step
return stepMillis - (clock.wallTime() % stepMillis) + 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.micrometer.core.Issue;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepMeterRegistry;
Expand All @@ -25,17 +26,16 @@
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

Expand All @@ -44,12 +44,13 @@
*/
class PushMeterRegistryTest {

static final Duration STEP_DURATION = Duration.ofMillis(10);
static ThreadFactory threadFactory = new NamedThreadFactory("PushMeterRegistryTest");

StepRegistryConfig config = new StepRegistryConfig() {
@Override
public Duration step() {
return Duration.ofMillis(10);
return STEP_DURATION;
}

@Override
Expand Down Expand Up @@ -120,6 +121,32 @@ void scheduledPublishOverlapWithPublishOnClose() throws InterruptedException {
assertThat(firstPublishValues.pop()).isEqualTo(2.5);
}

@Test
@Issue("#2818")
void publishTimeIsRandomizedWithinStep() {
Duration startTime = Duration.ofMillis(4);
MockClock clock = new MockClock();
clock.add(-1, MILLISECONDS); // set time to 0
clock.add(startTime);
PushMeterRegistry registry = new CountingPushMeterRegistry(config, clock);
long minOffsetMillis = 8; // 4 (start) + 8 (offset) = 12 (2ms into next step)
// exclusive upper bound
long maxOffsetMillis = 14; // 4 (start) + 14 (offset) = 18 (8ms into next step;
// 80% of step is 8ms)
Set<Long> observedDelays = new HashSet<>((int) (maxOffsetMillis - minOffsetMillis));
IntStream.range(0, 10_000).forEach(i -> {
long delay = registry.calculateInitialDelay();
// isBetween is inclusive; subtract 1 from exclusive max offset
assertThat(delay).isBetween(minOffsetMillis, maxOffsetMillis - 1);
observedDelays.add(delay);
});
Long[] expectedDelays = LongStream.range(minOffsetMillis, maxOffsetMillis)
.boxed()
.collect(Collectors.toList())
.toArray(new Long[0]);
assertThat(observedDelays).containsExactly(expectedDelays);
}

private static class OverlappingStepMeterRegistry extends StepMeterRegistry {

private final AtomicInteger numberOfPublishes = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,62 @@ void publishOnCloseCrossesStepBoundary() {
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53);
}

@Test
@Issue("#2818")
void scheduledRollOver() {
Counter counter = Counter.builder("counter").register(registry);
counter.increment();
Timer timer = Timer.builder("timer").register(registry);
timer.record(5, MILLISECONDS);
DistributionSummary summary = DistributionSummary.builder("summary").register(registry);
summary.record(7);
FunctionCounter functionCounter = FunctionCounter.builder("counter.function", this, obj -> 15)
.register(registry);
FunctionTimer functionTimer = FunctionTimer.builder("timer.function", this, obj -> 3, obj -> 53, MILLISECONDS)
.register(registry);
Gauge.builder("gauge", () -> 12).register(registry);

// before rollover
assertThat(counter.count()).isZero();
assertThat(timer.count()).isZero();
assertThat(timer.totalTime(MILLISECONDS)).isZero();
assertThat(summary.count()).isZero();
assertThat(summary.totalAmount()).isZero();
assertThat(functionCounter.count()).isZero();
assertThat(functionTimer.count()).isZero();
assertThat(functionTimer.totalTime(MILLISECONDS)).isZero();

clock.addSeconds(60);
// simulate this being scheduled at the start of the step
registry.pollMetersToRollover();

clock.addSeconds(1);
// these recordings belong to the current step and should not be published
counter.increment();
timer.record(5, MILLISECONDS);
summary.record(8);
clock.addSeconds(10);

// recordings that happened in the previous step should be published
registry.publish();
assertThat(registry.publishedCounterCounts).hasSize(1);
assertThat(registry.publishedCounterCounts.pop()).isOne();
assertThat(registry.publishedTimerCounts).hasSize(1);
assertThat(registry.publishedTimerCounts.pop()).isOne();
assertThat(registry.publishedTimerSumMilliseconds).hasSize(1);
assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(5.0);
assertThat(registry.publishedSummaryCounts).hasSize(1);
assertThat(registry.publishedSummaryCounts.pop()).isOne();
assertThat(registry.publishedSummaryTotals).hasSize(1);
assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(7);
assertThat(registry.publishedFunctionCounterCounts).hasSize(1);
assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(15);
assertThat(registry.publishedFunctionTimerCounts).hasSize(1);
assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(3);
assertThat(registry.publishedFunctionTimerTotals).hasSize(1);
assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53);
}

private class MyStepMeterRegistry extends StepMeterRegistry {

Deque<Double> publishedCounterCounts = new ArrayDeque<>();
Expand Down Expand Up @@ -378,8 +434,8 @@ protected void publish() {
}
publishes.incrementAndGet();
getMeters().stream()
.map(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary, null,
null, this::publishFunctionCounter, this::publishFunctionTimer, null))
.map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary,
null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null))
.collect(Collectors.toList());
}

Expand Down

0 comments on commit c167402

Please sign in to comment.