diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index e3db5e91593..e5694b5cbff 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -89,7 +89,8 @@ private static class State { boolean supportsDropping; String state; String configEndpoint; - String debuggerEndpoint; + String debuggerLogEndpoint; + String debuggerSnapshotEndpoint; String debuggerDiagnosticsEndpoint; String evpProxyEndpoint; String version; @@ -266,15 +267,17 @@ private boolean processInfoResponse(State newState, String response) { } } - // both debugger endpoint v2 and diagnostics endpoint are forwarding events to the DEBUGGER - // intake - // because older agents support diagnostics, we fallback to it before falling back to v1 + if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) { + newState.debuggerLogEndpoint = DEBUGGER_ENDPOINT_V1; + } + // both debugger v2 and diagnostics endpoints are forwarding events to the DEBUGGER intake + // because older agents support diagnostics, we fall back to it before falling back to v1 if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V2)) { - newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V2; + newState.debuggerSnapshotEndpoint = DEBUGGER_ENDPOINT_V2; } else if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) { - newState.debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT; + newState.debuggerSnapshotEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT; } else if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) { - newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V1; + newState.debuggerSnapshotEndpoint = DEBUGGER_ENDPOINT_V1; } if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) { newState.debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT; @@ -359,11 +362,15 @@ public boolean supportsMetrics() { } public boolean supportsDebugger() { - return discoveryState.debuggerEndpoint != null; + return discoveryState.debuggerLogEndpoint != null; + } + + public String getDebuggerSnapshotEndpoint() { + return discoveryState.debuggerSnapshotEndpoint; } - public String getDebuggerEndpoint() { - return discoveryState.debuggerEndpoint; + public String getDebuggerLogEndpoint() { + return discoveryState.debuggerLogEndpoint; } public boolean supportsDebuggerDiagnostics() { diff --git a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy index ed3130060b8..6b7125bd2b1 100644 --- a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy +++ b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy @@ -66,7 +66,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { features.state() == INFO_STATE features.getConfigEndpoint() == V7_CONFIG_ENDPOINT features.supportsDebugger() - features.getDebuggerEndpoint() == "debugger/v2/input" + features.getDebuggerSnapshotEndpoint() == "debugger/v2/input" features.supportsDebuggerDiagnostics() features.supportsEvpProxy() features.supportsContentEncodingHeadersWithEvpProxy() @@ -440,7 +440,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { 1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_TELEMETRY_PROXY_RESPONSE) } features.supportsTelemetryProxy() features.supportsDebugger() - features.getDebuggerEndpoint() == "debugger/v1/input" + features.getDebuggerSnapshotEndpoint() == "debugger/v1/input" !features.supportsDebuggerDiagnostics() 0 * _ } @@ -459,7 +459,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { features.getEvpProxyEndpoint() == "evp_proxy/v2/" // v3 is advertised, but the tracer should ignore it !features.supportsContentEncodingHeadersWithEvpProxy() features.supportsDebugger() - features.getDebuggerEndpoint() == "debugger/v1/diagnostics" + features.getDebuggerSnapshotEndpoint() == "debugger/v1/diagnostics" features.supportsDebuggerDiagnostics() 0 * _ } diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerAgent.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerAgent.java index 343322a26fd..eaa32c5c9e7 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerAgent.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerAgent.java @@ -282,14 +282,19 @@ private static DebuggerSink createDebuggerSink( DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery, ProbeStatusSink probeStatusSink) { String tags = getDefaultTagsMergedWithGlobalTags(config); - SnapshotSink snapshotSink = - new SnapshotSink( + BatchUploader lowRateUploader = + new BatchUploader( + "Snapshots", config, - tags, - new BatchUploader( - config, - getDebuggerEndpoint(config, ddAgentFeaturesDiscovery), - SnapshotSink.RETRY_POLICY)); + getSnapshotEndpoint(config, ddAgentFeaturesDiscovery), + SnapshotSink.RETRY_POLICY); + BatchUploader highRateUploader = + new BatchUploader( + "Logs", + config, + getLogEndpoint(config, ddAgentFeaturesDiscovery), + SnapshotSink.RETRY_POLICY); + SnapshotSink snapshotSink = new SnapshotSink(config, tags, lowRateUploader, highRateUploader); SymbolSink symbolSink = new SymbolSink(config); return new DebuggerSink( config, @@ -323,11 +328,21 @@ public static String getDefaultTagsMergedWithGlobalTags(Config config) { return debuggerTags + "," + globalTags; } - private static String getDebuggerEndpoint( + private static String getLogEndpoint( + Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) { + if (ddAgentFeaturesDiscovery.supportsDebugger()) { + return ddAgentFeaturesDiscovery + .buildUrl(ddAgentFeaturesDiscovery.getDebuggerLogEndpoint()) + .toString(); + } + return config.getFinalDebuggerSnapshotUrl(); + } + + private static String getSnapshotEndpoint( Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) { if (ddAgentFeaturesDiscovery.supportsDebugger()) { return ddAgentFeaturesDiscovery - .buildUrl(ddAgentFeaturesDiscovery.getDebuggerEndpoint()) + .buildUrl(ddAgentFeaturesDiscovery.getDebuggerSnapshotEndpoint()) .toString(); } return config.getFinalDebuggerSnapshotUrl(); diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java index 975b602e9e5..f74999726ff 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java @@ -181,7 +181,15 @@ public DebuggerTransformer(Config config, Configuration configuration) { config, "", new BatchUploader( - config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)), + "Snapshots", + config, + config.getFinalDebuggerSnapshotUrl(), + SnapshotSink.RETRY_POLICY), + new BatchUploader( + "Logs", + config, + config.getFinalDebuggerSnapshotUrl(), + SnapshotSink.RETRY_POLICY)), new SymbolSink(config))); } diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/DebuggerSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/DebuggerSink.java index 4891d0969e5..d95971db2f1 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/DebuggerSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/DebuggerSink.java @@ -51,7 +51,12 @@ public DebuggerSink(Config config, ProbeStatusSink probeStatusSink) { config, null, new BatchUploader( - config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)), + "Snapshots", + config, + config.getFinalDebuggerSnapshotUrl(), + SnapshotSink.RETRY_POLICY), + new BatchUploader( + "Logs", config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)), new SymbolSink(config)); } diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/ProbeStatusSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/ProbeStatusSink.java index 9c2ef391948..1a8958a8a96 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/ProbeStatusSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/ProbeStatusSink.java @@ -48,7 +48,10 @@ public class ProbeStatusSink { private final boolean useMultiPart; public ProbeStatusSink(Config config, String diagnosticsEndpoint, boolean useMultiPart) { - this(config, new BatchUploader(config, diagnosticsEndpoint, RETRY_POLICY), useMultiPart); + this( + config, + new BatchUploader("Diagnostics", config, diagnosticsEndpoint, RETRY_POLICY), + useMultiPart); } ProbeStatusSink(Config config, BatchUploader diagnosticUploader, boolean useMultiPart) { diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java index 7eefe2ff296..247d9d1663e 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java @@ -18,7 +18,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Collects snapshots that needs to be sent to the backend */ +/** + * Collects snapshots that needs to be sent to the backend the notion of high/low rate is used to + * control the flush interval low rate is used for snapshots with Captures (deep variable capture, + * captureSnapshot = true) high rate is used for snapshots without Captures (dynamic templated logs) + */ public class SnapshotSink { private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotSink.class); public static final int MAX_SNAPSHOT_SIZE = 1024 * 1024; @@ -32,25 +36,32 @@ public class SnapshotSink { static final long HIGH_RATE_STEP_SIZE = 10; public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(0); + // low rate queue (aka snapshots) private final BlockingQueue lowRateSnapshots = new ArrayBlockingQueue<>(LOW_RATE_CAPACITY); + // high rate queue (aka dynamic templated logs) private final BlockingQueue highRateSnapshots = new ArrayBlockingQueue<>(HIGH_RATE_CAPACITY); private final String serviceName; private final int batchSize; private final String tags; - private final BatchUploader snapshotUploader; + // uploader for low rate (aka snapshots) + private final BatchUploader lowRateUploader; + // uploader for high rate (aka dynamic templated logs) + private final BatchUploader highRateUploader; private final AgentTaskScheduler highRateScheduler = new AgentTaskScheduler(AgentThreadFactory.AgentThread.DEBUGGER_SNAPSHOT_SERIALIZER); private final AtomicBoolean started = new AtomicBoolean(); private volatile AgentTaskScheduler.Scheduled highRateScheduled; private volatile long currentHighRateFlushInterval = HIGH_RATE_MAX_FLUSH_INTERVAL_MS; - public SnapshotSink(Config config, String tags, BatchUploader snapshotUploader) { + public SnapshotSink( + Config config, String tags, BatchUploader lowRateUploader, BatchUploader highRateUploader) { this.serviceName = TagsHelper.sanitize(config.getServiceName()); this.batchSize = config.getDynamicInstrumentationUploadBatchSize(); this.tags = tags; - this.snapshotUploader = snapshotUploader; + this.lowRateUploader = lowRateUploader; + this.highRateUploader = highRateUploader; } public void start() { @@ -66,7 +77,8 @@ public void stop() { if (localScheduled != null) { localScheduled.cancel(); } - snapshotUploader.shutdown(); + lowRateUploader.shutdown(); + highRateUploader.shutdown(); started.set(false); } @@ -75,7 +87,7 @@ public void lowRateFlush(String tags) { if (snapshots.isEmpty()) { return; } - uploadPayloads(snapshots, tags); + uploadPayloads(lowRateUploader, snapshots, tags); } public void highRateFlush(SnapshotSink ignored) { @@ -87,12 +99,12 @@ public void highRateFlush(SnapshotSink ignored) { } int count = snapshots.size(); reconsiderHighRateFlushInterval(count); - uploadPayloads(snapshots, tags); + uploadPayloads(highRateUploader, snapshots, tags); } while (!highRateSnapshots.isEmpty()); } public HttpUrl getUrl() { - return snapshotUploader.getUrl(); + return lowRateUploader.getUrl(); } public long remainingCapacity() { @@ -194,10 +206,10 @@ private String serializeSnapshot(String serviceName, Snapshot snapshot) { return prunedStr; } - private void uploadPayloads(List payloads, String tags) { + private static void uploadPayloads(BatchUploader uploader, List payloads, String tags) { List batches = IntakeBatchHelper.createBatches(payloads); for (byte[] batch : batches) { - snapshotUploader.upload(batch, tags); + uploader.upload(batch, tags); } } } diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java index 9e3122597c8..be1a709c234 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java @@ -53,7 +53,7 @@ public class SymbolSink { public SymbolSink(Config config) { this( config, - new BatchUploader(config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY), + new BatchUploader("SymDB", config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY), MAX_SYMDB_UPLOAD_SIZE); } diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/uploader/BatchUploader.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/uploader/BatchUploader.java index 7a7b0726e09..3d478707e67 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/uploader/BatchUploader.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/uploader/BatchUploader.java @@ -84,6 +84,7 @@ public RetryPolicy(int maxFailures) { public static final MediaType APPLICATION_JSON = MediaType.get("application/json"); public static final MediaType APPLICATION_GZIP = MediaType.get("application/gzip"); + private final String name; private final String containerId; private final String entityId; private final ExecutorService okHttpExecutorService; @@ -98,8 +99,9 @@ public RetryPolicy(int maxFailures) { private final Phaser inflightRequests = new Phaser(1); - public BatchUploader(Config config, String endpoint, RetryPolicy retryPolicy) { + public BatchUploader(String name, Config config, String endpoint, RetryPolicy retryPolicy) { this( + name, config, endpoint, new RatelimitedLogger(LOGGER, MINUTES_BETWEEN_ERROR_LOG, TimeUnit.MINUTES), @@ -107,11 +109,13 @@ public BatchUploader(Config config, String endpoint, RetryPolicy retryPolicy) { } BatchUploader( + String name, Config config, String endpoint, RatelimitedLogger ratelimitedLogger, RetryPolicy retryPolicy) { this( + name, config, endpoint, ratelimitedLogger, @@ -122,18 +126,20 @@ public BatchUploader(Config config, String endpoint, RetryPolicy retryPolicy) { // Visible for testing BatchUploader( + String name, Config config, String endpoint, RatelimitedLogger ratelimitedLogger, RetryPolicy retryPolicy, String containerId, String entityId) { + this.name = name; instrumentTheWorld = config.getDynamicInstrumentationInstrumentTheWorld() != null; if (endpoint == null || endpoint.length() == 0) { throw new IllegalArgumentException("Endpoint url is empty"); } urlBase = HttpUrl.get(endpoint); - LOGGER.debug("Started BatchUploader with target url {}", urlBase); + LOGGER.debug("Started BatchUploader[{}] with target url {}", name, urlBase); apiKey = config.getApiKey(); this.ratelimitedLogger = ratelimitedLogger; // This is the same thing OkHttp Dispatcher is doing except thread naming and daemonization @@ -162,7 +168,7 @@ public BatchUploader(Config config, String endpoint, RetryPolicy retryPolicy) { null, /* proxyPassword */ requestTimeout.toMillis()); responseCallback = - new ResponseCallback(ratelimitedLogger, inflightRequests, client, retryPolicy); + new ResponseCallback(name, ratelimitedLogger, inflightRequests, client, retryPolicy); debuggerMetrics = DebuggerMetrics.getInstance(config); } @@ -241,7 +247,7 @@ private void makeUploadRequest(byte[] json, String tags) { private void buildAndSendRequest(RequestBody body, int contentLength, String tags) { debuggerMetrics.histogram("batch.uploader.request.size", contentLength); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Uploading batch data size={} bytes", contentLength); + LOGGER.debug("[{}] Uploading batch data size={} bytes", name, contentLength); } HttpUrl.Builder builder = urlBase.newBuilder(); if (tags != null && !tags.isEmpty()) { @@ -269,7 +275,7 @@ private void buildAndSendRequest(RequestBody body, int contentLength, String tag requestBuilder.addHeader(HEADER_DD_ENTITY_ID, entityId); } Request request = requestBuilder.build(); - LOGGER.debug("Sending request: {} CT: {}", request, request.body().contentType()); + LOGGER.debug("[{}] Sending request: {} CT: {}", name, request, request.body().contentType()); enqueueCall(client, request, responseCallback, retryPolicy, 0, inflightRequests); } @@ -277,7 +283,7 @@ public void shutdown() { try { inflightRequests.awaitAdvanceInterruptibly(inflightRequests.arrive(), 10, TimeUnit.SECONDS); } catch (TimeoutException | InterruptedException ignored) { - LOGGER.warn("Not all upload requests have been handled"); + LOGGER.warn("[{}] Not all upload requests have been handled", name); } okHttpExecutorService.shutdownNow(); try { @@ -285,7 +291,7 @@ public void shutdown() { } catch (final InterruptedException e) { // Note: this should only happen in main thread right before exiting, so eating up interrupted // state should be fine. - LOGGER.warn("Wait for executor shutdown interrupted"); + LOGGER.warn("[{}] Wait for executor shutdown interrupted", name); } client.connectionPool().evictAll(); } @@ -309,16 +315,19 @@ private static void enqueueCall( private static final class ResponseCallback implements Callback { + private final String name; private final RatelimitedLogger ratelimitedLogger; private final Phaser inflightRequests; private final OkHttpClient client; private final RetryPolicy retryPolicy; public ResponseCallback( + String name, final RatelimitedLogger ratelimitedLogger, Phaser inflightRequests, OkHttpClient client, RetryPolicy retryPolicy) { + this.name = name; this.ratelimitedLogger = ratelimitedLogger; this.inflightRequests = inflightRequests; this.client = client; @@ -338,11 +347,16 @@ private void handleRetry(Call call, int maxFailures) { int failureCount = failure + 1; if (failureCount <= maxFailures) { LOGGER.debug( - "Retrying upload to {}, {}/{}", call.request().url(), failureCount, maxFailures); + "[{}] Retrying upload to {}, {}/{}", + name, + call.request().url(), + failureCount, + maxFailures); enqueueCall(client, call.request(), this, retryPolicy, failureCount, inflightRequests); } else { LOGGER.warn( - "Failed permanently to upload batch to {} after {} attempts", + "[{}] Failed permanently to upload batch to {} after {} attempts", + name, call.request().url(), maxFailures); } @@ -354,7 +368,7 @@ public void onResponse(Call call, Response response) { try { inflightRequests.arriveAndDeregister(); if (response.isSuccessful()) { - LOGGER.debug("Upload done"); + LOGGER.debug("[{}] Upload done", name); retryPolicy.failures.remove(call); } else { ResponseBody body = response.body(); diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/DebuggerSinkTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/DebuggerSinkTest.java index 9d9cd95ca05..15fd5f991c7 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/DebuggerSinkTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/DebuggerSinkTest.java @@ -62,7 +62,8 @@ public class DebuggerSinkTest { public static final int MAX_PAYLOAD = 5 * 1024 * 1024; @Mock private Config config; - @Mock private BatchUploader batchUploader; + @Mock private BatchUploader snapshotUploader; + @Mock private BatchUploader logUploader; @Captor private ArgumentCaptor payloadCaptor; private String EXPECTED_SNAPSHOT_TAGS; @@ -99,7 +100,7 @@ public void addSnapshot(boolean processTagsEnabled) throws IOException { Snapshot snapshot = createSnapshot(); sink.addSnapshot(snapshot); sink.lowRateFlush(sink); - verify(batchUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(snapshotUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); String strPayload = new String(payloadCaptor.getValue(), StandardCharsets.UTF_8); System.out.println(strPayload); JsonSnapshotSerializer.IntakeRequest intakeRequest = assertOneIntakeRequest(strPayload); @@ -132,7 +133,7 @@ public void addMultipleSnapshots() throws IOException { Snapshot snapshot = createSnapshot(); Arrays.asList(snapshot, snapshot).forEach(sink::addSnapshot); sink.lowRateFlush(sink); - verify(batchUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(snapshotUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); String strPayload = new String(payloadCaptor.getValue(), StandardCharsets.UTF_8); System.out.println(strPayload); ParameterizedType type = @@ -156,7 +157,7 @@ public void splitSnapshotBatch() { sink.addSnapshot(largeSnapshot); } sink.lowRateFlush(sink); - verify(batchUploader, times(2)) + verify(snapshotUploader, times(2)) .upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); Assertions.assertTrue(payloadCaptor.getAllValues().get(0).length < MAX_PAYLOAD); Assertions.assertTrue(payloadCaptor.getAllValues().get(1).length < MAX_PAYLOAD); @@ -171,7 +172,7 @@ public void tooLargeSnapshot() { } sink.addSnapshot(largeSnapshot); sink.lowRateFlush(sink); - verifyNoInteractions(batchUploader); + verifyNoInteractions(snapshotUploader); } @Test @@ -183,7 +184,7 @@ public void tooLargeUTF8Snapshot() { } sink.addSnapshot(largeSnapshot); sink.lowRateFlush(sink); - verifyNoInteractions(batchUploader); + verifyNoInteractions(snapshotUploader); } static class Node { @@ -223,7 +224,7 @@ public void pruneTooLargeSnapshot() { largeSnapshot.setEntry(context); sink.addSnapshot(largeSnapshot); sink.lowRateFlush(sink); - verify(batchUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(snapshotUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); String strPayload = new String(payloadCaptor.getValue(), StandardCharsets.UTF_8); assertTrue(strPayload.length() < SnapshotSink.MAX_SNAPSHOT_SIZE); } @@ -243,7 +244,7 @@ private List createChildren(int level, String strPayLoad) { public void addNoSnapshots() { DebuggerSink sink = createDefaultDebuggerSink(); sink.lowRateFlush(sink); - verifyNoInteractions(batchUploader); + verifyNoInteractions(snapshotUploader); } @Test @@ -382,7 +383,7 @@ public void tooLargeDiagnostic() { String tooLargeMessage = tooLargeMessageBuilder.toString(); sink.getProbeDiagnosticsSink().addError(new ProbeId("1", 1), tooLargeMessage); sink.lowRateFlush(sink); - verifyNoInteractions(batchUploader); + verifyNoInteractions(snapshotUploader); } @Test @@ -396,14 +397,14 @@ public void tooLargeUTF8Diagnostic() { String tooLargeMessage = tooLargeMessageBuilder.toString(); sink.getProbeDiagnosticsSink().addError(new ProbeId("1", 1), tooLargeMessage); sink.lowRateFlush(sink); - verifyNoInteractions(batchUploader); + verifyNoInteractions(snapshotUploader); } @Test public void addNoDiagnostic() { DebuggerSink sink = createDefaultDebuggerSink(); sink.lowRateFlush(sink); - verifyNoInteractions(batchUploader); + verifyNoInteractions(snapshotUploader); } @Test @@ -453,7 +454,7 @@ public void addSnapshotWithCorrelationIdsMethodProbe() throws IOException { snapshot.setSpanId("456"); sink.addSnapshot(snapshot); sink.lowRateFlush(sink); - verify(batchUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(snapshotUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); String strPayload = new String(payloadCaptor.getValue(), StandardCharsets.UTF_8); System.out.println(strPayload); JsonSnapshotSerializer.IntakeRequest intakeRequest = assertOneIntakeRequest(strPayload); @@ -472,7 +473,7 @@ public void addSnapshotWithEvalErrors() throws IOException { Arrays.asList(new EvaluationError("obj.field", "Cannot dereference obj"))); sink.addSnapshot(snapshot); sink.lowRateFlush(sink); - verify(batchUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(snapshotUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); String strPayload = new String(payloadCaptor.getValue(), StandardCharsets.UTF_8); System.out.println(strPayload); JsonSnapshotSerializer.IntakeRequest intakeRequest = assertOneIntakeRequest(strPayload); @@ -501,7 +502,12 @@ public void skipSnapshot() { config, "", new BatchUploader( - config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)); + "Snapshots", + config, + config.getFinalDebuggerSnapshotUrl(), + SnapshotSink.RETRY_POLICY), + new BatchUploader( + "Logs", config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)); SymbolSink symbolSink = new SymbolSink(config); DebuggerSink sink = new DebuggerSink(config, "", debuggerMetrics, probeStatusSink, snapshotSink, symbolSink); @@ -539,7 +545,7 @@ private DebuggerSink createDefaultDebuggerSink() { tags, DebuggerMetrics.getInstance(config), probeStatusSink, - new SnapshotSink(config, tags, batchUploader), + new SnapshotSink(config, tags, snapshotUploader, logUploader), new SymbolSink(config)); } @@ -551,7 +557,7 @@ private DebuggerSink createDebuggerSink(BatchUploader diagnosticUploader, boolea tags, DebuggerMetrics.getInstance(config), probeSink, - new SnapshotSink(config, tags, batchUploader), + new SnapshotSink(config, tags, snapshotUploader, logUploader), new SymbolSink(config)); } } diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SnapshotSinkTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SnapshotSinkTest.java index f919f316129..944e7f0eb53 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SnapshotSinkTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SnapshotSinkTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.matches; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,7 +44,8 @@ public class SnapshotSinkTest { new ProbeLocation("java.lang.String", "indexOf", null, null); @Mock private Config config; - @Mock private BatchUploader batchUploader; + @Mock private BatchUploader snapshotUploader; + @Mock private BatchUploader logUploader; @Captor private ArgumentCaptor payloadCaptor; private ProbeStatusSink probeStatusSink; private String EXPECTED_SNAPSHOT_TAGS; @@ -78,7 +80,7 @@ public void addHighRateSnapshot(boolean processTagsEnabled) throws IOException { Snapshot snapshot = createSnapshot(); snapshotSink.addHighRate(snapshot); snapshotSink.highRateFlush(null); - verify(batchUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(logUploader).upload(payloadCaptor.capture(), matches(EXPECTED_SNAPSHOT_TAGS)); String strPayload = new String(payloadCaptor.getValue(), StandardCharsets.UTF_8); System.out.println(strPayload); JsonSnapshotSerializer.IntakeRequest intakeRequest = assertOneIntakeRequest(strPayload); @@ -156,9 +158,22 @@ public void backoffFlushInterval() { assertEquals(previousInterval + SnapshotSink.HIGH_RATE_STEP_SIZE, currentInterval); } + @Test + public void differentiateSnapshotLog() { + SnapshotSink snapshotSink = createSnapshotSink(); + Snapshot snapshot = createSnapshot(); + snapshotSink.addLowRate(snapshot); + snapshotSink.lowRateFlush(DebuggerAgent.getDefaultTagsMergedWithGlobalTags(config)); + Snapshot logSnapshot = createSnapshot(); + snapshotSink.addHighRate(logSnapshot); + snapshotSink.highRateFlush(null); + verify(snapshotUploader).upload(any(), matches(EXPECTED_SNAPSHOT_TAGS)); + verify(logUploader).upload(any(), matches(EXPECTED_SNAPSHOT_TAGS)); + } + private SnapshotSink createSnapshotSink() { String tags = DebuggerAgent.getDefaultTagsMergedWithGlobalTags(config); - return new SnapshotSink(config, tags, batchUploader); + return new SnapshotSink(config, tags, snapshotUploader, logUploader); } private Snapshot createSnapshot() { diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java index bf4494c6631..46366642a30 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java @@ -241,7 +241,7 @@ static class SymbolUploaderMock extends BatchUploader { final List multiPartContents = new ArrayList<>(); public SymbolUploaderMock() { - super(Config.get(), "http://localhost", SymbolSink.RETRY_POLICY); + super("mock", Config.get(), "http://localhost", SymbolSink.RETRY_POLICY); } @Override diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/uploader/BatchUploaderTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/uploader/BatchUploaderTest.java index 25ac8ee632c..2b89cee558c 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/uploader/BatchUploaderTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/uploader/BatchUploaderTest.java @@ -64,7 +64,7 @@ public void setup() throws IOException { when(config.getDynamicInstrumentationUploadTimeout()) .thenReturn((int) REQUEST_TIMEOUT.getSeconds()); - uploader = new BatchUploader(config, url.toString(), ratelimitedLogger, retryPolicy); + uploader = new BatchUploader("test", config, url.toString(), ratelimitedLogger, retryPolicy); } @AfterEach @@ -80,7 +80,7 @@ public void tearDown() throws IOException { @Test void testUnixDomainSocket() { when(config.getAgentUnixDomainSocket()).thenReturn("/tmp/ddagent/agent.sock"); - uploader = new BatchUploader(config, "http://localhost:8126", retryPolicy); + uploader = new BatchUploader("test", config, "http://localhost:8126", retryPolicy); assertEquals( "datadog.common.socket.UnixDomainSocketFactory", uploader.getClient().socketFactory().getClass().getTypeName()); @@ -88,7 +88,7 @@ void testUnixDomainSocket() { @Test void testOkHttpClientForcesCleartextConnspecWhenNotUsingTLS() { - uploader = new BatchUploader(config, "http://example.com", retryPolicy); + uploader = new BatchUploader("test", config, "http://example.com", retryPolicy); final List connectionSpecs = uploader.getClient().connectionSpecs(); assertEquals(connectionSpecs.size(), 1); @@ -97,7 +97,7 @@ void testOkHttpClientForcesCleartextConnspecWhenNotUsingTLS() { @Test void testOkHttpClientUsesDefaultConnspecsOverTLS() { - uploader = new BatchUploader(config, "https://example.com", retryPolicy); + uploader = new BatchUploader("test", config, "https://example.com", retryPolicy); final List connectionSpecs = uploader.getClient().connectionSpecs(); assertEquals(connectionSpecs.size(), 2); @@ -175,7 +175,7 @@ public void testTooManyRequests() throws IOException, InterruptedException { // test. So we specify insanely large timeout here. when(config.getDynamicInstrumentationUploadTimeout()) .thenReturn((int) FOREVER_REQUEST_TIMEOUT.getSeconds()); - uploader = new BatchUploader(config, url.toString(), retryPolicy); + uploader = new BatchUploader("test", config, url.toString(), retryPolicy); // We have to block all parallel requests to make sure queue is kept full for (int i = 0; i < BatchUploader.MAX_RUNNING_REQUESTS; i++) { @@ -216,7 +216,7 @@ public void testShutdown() throws IOException, InterruptedException { @Test public void testEmptyUrl() { Assertions.assertThrows( - IllegalArgumentException.class, () -> new BatchUploader(config, "", retryPolicy)); + IllegalArgumentException.class, () -> new BatchUploader("test", config, "", retryPolicy)); } @Test @@ -224,7 +224,8 @@ public void testNoContainerId() throws InterruptedException { // we don't explicitly specify a container ID server.enqueue(RESPONSE_200); BatchUploader uploaderWithNoContainerId = - new BatchUploader(config, url.toString(), ratelimitedLogger, retryPolicy, null, null); + new BatchUploader( + "test", config, url.toString(), ratelimitedLogger, retryPolicy, null, null); uploaderWithNoContainerId.upload(SNAPSHOT_BUFFER); uploaderWithNoContainerId.shutdown(); @@ -239,6 +240,7 @@ public void testContainerIdHeader() throws InterruptedException { BatchUploader uploaderWithContainerId = new BatchUploader( + "test", config, url.toString(), ratelimitedLogger, @@ -259,7 +261,7 @@ public void testApiKey() throws InterruptedException { when(config.getApiKey()).thenReturn(API_KEY_VALUE); BatchUploader uploaderWithApiKey = - new BatchUploader(config, url.toString(), ratelimitedLogger, retryPolicy); + new BatchUploader("test", config, url.toString(), ratelimitedLogger, retryPolicy); uploaderWithApiKey.upload(SNAPSHOT_BUFFER); uploaderWithApiKey.shutdown(); diff --git a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/AgentDebuggerIntegrationTest.java b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/AgentDebuggerIntegrationTest.java index 4c8745afe21..fba8571a769 100644 --- a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/AgentDebuggerIntegrationTest.java +++ b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/AgentDebuggerIntegrationTest.java @@ -1,17 +1,16 @@ package datadog.smoketest; +import static datadog.smoketest.debugger.TestApplicationHelper.waitForSpecificLine; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datadog.debugger.agent.JsonSnapshotSerializer; import com.datadog.debugger.probe.LogProbe; -import com.datadog.debugger.sink.Snapshot; -import com.squareup.moshi.JsonAdapter; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -31,14 +30,13 @@ void testLatestJdk() throws Exception { ProcessBuilderHelper.createProcessBuilder( classpath, commandParams, logFilePath, "App", EXPECTED_UPLOADS) .start(); - RecordedRequest request = retrieveSnapshotRequest(); - assertNotNull(request); + AtomicBoolean snapshotReceived = new AtomicBoolean(false); + registerSnapshotListener( + snapshot -> { + snapshotReceived.set(true); + }); + processRequests(snapshotReceived::get); assertFalse(logHasErrors(logFilePath, it -> false)); - String bodyStr = request.getBody().readUtf8(); - LOG.info("got snapshot: {}", bodyStr); - JsonAdapter> adapter = createAdapterForSnapshot(); - Snapshot snapshot = adapter.fromJson(bodyStr).get(0).getDebugger().getSnapshot(); - assertNotNull(snapshot); } @Test @@ -50,12 +48,13 @@ void testShutdown() throws Exception { LogProbe.builder().probeId(PROBE_ID).where(MAIN_CLASS_NAME, METHOD_NAME).build(); setCurrentConfiguration(createConfig(probe)); targetProcess = createProcessBuilder(logFilePath, METHOD_NAME, EXPECTED_UPLOADS).start(); - - RecordedRequest request = retrieveSnapshotRequest(); + AtomicBoolean snapshotReceived = new AtomicBoolean(false); + registerSnapshotListener( + snapshot -> { + snapshotReceived.set(true); + }); + processRequests(snapshotReceived::get); assertFalse(logHasErrors(logFilePath, it -> false)); - assertNotNull(request); - assertTrue(request.getBodySize() > 0); - // Wait for the app exit with some extra time. // The expectation is that agent doesn't prevent app from exiting. assertTrue(targetProcess.waitFor(REQUEST_WAIT_TIMEOUT + 10, TimeUnit.SECONDS)); @@ -73,16 +72,49 @@ void testDestroy() throws Exception { new MockResponse() .setHeadersDelay(REQUEST_WAIT_TIMEOUT * 2, TimeUnit.SECONDS) .setResponseCode(200)); - // wait for 3 snapshots (2 status + 1 snapshot) targetProcess = createProcessBuilder(logFilePath, METHOD_NAME, EXPECTED_UPLOADS).start(); - - RecordedRequest request = retrieveSnapshotRequest(); - assertNotNull(request); - assertTrue(request.getBodySize() > 0); - retrieveSnapshotRequest(); + AtomicBoolean snapshotReceived = new AtomicBoolean(false); + registerSnapshotListener( + snapshot -> { + snapshotReceived.set(true); + }); + processRequests(snapshotReceived::get); targetProcess.destroy(); // Wait for the app exit with some extra time. // The expectation is that agent doesn't prevent app from exiting. assertTrue(targetProcess.waitFor(REQUEST_WAIT_TIMEOUT + 10, TimeUnit.SECONDS)); } + + @Test + @DisplayName("testEndpoints") + void testEndpoints() throws Exception { + setCurrentConfiguration(createConfig(Collections.emptyList())); + targetProcess = createProcessBuilder(logFilePath, "", "").start(); + waitForSpecificLine( + logFilePath.toString(), + "INFO com.datadog.debugger.agent.DebuggerAgent - Started Dynamic Instrumentation", + null); + Assertions.assertFalse( + logHasErrors( + logFilePath, + line -> { + if (line.contains("Started BatchUploader[Diagnostics]")) { + return !line.matches( + ".* Started BatchUploader\\[Diagnostics] with target url http://localhost:\\d+/debugger/v1/diagnostics"); + } + if (line.contains("Started BatchUploader[Snapshots]")) { + return !line.matches( + ".* Started BatchUploader\\[Snapshots] with target url http://localhost:\\d+/debugger/v1/diagnostics"); + } + if (line.contains("Started BatchUploader[Logs]")) { + return !line.matches( + ".* Started BatchUploader\\[Logs] with target url http://localhost:\\d+/debugger/v1/input"); + } + if (line.contains("Started BatchUploader[SymDB]")) { + return !line.matches( + ".* Started BatchUploader\\[SymDB] with target url http://localhost:\\d+/symdb/v1/input"); + } + return false; + })); + } } diff --git a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/BaseIntegrationTest.java b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/BaseIntegrationTest.java index 27db9687a10..132e7e8a823 100644 --- a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/BaseIntegrationTest.java +++ b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/BaseIntegrationTest.java @@ -21,10 +21,11 @@ import datadog.trace.bootstrap.debugger.CapturedContext; import datadog.trace.bootstrap.debugger.ProbeRateLimiter; import datadog.trace.test.agent.decoder.DecodedMessage; -import datadog.trace.test.agent.decoder.DecodedSpan; import datadog.trace.test.agent.decoder.DecodedTrace; import datadog.trace.test.agent.decoder.Decoder; +import java.io.BufferedReader; import java.io.IOException; +import java.io.StringReader; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; @@ -45,6 +46,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import okhttp3.Headers; import okhttp3.HttpUrl; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -61,7 +63,9 @@ public abstract class BaseIntegrationTest { protected static final Logger LOG = LoggerFactory.getLogger(BaseIntegrationTest.class); protected static final String TRACE_URL_PATH = "/v0.4/traces"; protected static final String CONFIG_URL_PATH = "/v0.7/config"; - protected static final String SNAPSHOT_URL_PATH = "/debugger/v1/input"; + protected static final String LOG_UPLOAD_URL_PATH = "/debugger/v1/input"; + protected static final String SNAPSHOT_UPLOAD_URL_PATH = "/debugger/v2/input"; + protected static final String DIAGNOSTICS_URL_PATH = "/debugger/v1/diagnostics"; protected static final int REQUEST_WAIT_TIMEOUT = 10; private static final Path LOG_FILE_BASE = Paths.get( @@ -72,7 +76,9 @@ public abstract class BaseIntegrationTest { "{\"endpoints\": [\"" + TRACE_URL_PATH + "\", \"" - + SNAPSHOT_URL_PATH + + LOG_UPLOAD_URL_PATH + + "\", \"" + + DIAGNOSTICS_URL_PATH + "\", \"" + CONFIG_URL_PATH + "\"]}"; @@ -81,7 +87,7 @@ public abstract class BaseIntegrationTest { private static final MockResponse TELEMETRY_RESPONSE = new MockResponse().setResponseCode(202); protected static final MockResponse EMPTY_200_RESPONSE = new MockResponse().setResponseCode(200); - private static final ByteString DIAGNOSTICS_STR = ByteString.encodeUtf8("diagnostics"); + private static final ByteString DIAGNOSTICS_STR = ByteString.encodeUtf8("{\"diagnostics\":"); private static final String LD_CONFIG_ID = UUID.randomUUID().toString(); private static final String APM_CONFIG_ID = UUID.randomUUID().toString(); public static final String LIVE_DEBUGGING_PRODUCT = "LIVE_DEBUGGING"; @@ -118,7 +124,7 @@ void setup(TestInfo testInfo) throws Exception { datadogAgentServer.setDispatcher(probeMockDispatcher); probeUrl = datadogAgentServer.url(CONFIG_URL_PATH); LOG.info("DatadogAgentServer on {}", datadogAgentServer.getPort()); - snapshotUrl = datadogAgentServer.url(SNAPSHOT_URL_PATH); + snapshotUrl = datadogAgentServer.url(LOG_UPLOAD_URL_PATH); statsDServer = new StatsDServer(); statsDServer.start(); LOG.info("statsDServer on {}", statsDServer.getPort()); @@ -168,60 +174,13 @@ protected List getDebuggerCommandParams() { "-Ddd.dynamic.instrumentation.capture.timeout=200")); } - protected RecordedRequest retrieveSnapshotRequest() throws Exception { - return retrieveRequest( - request -> { - try { - return request.getPath().startsWith(SNAPSHOT_URL_PATH) - && request.getBody().indexOf(ByteString.encodeUtf8("diagnostics")) == -1; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - }); - } - - protected DecodedSpan retrieveSpanRequest(String name) throws Exception { - DecodedSpan decodedSpan = null; - int attempt = 3; - retrieveSpan: - do { - LOG.info("retrieveSpanRequest..."); - RecordedRequest spanRequest = - retrieveRequest(request -> request.getPath().equals(TRACE_URL_PATH)); - attempt--; - if (spanRequest == null) { - continue; - } - DecodedMessage decodedMessage = Decoder.decodeV04(spanRequest.getBody().readByteArray()); - LOG.info( - "Traces={} Spans={}", - decodedMessage.getTraces().size(), - decodedMessage.getTraces().get(0).getSpans().size()); - for (int traceIdx = 0; traceIdx < decodedMessage.getTraces().size(); traceIdx++) { - List spans = decodedMessage.getTraces().get(traceIdx).getSpans(); - for (int spanIdx = 0; spanIdx < spans.size(); spanIdx++) { - decodedSpan = spans.get(spanIdx); - LOG.info( - "Trace[{}}].Span[{}}] name={}} resource={}} Meta={}", - traceIdx, - spanIdx, - decodedSpan.getName(), - decodedSpan.getResource(), - decodedSpan.getMeta()); - if (decodedSpan.getName().equals(name)) { - break retrieveSpan; - } - } - } - } while (attempt > 0); - return decodedSpan; - } - protected enum RequestType { SNAPSHOT { @Override public void process(BaseIntegrationTest baseIntegrationTest, RecordedRequest request) { - if (!request.getPath().startsWith(SNAPSHOT_URL_PATH)) { + if (!(request.getPath().startsWith(LOG_UPLOAD_URL_PATH) + || request.getPath().startsWith(DIAGNOSTICS_URL_PATH) + || request.getPath().startsWith(SNAPSHOT_UPLOAD_URL_PATH))) { return; } try { @@ -270,7 +229,7 @@ public void process(BaseIntegrationTest baseIntegrationTest, RecordedRequest req PROBE_STATUS { @Override public void process(BaseIntegrationTest baseIntegrationTest, RecordedRequest request) { - if (!request.getPath().startsWith(SNAPSHOT_URL_PATH)) { + if (!request.getPath().startsWith(DIAGNOSTICS_URL_PATH)) { return; } try { @@ -286,7 +245,12 @@ public void process(BaseIntegrationTest baseIntegrationTest, RecordedRequest req String bodyStr = request.getBody().readUtf8(); LOG.info("got probe status: {}", bodyStr); try { - List probeStatuses = adapter.fromJson(bodyStr); + // Http multipart decode + String partBody = parseMultiPart(request, bodyStr); + if (partBody == null) { + return; + } + List probeStatuses = adapter.fromJson(partBody); for (Consumer listener : baseIntegrationTest.probeStatusListeners) { for (ProbeStatus probeStatus : probeStatuses) { listener.accept(probeStatus); @@ -301,6 +265,46 @@ public void process(BaseIntegrationTest baseIntegrationTest, RecordedRequest req public abstract void process(BaseIntegrationTest baseIntegrationTest, RecordedRequest request); } + private static String parseMultiPart(RecordedRequest request, String bodyStr) throws IOException { + // To parse it as multipart, you'll need to use the boundary from the Content-Type header + String contentType = request.getHeader("Content-Type"); + String boundary = null; + // Extract the boundary from the Content-Type header + if (contentType != null && contentType.startsWith("multipart/")) { + String[] parts = contentType.split("boundary="); + if (parts.length > 1) { + boundary = parts[1].trim(); + } + } + + try (BufferedReader reader = new BufferedReader(new StringReader(bodyStr))) { + String line; + while ((line = reader.readLine()) != null) { + // Process each line of the multipart body + if (line.startsWith("--" + boundary)) { + // This is a part boundary + // The next line should be headers for this part + Headers.Builder partHeaders = new Headers.Builder(); + while (!(line = reader.readLine()).isEmpty()) { + // Parse headers + int colon = line.indexOf(':'); + if (colon != -1) { + partHeaders.add(line.substring(0, colon).trim(), line.substring(colon + 1).trim()); + } + } + + // Now read the part content until we hit the boundary + StringBuilder partContent = new StringBuilder(); + while ((line = reader.readLine()) != null && !line.startsWith("--" + boundary)) { + partContent.append(line).append("\n"); + } + return partContent.toString().trim(); + } + } + } + return null; + } + protected void registerIntakeRequestListener( Consumer listener) { intakeRequestListeners.add(listener); @@ -411,7 +415,7 @@ private MockResponse datadogAgentDispatch(RecordedRequest request) { // Ack every telemetry request. This is needed if telemetry is enabled in the tests. return TELEMETRY_RESPONSE; } - if (request.getPath().startsWith(SNAPSHOT_URL_PATH)) { + if (request.getPath().startsWith(LOG_UPLOAD_URL_PATH)) { return EMPTY_200_RESPONSE; } if (request.getPath().equals("/v0.7/config")) { @@ -561,23 +565,26 @@ protected void assertCaptureThrowable( assertEquals(message, throwable.getMessage()); } - protected static boolean logHasErrors(Path logFilePath, Function checker) - throws IOException { - long errorLines = - Files.lines(logFilePath) - .filter( - it -> - it.contains(" ERROR ") - || it.contains("ASSERTION FAILED") - || it.contains("Error:") - || checker.apply(it)) - .peek(System.out::println) - .count(); - boolean hasErrors = errorLines > 0; - if (hasErrors) { - LOG.info("Test application log is containing errors. See full run logs in {}", logFilePath); + protected static boolean logHasErrors(Path logFilePath, Function checker) { + try { + long errorLines = + Files.lines(logFilePath) + .filter( + it -> + it.contains(" ERROR ") + || it.contains("ASSERTION FAILED") + || it.contains("Error:") + || checker.apply(it)) + .peek(System.out::println) + .count(); + boolean hasErrors = errorLines > 0; + if (hasErrors) { + LOG.info("Test application log is containing errors. See full run logs in {}", logFilePath); + } + return hasErrors; + } catch (IOException e) { + throw new RuntimeException(e); } - return hasErrors; } private static class MockDispatcher extends okhttp3.mockwebserver.QueueDispatcher { diff --git a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/LogProbesIntegrationTest.java b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/LogProbesIntegrationTest.java index ef60048e630..e85c9906460 100644 --- a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/LogProbesIntegrationTest.java +++ b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/LogProbesIntegrationTest.java @@ -15,13 +15,10 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datadog.debugger.agent.JsonSnapshotSerializer; import com.datadog.debugger.agent.ProbeStatus; import com.datadog.debugger.el.DSL; import com.datadog.debugger.el.ProbeCondition; import com.datadog.debugger.probe.LogProbe; -import com.datadog.debugger.sink.Snapshot; -import com.squareup.moshi.JsonAdapter; import datadog.environment.JavaVirtualMachine; import datadog.trace.bootstrap.debugger.CapturedContext; import datadog.trace.bootstrap.debugger.MethodLocation; @@ -33,7 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import okhttp3.mockwebserver.RecordedRequest; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIf; @@ -48,8 +45,12 @@ void testInaccessibleObject() throws Exception { LogProbe.builder().probeId(PROBE_ID).where(MAIN_CLASS_NAME, METHOD_NAME).build(); setCurrentConfiguration(createConfig(probe)); targetProcess = createProcessBuilder(logFilePath, METHOD_NAME, EXPECTED_UPLOADS).start(); - RecordedRequest request = retrieveSnapshotRequest(); - assertNotNull(request); + AtomicBoolean snapshotReceived = new AtomicBoolean(false); + registerSnapshotListener( + snapshot -> { + snapshotReceived.set(true); + }); + processRequests(snapshotReceived::get); assertFalse(logHasErrors(logFilePath, it -> false)); } @@ -323,8 +324,16 @@ private void doSamplingSnapshot(ProbeCondition probeCondition, MethodLocation ev createProcessBuilder( logFilePath, "loopingFullMethod", EXPECTED_UPLOADS, String.valueOf(LOOP_COUNT)) .start(); - int count = countSnapshots(); - assertTrue(count >= 2 && count <= 20, "snapshots=" + count); + AtomicInteger snapshotCount = new AtomicInteger(); + registerSnapshotListener( + snapshot -> { + snapshotCount.incrementAndGet(); + }); + processRequests( + () -> { + LOG.info("snapshots={}", snapshotCount.get()); + return snapshotCount.get() >= 2 && snapshotCount.get() <= 20; + }); } @Test @@ -349,8 +358,16 @@ void testSamplingLogDefault() throws Exception { createProcessBuilder( logFilePath, "loopingFullMethod", EXPECTED_UPLOADS, String.valueOf(LOOP_COUNT)) .start(); - int count = countSnapshots(); - assertTrue(count >= 850 && count <= 1000, "logs=" + count); + AtomicInteger snapshotCount = new AtomicInteger(); + registerSnapshotListener( + snapshot -> { + snapshotCount.incrementAndGet(); + }); + processRequests( + () -> { + LOG.info("snapshots={}", snapshotCount.get()); + return snapshotCount.get() >= 850 && snapshotCount.get() <= 1000; + }); } @Test @@ -375,7 +392,16 @@ void testSamplingLogCustom() throws Exception { createProcessBuilder( logFilePath, "loopingFullMethod", EXPECTED_UPLOADS, String.valueOf(LOOP_COUNT)) .start(); - assertTrue(countSnapshots() < 200); + AtomicInteger snapshotCount = new AtomicInteger(); + registerSnapshotListener( + snapshot -> { + snapshotCount.incrementAndGet(); + }); + processRequests( + () -> { + LOG.info("snapshots={}", snapshotCount.get()); + return snapshotCount.get() > 0 && snapshotCount.get() < 200; + }); } @Test @@ -393,22 +419,20 @@ void testUncaughtException() throws Exception { setCurrentConfiguration(createConfig(probe)); targetProcess = createProcessBuilder(logFilePath, METHOD_NAME, EXPECTED_UPLOADS, "uncaught").start(); - RecordedRequest request = retrieveSnapshotRequest(); - assertNotNull(request); - assertFalse(logHasErrors(logFilePath, it -> false)); - String bodyStr = request.getBody().readUtf8(); - JsonAdapter> adapter = createAdapterForSnapshot(); - System.out.println(bodyStr); - JsonSnapshotSerializer.IntakeRequest intakeRequest = adapter.fromJson(bodyStr).get(0); - Snapshot snapshot = intakeRequest.getDebugger().getSnapshot(); - assertEquals("123356536", snapshot.getProbe().getId()); - CapturedContext.CapturedThrowable throwable = - snapshot.getCaptures().getReturn().getCapturedThrowable(); - assertEquals("oops uncaught!", throwable.getMessage()); - assertTrue(throwable.getStacktrace().size() > 0); - assertEquals( - "datadog.smoketest.debugger.Main.exceptionMethod", - throwable.getStacktrace().get(0).getFunction()); + AtomicBoolean snapshotReceived = new AtomicBoolean(); + registerSnapshotListener( + snapshot -> { + assertEquals("123356536", snapshot.getProbe().getId()); + CapturedContext.CapturedThrowable throwable = + snapshot.getCaptures().getReturn().getCapturedThrowable(); + assertEquals("oops uncaught!", throwable.getMessage()); + assertTrue(throwable.getStacktrace().size() > 0); + assertEquals( + "datadog.smoketest.debugger.Main.exceptionMethod", + throwable.getStacktrace().get(0).getFunction()); + snapshotReceived.set(true); + }); + processRequests(snapshotReceived::get); } @Test @@ -426,43 +450,20 @@ void testCaughtException() throws Exception { setCurrentConfiguration(createConfig(probe)); targetProcess = createProcessBuilder(logFilePath, METHOD_NAME, EXPECTED_UPLOADS, "caught").start(); - RecordedRequest request = retrieveSnapshotRequest(); - assertNotNull(request); - assertFalse(logHasErrors(logFilePath, it -> false)); - String bodyStr = request.getBody().readUtf8(); - JsonAdapter> adapter = createAdapterForSnapshot(); - System.out.println(bodyStr); - JsonSnapshotSerializer.IntakeRequest intakeRequest = adapter.fromJson(bodyStr).get(0); - Snapshot snapshot = intakeRequest.getDebugger().getSnapshot(); - assertEquals("123356536", snapshot.getProbe().getId()); - assertEquals(1, snapshot.getCaptures().getCaughtExceptions().size()); - CapturedContext.CapturedThrowable throwable = - snapshot.getCaptures().getCaughtExceptions().get(0); - assertEquals("oops caught!", throwable.getMessage()); - assertEquals( - "datadog.smoketest.debugger.Main.exceptionMethod", - throwable.getStacktrace().get(0).getFunction()); - } - - private int countSnapshots() throws Exception { - int snapshotCount = 0; - RecordedRequest request; - do { - request = retrieveSnapshotRequest(); - if (request != null) { - String bodyStr = request.getBody().readUtf8(); - JsonAdapter> adapter = - createAdapterForSnapshot(); - List intakeRequests = adapter.fromJson(bodyStr); - long count = - intakeRequests.stream() - .map(intakeRequest -> intakeRequest.getDebugger().getSnapshot()) - .count(); - snapshotCount += count; - } - } while (request != null); - LOG.info("snapshots={}", snapshotCount); - return snapshotCount; + AtomicBoolean snapshotReceived = new AtomicBoolean(); + registerSnapshotListener( + snapshot -> { + assertEquals("123356536", snapshot.getProbe().getId()); + assertEquals(1, snapshot.getCaptures().getCaughtExceptions().size()); + CapturedContext.CapturedThrowable throwable = + snapshot.getCaptures().getCaughtExceptions().get(0); + assertEquals("oops caught!", throwable.getMessage()); + assertEquals( + "datadog.smoketest.debugger.Main.exceptionMethod", + throwable.getStacktrace().get(0).getFunction()); + snapshotReceived.set(true); + }); + processRequests(snapshotReceived::get); } private ProbeId getProbeId(int i) { diff --git a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ProbeStateIntegrationTest.java b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ProbeStateIntegrationTest.java index d540485b2c8..ffe609b764e 100644 --- a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ProbeStateIntegrationTest.java +++ b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ProbeStateIntegrationTest.java @@ -8,7 +8,6 @@ import com.datadog.debugger.probe.LogProbe; import com.datadog.debugger.sink.Snapshot; import java.util.Collections; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -37,17 +36,15 @@ void testAddRemoveProbes() throws Exception { addProbe(logProbe); waitForInstrumentation(appUrl); execute(appUrl, FULL_METHOD_NAME); - List snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + Snapshot snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); setCurrentConfiguration(createConfig(Collections.emptyList())); // remove probes waitForReTransformation(appUrl); addProbe(logProbe); waitForInstrumentation(appUrl); execute(appUrl, FULL_METHOD_NAME); - snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); } @Test @@ -61,17 +58,15 @@ void testDisableEnableProbes() throws Exception { addProbe(logProbe); waitForInstrumentation(appUrl); execute(appUrl, FULL_METHOD_NAME); - List snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + Snapshot snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); setCurrentConfiguration(createConfig(Collections.emptyList())); // no probe waitForReTransformation(appUrl); addProbe(logProbe); waitForInstrumentation(appUrl); execute(appUrl, FULL_METHOD_NAME); - snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); } @Test @@ -86,9 +81,8 @@ void testDisableEnableProbesUsingDenyList() throws Exception { addProbe(logProbe); waitForInstrumentation(appUrl); execute(appUrl, FULL_METHOD_NAME); - List snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + Snapshot snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); datadogAgentServer.enqueue(EMPTY_200_RESPONSE); // expect BLOCKED status Configuration.FilterList denyList = @@ -103,9 +97,8 @@ void testDisableEnableProbesUsingDenyList() throws Exception { waitForReTransformation(appUrl); waitForAProbeStatus(ProbeStatus.Status.INSTALLED); execute(appUrl, FULL_METHOD_NAME); - snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); } @Test @@ -120,9 +113,8 @@ void testDisableEnableProbesUsingAllowList() throws Exception { addProbe(logProbe); waitForInstrumentation(appUrl); execute(appUrl, FULL_METHOD_NAME); - List snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + Snapshot snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); datadogAgentServer.enqueue(EMPTY_200_RESPONSE); // expect BLOCKED status Configuration.FilterList allowList = @@ -137,9 +129,8 @@ void testDisableEnableProbesUsingAllowList() throws Exception { waitForReTransformation(appUrl); waitForAProbeStatus(ProbeStatus.Status.INSTALLED); execute(appUrl, FULL_METHOD_NAME); - snapshots = waitForSnapshots(); - assertEquals(1, snapshots.size()); - assertEquals(FULL_METHOD_NAME, snapshots.get(0).getProbe().getLocation().getMethod()); + snapshot = waitForOneSnapshot(); + assertEquals(FULL_METHOD_NAME, snapshot.getProbe().getLocation().getMethod()); } @Test diff --git a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ServerAppDebuggerIntegrationTest.java b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ServerAppDebuggerIntegrationTest.java index 9f083dd6f44..816544a31d7 100644 --- a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ServerAppDebuggerIntegrationTest.java +++ b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/ServerAppDebuggerIntegrationTest.java @@ -2,20 +2,17 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; -import com.datadog.debugger.agent.JsonSnapshotSerializer; import com.datadog.debugger.agent.ProbeStatus; import com.datadog.debugger.probe.LogProbe; import com.datadog.debugger.probe.SpanDecorationProbe; import com.datadog.debugger.sink.Snapshot; -import com.squareup.moshi.JsonAdapter; import datadog.trace.bootstrap.debugger.ProbeId; import datadog.trace.test.agent.decoder.DecodedSpan; import datadog.trace.util.TagsHelper; import java.io.IOException; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -82,16 +79,11 @@ protected void stopApp(String appUrl) throws IOException { LOG.info("Stop done"); } - protected List waitForSnapshots() throws Exception { - RecordedRequest snapshotRequest = retrieveSnapshotRequest(); - assertNotNull(snapshotRequest); - String bodyStr = snapshotRequest.getBody().readUtf8(); - LOG.info("got snapshot: {}", bodyStr); - JsonAdapter> adapter = createAdapterForSnapshot(); - List intakeRequests = adapter.fromJson(bodyStr); - return intakeRequests.stream() - .map(intakeRequest -> intakeRequest.getDebugger().getSnapshot()) - .collect(Collectors.toList()); + protected Snapshot waitForOneSnapshot() throws Exception { + AtomicReference snapshotReceived = new AtomicReference<>(); + registerSnapshotListener(snapshotReceived::set); + processRequests(() -> snapshotReceived.get() != null); + return snapshotReceived.get(); } protected void execute(String appUrl, String methodName) throws IOException { diff --git a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/TracerDebuggerIntegrationTest.java b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/TracerDebuggerIntegrationTest.java index fdd7c813e36..36e798bfb45 100644 --- a/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/TracerDebuggerIntegrationTest.java +++ b/dd-smoke-tests/debugger-integration-tests/src/test/java/datadog/smoketest/TracerDebuggerIntegrationTest.java @@ -7,10 +7,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datadog.debugger.agent.JsonSnapshotSerializer; import com.datadog.debugger.probe.LogProbe; import com.datadog.debugger.sink.Snapshot; -import com.squareup.moshi.JsonAdapter; import datadog.environment.JavaVirtualMachine; import datadog.trace.agent.test.utils.PortUtils; import datadog.trace.bootstrap.debugger.MethodLocation; @@ -21,11 +19,11 @@ import java.nio.file.Path; import java.time.Duration; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; import java.util.regex.Pattern; import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIf; @@ -64,22 +62,29 @@ void testTracer(boolean processTagsEnabled) throws Exception { "(HttpServletRequest, HttpServletResponse)") .captureSnapshot(true) .build(); - JsonSnapshotSerializer.IntakeRequest request = doTestTracer(logProbe, processTagsEnabled); - Snapshot snapshot = request.getDebugger().getSnapshot(); - assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); - assertTrue(Pattern.matches("[0-9a-f]+", request.getTraceId())); - assertTrue(Pattern.matches("\\d+", request.getSpanId())); - assertFalse( - logHasErrors(logFilePath, it -> it.contains("TypePool$Resolution$NoSuchTypeException"))); - if (processTagsEnabled) { - assertNotNull(request.getProcessTags()); - assertTrue( - request - .getProcessTags() - .contains("entrypoint.name:" + TagsHelper.sanitize(DEBUGGER_TEST_APP_CLASS))); - } else { - assertNull(request.getProcessTags()); - } + AtomicBoolean requestReceived = new AtomicBoolean(false); + registerIntakeRequestListener( + intakeRequest -> { + assertEquals( + PROBE_ID.getId(), intakeRequest.getDebugger().getSnapshot().getProbe().getId()); + assertTrue(Pattern.matches("[0-9a-f]+", intakeRequest.getTraceId())); + assertTrue(Pattern.matches("\\d+", intakeRequest.getSpanId())); + assertFalse( + logHasErrors( + logFilePath, it -> it.contains("TypePool$Resolution$NoSuchTypeException"))); + if (processTagsEnabled) { + assertNotNull(intakeRequest.getProcessTags()); + assertTrue( + intakeRequest + .getProcessTags() + .contains("entrypoint.name:" + TagsHelper.sanitize(DEBUGGER_TEST_APP_CLASS))); + } else { + assertNull(intakeRequest.getProcessTags()); + } + requestReceived.set(true); + }); + doTestTracer(logProbe, processTagsEnabled); + processRequests(requestReceived::get); } @Test @@ -98,13 +103,20 @@ void testTracerDynamicLog() throws Exception { .captureSnapshot(false) .evaluateAt(MethodLocation.EXIT) .build(); - JsonSnapshotSerializer.IntakeRequest request = doTestTracer(logProbe); - Snapshot snapshot = request.getDebugger().getSnapshot(); - assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); - assertTrue(Pattern.matches("[0-9a-f]+", request.getTraceId())); - assertTrue(Pattern.matches("\\d+", request.getSpanId())); - assertFalse( - logHasErrors(logFilePath, it -> it.contains("TypePool$Resolution$NoSuchTypeException"))); + AtomicBoolean requestReceived = new AtomicBoolean(false); + registerIntakeRequestListener( + intakeRequest -> { + Snapshot snapshot = intakeRequest.getDebugger().getSnapshot(); + assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); + assertTrue(Pattern.matches("[0-9a-f]+", intakeRequest.getTraceId())); + assertTrue(Pattern.matches("\\d+", intakeRequest.getSpanId())); + assertFalse( + logHasErrors( + logFilePath, it -> it.contains("TypePool$Resolution$NoSuchTypeException"))); + requestReceived.set(true); + }); + doTestTracer(logProbe); + processRequests(requestReceived::get); } @Test @@ -119,14 +131,21 @@ void testTracerSameMethod() throws Exception { .where("datadog.smoketest.debugger.controller.WebController", "processWithArg", null) .captureSnapshot(true) .build(); - JsonSnapshotSerializer.IntakeRequest request = doTestTracer(logProbe); - Snapshot snapshot = request.getDebugger().getSnapshot(); - assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); - assertEquals(42, snapshot.getCaptures().getEntry().getArguments().get("argInt").getValue()); - // no locals captured - assertNull(snapshot.getCaptures().getEntry().getLocals()); - assertTrue(Pattern.matches("[0-9a-f]+", request.getTraceId())); - assertTrue(Pattern.matches("\\d+", request.getSpanId())); + AtomicBoolean requestReceived = new AtomicBoolean(false); + registerIntakeRequestListener( + intakeRequest -> { + Snapshot snapshot = intakeRequest.getDebugger().getSnapshot(); + assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); + assertEquals( + 42, snapshot.getCaptures().getEntry().getArguments().get("argInt").getValue()); + // no locals captured + assertNull(snapshot.getCaptures().getEntry().getLocals()); + assertTrue(Pattern.matches("[0-9a-f]+", intakeRequest.getTraceId())); + assertTrue(Pattern.matches("\\d+", intakeRequest.getSpanId())); + requestReceived.set(true); + }); + doTestTracer(logProbe); + processRequests(requestReceived::get); } @Test @@ -142,13 +161,20 @@ void testTracerLineSnapshotProbe() throws Exception { .where("WebController.java", 15) .captureSnapshot(true) .build(); - JsonSnapshotSerializer.IntakeRequest request = doTestTracer(logProbe); - Snapshot snapshot = request.getDebugger().getSnapshot(); - assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); - assertEquals( - 42, snapshot.getCaptures().getLines().get(15).getArguments().get("argInt").getValue()); - assertTrue(Pattern.matches("[0-9a-f]+", request.getTraceId())); - assertTrue(Pattern.matches("\\d+", request.getSpanId())); + AtomicBoolean requestReceived = new AtomicBoolean(false); + registerIntakeRequestListener( + intakeRequest -> { + Snapshot snapshot = intakeRequest.getDebugger().getSnapshot(); + assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); + assertEquals( + 42, + snapshot.getCaptures().getLines().get(15).getArguments().get("argInt").getValue()); + assertTrue(Pattern.matches("[0-9a-f]+", intakeRequest.getTraceId())); + assertTrue(Pattern.matches("\\d+", intakeRequest.getSpanId())); + requestReceived.set(true); + }); + doTestTracer(logProbe); + processRequests(requestReceived::get); } @Test @@ -166,20 +192,25 @@ void testTracerLineDynamicLogProbe() throws Exception { .where("WebController.java", 15) .captureSnapshot(false) .build(); - JsonSnapshotSerializer.IntakeRequest request = doTestTracer(logProbe); - Snapshot snapshot = request.getDebugger().getSnapshot(); - assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); - assertEquals("processWithArg 42", request.getMessage()); - assertTrue(Pattern.matches("[0-9a-f]+", request.getTraceId())); - assertTrue(Pattern.matches("\\d+", request.getSpanId())); + AtomicBoolean requestReceived = new AtomicBoolean(false); + registerIntakeRequestListener( + intakeRequest -> { + Snapshot snapshot = intakeRequest.getDebugger().getSnapshot(); + assertEquals(PROBE_ID.getId(), snapshot.getProbe().getId()); + assertEquals("processWithArg 42", intakeRequest.getMessage()); + assertTrue(Pattern.matches("[0-9a-f]+", intakeRequest.getTraceId())); + assertTrue(Pattern.matches("\\d+", intakeRequest.getSpanId())); + requestReceived.set(true); + }); + doTestTracer(logProbe); + processRequests(requestReceived::get); } - private JsonSnapshotSerializer.IntakeRequest doTestTracer(LogProbe logProbe) throws Exception { - return doTestTracer(logProbe, false); + private void doTestTracer(LogProbe logProbe) throws Exception { + doTestTracer(logProbe, false); } - private JsonSnapshotSerializer.IntakeRequest doTestTracer( - LogProbe logProbe, boolean enableProcessTags) throws Exception { + private void doTestTracer(LogProbe logProbe, boolean enableProcessTags) throws Exception { setCurrentConfiguration(createConfig(logProbe)); String httpPort = String.valueOf(PortUtils.randomOpenPort()); ProcessBuilder processBuilder = createProcessBuilder(logFilePath, "--server.port=" + httpPort); @@ -197,6 +228,7 @@ private JsonSnapshotSerializer.IntakeRequest doTestTracer( Duration.ofMillis(100), Duration.ofSeconds(30)); sendRequest("http://localhost:" + httpPort + "/greeting"); + /* RecordedRequest snapshotRequest = retrieveSnapshotRequest(); if (snapshotRequest == null) { System.out.println("retry instrumentation because probable race with Tracer..."); @@ -213,10 +245,12 @@ private JsonSnapshotSerializer.IntakeRequest doTestTracer( snapshotRequest = retrieveSnapshotRequest(); } assertNotNull(snapshotRequest); + String bodyStr = snapshotRequest.getBody().readUtf8(); JsonAdapter> adapter = createAdapterForSnapshot(); System.out.println(bodyStr); return adapter.fromJson(bodyStr).get(0); + */ } @Override