Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ private static class State {
boolean supportsDropping;
String state;
String configEndpoint;
String debuggerEndpoint;
String debuggerLogEndpoint;
String debuggerSnapshotEndpoint;
String debuggerDiagnosticsEndpoint;
String evpProxyEndpoint;
String version;
Expand Down Expand Up @@ -266,15 +267,17 @@ private boolean processInfoResponse(State newState, String response) {
}
}

// both debugger endpoint v2 and diagnostics endpoint are forwarding events to the DEBUGGER
// intake
// because older agents support diagnostics, we fallback to it before falling back to v1
if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) {
newState.debuggerLogEndpoint = DEBUGGER_ENDPOINT_V1;
}
// both debugger v2 and diagnostics endpoints are forwarding events to the DEBUGGER intake
// because older agents support diagnostics, we fall back to it before falling back to v1
if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V2)) {
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V2;
newState.debuggerSnapshotEndpoint = DEBUGGER_ENDPOINT_V2;
} else if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
newState.debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
newState.debuggerSnapshotEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
} else if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) {
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V1;
newState.debuggerSnapshotEndpoint = DEBUGGER_ENDPOINT_V1;
}
if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
newState.debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
Expand Down Expand Up @@ -359,11 +362,15 @@ public boolean supportsMetrics() {
}

public boolean supportsDebugger() {
return discoveryState.debuggerEndpoint != null;
return discoveryState.debuggerLogEndpoint != null;
}

public String getDebuggerSnapshotEndpoint() {
return discoveryState.debuggerSnapshotEndpoint;
}

