From c1674020a091dad421987dc34e0868a4e9680695 Mon Sep 17 00:00:00 2001 From: Tommy Ludwig <8924140+shakuzen@users.noreply.github.com> Date: Tue, 11 Apr 2023 13:33:58 +0900 Subject: [PATCH] Start publish at randomized time in next step (#3750) This avoids different instances calling publish at the same time. It will avoid scheduling publishing at the last 20% of a step interval to avoid publishing spilling over into a subsequent step. An additional background thread is added for StepMeterRegistry that will poll all meters at the start of the step, separate from when publishing happens. This is so the values published are the ones recorded in the previous step. We call rollCount for step meters as early as possible in the step to minimize the number of recordings that will be reported for the wrong step. This is not perfect, but it is no worse than the status quo. Resolves gh-2818 --- .../instrument/push/PushMeterRegistry.java | 20 ++++++- .../instrument/step/StepMeterRegistry.java | 44 ++++++++++++++ .../push/PushMeterRegistryTest.java | 41 ++++++++++--- .../step/StepMeterRegistryTest.java | 60 ++++++++++++++++++- 4 files changed, 154 insertions(+), 11 deletions(-) diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java index 2c9daf729a..22ccf58911 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/push/PushMeterRegistry.java @@ -22,6 +22,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.util.TimeUtils; +import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -32,6 +33,10 @@ public abstract class PushMeterRegistry extends MeterRegistry { private static final 
InternalLogger logger = InternalLoggerFactory.getInstance(PushMeterRegistry.class); + // Schedule publishing in the beginning X percent of the step to avoid spill-over into + // the next step. + private static final double PERCENT_RANGE_OF_RANDOM_PUBLISHING_OFFSET = 0.8; + private final PushRegistryConfig config; private final AtomicBoolean publishing = new AtomicBoolean(false); @@ -97,9 +102,8 @@ public void start(ThreadFactory threadFactory) { + TimeUtils.format(config.step())); scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); - // time publication to happen just after StepValue finishes the step long stepMillis = config.step().toMillis(); - long initialDelayMillis = stepMillis - (clock.wallTime() % stepMillis) + 1; + long initialDelayMillis = calculateInitialDelay(); scheduledExecutorService.scheduleAtFixedRate(this::publishSafely, initialDelayMillis, stepMillis, TimeUnit.MILLISECONDS); } @@ -121,4 +125,16 @@ public void close() { super.close(); } + // VisibleForTesting + long calculateInitialDelay() { + long stepMillis = config.step().toMillis(); + Random random = new Random(); + // in range of [0, X% of step - 2) + long randomOffsetWithinStep = Math.max(0, + (long) (stepMillis * random.nextDouble() * PERCENT_RANGE_OF_RANDOM_PUBLISHING_OFFSET) - 2); + long offsetToStartOfNextStep = stepMillis - (clock.wallTime() % stepMillis); + // at least 2ms into step, so it is after StepMeterRegistry's meterPollingService + return offsetToStartOfNextStep + 2 + randomOffsetWithinStep; + } + } diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java index 66435c9d0f..164ee964f0 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java @@ -25,6 +25,9 @@ import 
io.micrometer.core.instrument.internal.DefaultMeter; import io.micrometer.core.instrument.push.PushMeterRegistry; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.ToDoubleFunction; import java.util.function.ToLongFunction; @@ -39,6 +42,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry { private final StepRegistryConfig config; + @Nullable + private ScheduledExecutorService meterPollingService; + public StepMeterRegistry(StepRegistryConfig config, Clock clock) { super(config, clock); this.config = config; @@ -104,6 +110,26 @@ protected DistributionStatisticConfig defaultHistogramConfig() { .merge(DistributionStatisticConfig.DEFAULT); } + @Override + public void start(ThreadFactory threadFactory) { + super.start(threadFactory); + + if (config.enabled()) { + this.meterPollingService = Executors.newSingleThreadScheduledExecutor(threadFactory); + + this.meterPollingService.scheduleAtFixedRate(this::pollMetersToRollover, getInitialDelay(), + config.step().toMillis(), TimeUnit.MILLISECONDS); + } + } + + @Override + public void stop() { + super.stop(); + if (this.meterPollingService != null) { + this.meterPollingService.shutdown(); + } + } + @Override public void close() { stop(); @@ -116,4 +142,22 @@ public void close() { super.close(); } + /** + * This will poll the values from meters, which will cause a roll over for Step-meters + * if past the step boundary. This gives some control over when roll over happens + * separate from when publishing happens. 
+ */ + // VisibleForTesting + void pollMetersToRollover() { + this.getMeters() + .forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count, + meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null)); + } + + private long getInitialDelay() { + long stepMillis = config.step().toMillis(); + // schedule one millisecond into the next step + return stepMillis - (clock.wallTime() % stepMillis) + 1; + } + } diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/push/PushMeterRegistryTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/push/PushMeterRegistryTest.java index 2ffb100ce4..8093fb00a9 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/push/PushMeterRegistryTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/push/PushMeterRegistryTest.java @@ -17,6 +17,7 @@ import io.micrometer.core.Issue; import io.micrometer.core.instrument.*; +import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.core.instrument.distribution.pause.PauseDetector; import io.micrometer.core.instrument.step.StepMeterRegistry; @@ -25,17 +26,16 @@ import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Deque; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.ToDoubleFunction; import java.util.function.ToLongFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.*; import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.Assertions.assertThatCode; @@ -44,12 +44,13 @@ */ class PushMeterRegistryTest { + static final Duration STEP_DURATION = Duration.ofMillis(10); static ThreadFactory threadFactory = new NamedThreadFactory("PushMeterRegistryTest"); StepRegistryConfig config = new StepRegistryConfig() { @Override public Duration step() { - return Duration.ofMillis(10); + return STEP_DURATION; } @Override @@ -120,6 +121,32 @@ void scheduledPublishOverlapWithPublishOnClose() throws InterruptedException { assertThat(firstPublishValues.pop()).isEqualTo(2.5); } + @Test + @Issue("#2818") + void publishTimeIsRandomizedWithinStep() { + Duration startTime = Duration.ofMillis(4); + MockClock clock = new MockClock(); + clock.add(-1, MILLISECONDS); // set time to 0 + clock.add(startTime); + PushMeterRegistry registry = new CountingPushMeterRegistry(config, clock); + long minOffsetMillis = 8; // 4 (start) + 8 (offset) = 12 (2ms into next step) + // exclusive upper bound + long maxOffsetMillis = 14; // 4 (start) + 14 (offset) = 18 (8ms into next step; + // 80% of step is 8ms) + Set<Long> observedDelays = new HashSet<>((int) (maxOffsetMillis - minOffsetMillis)); + IntStream.range(0, 10_000).forEach(i -> { + long delay = registry.calculateInitialDelay(); + // isBetween is inclusive; subtract 1 from exclusive max offset + assertThat(delay).isBetween(minOffsetMillis, maxOffsetMillis - 1); + observedDelays.add(delay); + }); + Long[] expectedDelays = LongStream.range(minOffsetMillis, maxOffsetMillis) + .boxed() + .collect(Collectors.toList()) + .toArray(new Long[0]); + assertThat(observedDelays).containsExactlyInAnyOrder(expectedDelays); // set order is unspecified + } + private static class OverlappingStepMeterRegistry extends StepMeterRegistry { private final AtomicInteger numberOfPublishes = new AtomicInteger(); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java index 
4aeb7b4ec2..bc059293f5 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java @@ -342,6 +342,62 @@ void publishOnCloseCrossesStepBoundary() { assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53); } + @Test + @Issue("#2818") + void scheduledRollOver() { + Counter counter = Counter.builder("counter").register(registry); + counter.increment(); + Timer timer = Timer.builder("timer").register(registry); + timer.record(5, MILLISECONDS); + DistributionSummary summary = DistributionSummary.builder("summary").register(registry); + summary.record(7); + FunctionCounter functionCounter = FunctionCounter.builder("counter.function", this, obj -> 15) + .register(registry); + FunctionTimer functionTimer = FunctionTimer.builder("timer.function", this, obj -> 3, obj -> 53, MILLISECONDS) + .register(registry); + Gauge.builder("gauge", () -> 12).register(registry); + + // before rollover + assertThat(counter.count()).isZero(); + assertThat(timer.count()).isZero(); + assertThat(timer.totalTime(MILLISECONDS)).isZero(); + assertThat(summary.count()).isZero(); + assertThat(summary.totalAmount()).isZero(); + assertThat(functionCounter.count()).isZero(); + assertThat(functionTimer.count()).isZero(); + assertThat(functionTimer.totalTime(MILLISECONDS)).isZero(); + + clock.addSeconds(60); + // simulate this being scheduled at the start of the step + registry.pollMetersToRollover(); + + clock.addSeconds(1); + // these recordings belong to the current step and should not be published + counter.increment(); + timer.record(5, MILLISECONDS); + summary.record(8); + clock.addSeconds(10); + + // recordings that happened in the previous step should be published + registry.publish(); + assertThat(registry.publishedCounterCounts).hasSize(1); + assertThat(registry.publishedCounterCounts.pop()).isOne(); + 
assertThat(registry.publishedTimerCounts).hasSize(1); + assertThat(registry.publishedTimerCounts.pop()).isOne(); + assertThat(registry.publishedTimerSumMilliseconds).hasSize(1); + assertThat(registry.publishedTimerSumMilliseconds.pop()).isEqualTo(5.0); + assertThat(registry.publishedSummaryCounts).hasSize(1); + assertThat(registry.publishedSummaryCounts.pop()).isOne(); + assertThat(registry.publishedSummaryTotals).hasSize(1); + assertThat(registry.publishedSummaryTotals.pop()).isEqualTo(7); + assertThat(registry.publishedFunctionCounterCounts).hasSize(1); + assertThat(registry.publishedFunctionCounterCounts.pop()).isEqualTo(15); + assertThat(registry.publishedFunctionTimerCounts).hasSize(1); + assertThat(registry.publishedFunctionTimerCounts.pop()).isEqualTo(3); + assertThat(registry.publishedFunctionTimerTotals).hasSize(1); + assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(53); + } + private class MyStepMeterRegistry extends StepMeterRegistry { Deque publishedCounterCounts = new ArrayDeque<>(); @@ -378,8 +434,8 @@ protected void publish() { } publishes.incrementAndGet(); getMeters().stream() - .map(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary, null, - null, this::publishFunctionCounter, this::publishFunctionTimer, null)) + .map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary, + null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null)) .collect(Collectors.toList()); }