Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,23 +1,50 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import datadog.trace.api.Platform;
import datadog.trace.api.config.ProfilingConfig;
import datadog.trace.api.profiling.QueueTiming;
import datadog.trace.api.profiling.Timer;
import datadog.trace.api.sampling.PerRecordingRateLimiter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.config.provider.ConfigProvider;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class QueueTimerHelper {

private static final class RateLimiterHolder {
// indirection to prevent needing to instantiate the class and its transitive dependencies
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Did you add this preemptively, or was NI analysis complaining for a build?

// in graal native image
private static final PerRecordingRateLimiter RATE_LIMITER =
new PerRecordingRateLimiter(
Duration.of(500, ChronoUnit.MILLIS),
10_000, // hard limit on queue events
Duration.ofSeconds(
ConfigProvider.getInstance()
.getInteger(
ProfilingConfig.PROFILING_UPLOAD_PERIOD,
ProfilingConfig.PROFILING_UPLOAD_PERIOD_DEFAULT)));
}

public static <T> void startQueuingTimer(
ContextStore<T, State> taskContextStore, Class<?> schedulerClass, T task) {
State state = taskContextStore.get(task);
startQueuingTimer(state, schedulerClass, task);
}

public static void startQueuingTimer(State state, Class<?> schedulerClass, Object task) {
if (Platform.isNativeImageBuilder()) {
// explicitly not supported for Graal native image
return;
}
// avoid calling this before JFR is initialised because it will lead to reading the wrong
// TSC frequency before JFR has set it up properly
if (task != null && state != null && InstrumentationBasedProfiling.isJFRReady()) {
if (task != null
&& state != null
&& InstrumentationBasedProfiling.isJFRReady()
&& RateLimiterHolder.RATE_LIMITER.permit()) {
QueueTiming timing =
(QueueTiming) AgentTracer.get().getProfilingContext().start(Timer.TimerType.QUEUEING);
timing.setTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import datadog.trace.api.sampling.AdaptiveSampler;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import jdk.jfr.Event;
import jdk.jfr.EventType;

Expand All @@ -24,18 +23,4 @@ public void start() {
public boolean sample() {
return sampleType.isEnabled() && sampler.sample();
}

protected static int samplingWindowsPerRecording(
long uploadPeriodSeconds, Duration samplingWindow) {
/*
* Java8 doesn't have dividedBy#Duration so we have to implement poor man's version.
* None of these durations should be big enough to warrant dealing with bigints.
* We also do not care about nanoseconds here.
*/
return (int)
Math.min(
Duration.of(uploadPeriodSeconds, ChronoUnit.SECONDS).toMillis()
/ samplingWindow.toMillis(),
Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.trace.bootstrap.instrumentation.jfr.backpressure;

import static datadog.trace.api.sampling.PerRecordingRateLimiter.samplingWindowsPerRecording;

import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.jfr.WindowSampler;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.trace.bootstrap.instrumentation.jfr.directallocation;

import static datadog.trace.api.sampling.PerRecordingRateLimiter.samplingWindowsPerRecording;

import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.jfr.WindowSampler;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.trace.bootstrap.instrumentation.jfr.exceptions;

import static datadog.trace.api.sampling.PerRecordingRateLimiter.samplingWindowsPerRecording;

import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.jfr.WindowSampler;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datadog.trace.api.sampling;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class PerRecordingRateLimiter {

private final AdaptiveSampler sampler;

public PerRecordingRateLimiter(Duration windowDuration, int limit, Duration recordingLength) {
this(windowDuration, limit, recordingLength, 16);
}

public PerRecordingRateLimiter(
Duration windowDuration, int limit, Duration recordingLength, int budgetLookback) {
int lookback = samplingWindowsPerRecording(recordingLength.getSeconds(), windowDuration);
int samplesPerWindow =
limit / samplingWindowsPerRecording(recordingLength.getSeconds(), windowDuration);
sampler = new AdaptiveSampler(windowDuration, samplesPerWindow, lookback, budgetLookback, true);
}

public boolean permit() {
return sampler.sample();
}

public static int samplingWindowsPerRecording(long uploadPeriodSeconds, Duration samplingWindow) {
/*
* Java8 doesn't have dividedBy#Duration so we have to implement poor man's version.
* None of these durations should be big enough to warrant dealing with bigints.
* We also do not care about nanoseconds here.
*/
return (int)
Math.min(
Duration.of(uploadPeriodSeconds, ChronoUnit.SECONDS).toMillis()
/ samplingWindow.toMillis(),
Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datadog.trace.api.sampling;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.Arrays;
import org.junit.jupiter.api.Test;

public class PerRecordingRateLimiterTest {

@Test
public void testLimitApplied() {
Duration window = Duration.ofMillis(100);
int limit = 20;
Duration recordingDurationSeconds = Duration.ofSeconds(1);
PerRecordingRateLimiter rateLimiter =
new PerRecordingRateLimiter(window, limit, recordingDurationSeconds, 5);
// no rate limiting is applied during the first window
int[] slots = new int[(int) (recordingDurationSeconds.toMillis() / window.toMillis())];
long start = System.nanoTime();
while (true) {
int slot = (int) (NANOSECONDS.toMillis(System.nanoTime() - start) / window.toMillis());
if (slot >= slots.length) {
break;
}
if (rateLimiter.permit()) {
slots[slot]++;
}
}
assertTrue(Arrays.stream(slots).max().orElse(limit + 1) <= limit);
assertTrue(Arrays.stream(slots).sum() <= 2 * limit);
}
}