public String getDebuggerEndpoint() {
return discoveryState.debuggerEndpoint;
public String getDebuggerLogEndpoint() {
return discoveryState.debuggerLogEndpoint;
}

public boolean supportsDebuggerDiagnostics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
features.state() == INFO_STATE
features.getConfigEndpoint() == V7_CONFIG_ENDPOINT
features.supportsDebugger()
features.getDebuggerEndpoint() == "debugger/v2/input"
features.getDebuggerSnapshotEndpoint() == "debugger/v2/input"
features.supportsDebuggerDiagnostics()
features.supportsEvpProxy()
features.supportsContentEncodingHeadersWithEvpProxy()
Expand Down Expand Up @@ -440,7 +440,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_TELEMETRY_PROXY_RESPONSE) }
features.supportsTelemetryProxy()
features.supportsDebugger()
features.getDebuggerEndpoint() == "debugger/v1/input"
features.getDebuggerSnapshotEndpoint() == "debugger/v1/input"
!features.supportsDebuggerDiagnostics()
0 * _
}
Expand All @@ -459,7 +459,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
features.getEvpProxyEndpoint() == "evp_proxy/v2/" // v3 is advertised, but the tracer should ignore it
!features.supportsContentEncodingHeadersWithEvpProxy()
features.supportsDebugger()
features.getDebuggerEndpoint() == "debugger/v1/diagnostics"
features.getDebuggerSnapshotEndpoint() == "debugger/v1/diagnostics"
features.supportsDebuggerDiagnostics()
0 * _
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,19 @@ private static DebuggerSink createDebuggerSink(
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery,
ProbeStatusSink probeStatusSink) {
String tags = getDefaultTagsMergedWithGlobalTags(config);
SnapshotSink snapshotSink =
new SnapshotSink(
BatchUploader lowRateUploader =
new BatchUploader(
"Snapshots",
config,
tags,
new BatchUploader(
config,
getDebuggerEndpoint(config, ddAgentFeaturesDiscovery),
SnapshotSink.RETRY_POLICY));
getSnapshotEndpoint(config, ddAgentFeaturesDiscovery),
SnapshotSink.RETRY_POLICY);
BatchUploader highRateUploader =
new BatchUploader(
"Logs",
config,
getLogEndpoint(config, ddAgentFeaturesDiscovery),
SnapshotSink.RETRY_POLICY);
SnapshotSink snapshotSink = new SnapshotSink(config, tags, lowRateUploader, highRateUploader);
SymbolSink symbolSink = new SymbolSink(config);
return new DebuggerSink(
config,
Expand Down Expand Up @@ -323,11 +328,21 @@ public static String getDefaultTagsMergedWithGlobalTags(Config config) {
return debuggerTags + "," + globalTags;
}

private static String getDebuggerEndpoint(
private static String getLogEndpoint(
Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) {
if (ddAgentFeaturesDiscovery.supportsDebugger()) {
return ddAgentFeaturesDiscovery
.buildUrl(ddAgentFeaturesDiscovery.getDebuggerLogEndpoint())
.toString();
}
return config.getFinalDebuggerSnapshotUrl();
}

private static String getSnapshotEndpoint(
Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) {
if (ddAgentFeaturesDiscovery.supportsDebugger()) {
return ddAgentFeaturesDiscovery
.buildUrl(ddAgentFeaturesDiscovery.getDebuggerEndpoint())
.buildUrl(ddAgentFeaturesDiscovery.getDebuggerSnapshotEndpoint())
.toString();
}
return config.getFinalDebuggerSnapshotUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,15 @@ public DebuggerTransformer(Config config, Configuration configuration) {
config,
"",
new BatchUploader(
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
"Snapshots",
config,
config.getFinalDebuggerSnapshotUrl(),
SnapshotSink.RETRY_POLICY),
new BatchUploader(
"Logs",
config,
config.getFinalDebuggerSnapshotUrl(),
SnapshotSink.RETRY_POLICY)),
Comment on lines +187 to +192
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see that we are using RETRY_POLICY always? Can it be some sort of default?
Also I have a feeling that RETRY_POLICY should be in BatchUploader class - WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for SymDB the policy is different see

public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(10);

new SymbolSink(config)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ public DebuggerSink(Config config, ProbeStatusSink probeStatusSink) {
config,
null,
new BatchUploader(
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
"Snapshots",
config,
config.getFinalDebuggerSnapshotUrl(),
SnapshotSink.RETRY_POLICY),
new BatchUploader(
"Logs", config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
new SymbolSink(config));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public class ProbeStatusSink {
private final boolean useMultiPart;

public ProbeStatusSink(Config config, String diagnosticsEndpoint, boolean useMultiPart) {
this(config, new BatchUploader(config, diagnosticsEndpoint, RETRY_POLICY), useMultiPart);
this(
config,
new BatchUploader("Diagnostics", config, diagnosticsEndpoint, RETRY_POLICY),
useMultiPart);
}

ProbeStatusSink(Config config, BatchUploader diagnosticUploader, boolean useMultiPart) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Collects snapshots that needs to be sent to the backend */
/**
* Collects snapshots that needs to be sent to the backend the notion of high/low rate is used to
* control the flush interval low rate is used for snapshots with Captures (deep variable capture,
* captureSnapshot = true) high rate is used for snapshots without Captures (dynamic templated logs)
*/
public class SnapshotSink {
private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotSink.class);
public static final int MAX_SNAPSHOT_SIZE = 1024 * 1024;
Expand All @@ -32,25 +36,32 @@ public class SnapshotSink {
static final long HIGH_RATE_STEP_SIZE = 10;
public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(0);

// low rate queue (aka snapshots)
private final BlockingQueue<Snapshot> lowRateSnapshots =
new ArrayBlockingQueue<>(LOW_RATE_CAPACITY);
// high rate queue (aka dynamic templated logs)
private final BlockingQueue<Snapshot> highRateSnapshots =
new ArrayBlockingQueue<>(HIGH_RATE_CAPACITY);
private final String serviceName;
private final int batchSize;
private final String tags;
private final BatchUploader snapshotUploader;
// uploader for low rate (aka snapshots)
private final BatchUploader lowRateUploader;
// uploader for high rate (aka dynamic templated logs)
private final BatchUploader highRateUploader;
private final AgentTaskScheduler highRateScheduler =
new AgentTaskScheduler(AgentThreadFactory.AgentThread.DEBUGGER_SNAPSHOT_SERIALIZER);
private final AtomicBoolean started = new AtomicBoolean();
private volatile AgentTaskScheduler.Scheduled<SnapshotSink> highRateScheduled;
private volatile long currentHighRateFlushInterval = HIGH_RATE_MAX_FLUSH_INTERVAL_MS;

public SnapshotSink(Config config, String tags, BatchUploader snapshotUploader) {
public SnapshotSink(
Config config, String tags, BatchUploader lowRateUploader, BatchUploader highRateUploader) {
this.serviceName = TagsHelper.sanitize(config.getServiceName());
this.batchSize = config.getDynamicInstrumentationUploadBatchSize();
this.tags = tags;
this.snapshotUploader = snapshotUploader;
this.lowRateUploader = lowRateUploader;
this.highRateUploader = highRateUploader;
}

public void start() {
Expand All @@ -66,7 +77,8 @@ public void stop() {
if (localScheduled != null) {
localScheduled.cancel();
}
snapshotUploader.shutdown();
lowRateUploader.shutdown();
highRateUploader.shutdown();
started.set(false);
}

Expand All @@ -75,7 +87,7 @@ public void lowRateFlush(String tags) {
if (snapshots.isEmpty()) {
return;
}
uploadPayloads(snapshots, tags);
uploadPayloads(lowRateUploader, snapshots, tags);
}

public void highRateFlush(SnapshotSink ignored) {
Expand All @@ -87,12 +99,12 @@ public void highRateFlush(SnapshotSink ignored) {
}
int count = snapshots.size();
reconsiderHighRateFlushInterval(count);
uploadPayloads(snapshots, tags);
uploadPayloads(highRateUploader, snapshots, tags);
} while (!highRateSnapshots.isEmpty());
}

public HttpUrl getUrl() {
return snapshotUploader.getUrl();
return lowRateUploader.getUrl();
}

public long remainingCapacity() {
Expand Down Expand Up @@ -194,10 +206,10 @@ private String serializeSnapshot(String serviceName, Snapshot snapshot) {
return prunedStr;
}

private void uploadPayloads(List<String> payloads, String tags) {
private static void uploadPayloads(BatchUploader uploader, List<String> payloads, String tags) {
List<byte[]> batches = IntakeBatchHelper.createBatches(payloads);
for (byte[] batch : batches) {
snapshotUploader.upload(batch, tags);
uploader.upload(batch, tags);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SymbolSink {
public SymbolSink(Config config) {
this(
config,
new BatchUploader(config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY),
new BatchUploader("SymDB", config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY),
MAX_SYMDB_UPLOAD_SIZE);
}

Expand Down
Loading