diff --git a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java index b17a94a9d05..b855731bc08 100644 --- a/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java +++ b/communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java @@ -159,7 +159,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) { ret.discover(); // safe to run on same thread } else { // avoid performing blocking I/O operation on application thread - AgentTaskScheduler.INSTANCE.execute(ret::discover); + AgentTaskScheduler.INSTANCE.execute(ret::discoverIfOutdated); } } featuresDiscovery = ret; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java index baa55d5dbf1..a3244fc3768 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java @@ -86,6 +86,7 @@ public static Writer createWriter( // The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is // enabled, check if we can use the IntakeWriter instead. if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) { + featuresDiscovery.discoverIfOutdated(); if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) { configuredType = DD_INTAKE_WRITER_TYPE; } else { @@ -94,6 +95,7 @@ public static Writer createWriter( } } if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isLlmObsEnabled())) { + featuresDiscovery.discoverIfOutdated(); if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) { configuredType = DD_INTAKE_WRITER_TYPE; } else { @@ -186,6 +188,7 @@ private static RemoteApi createDDIntakeRemoteApi( SharedCommunicationObjects commObjects, DDAgentFeaturesDiscovery featuresDiscovery, TrackType trackType) { + featuresDiscovery.discoverIfOutdated(); boolean evpProxySupported = featuresDiscovery.supportsEvpProxy(); boolean useProxyApi = (evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled()) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 28ba0832e30..498c8a9c27d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -669,9 +669,16 @@ private CoreTracer( this.writer = writer; } + DDAgentFeaturesDiscovery featuresDiscovery = + sharedCommunicationObjects.featuresDiscovery(config); + + if (config.isCiVisibilityEnabled()) { + // ensure updated discovery and sync if the another discovery currently being done + featuresDiscovery.discoverIfOutdated(); + } + if (config.isCiVisibilityEnabled() - && (config.isCiVisibilityAgentlessEnabled() - || sharedCommunicationObjects.featuresDiscovery(config).supportsEvpProxy())) { + && (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) { pendingTraceBuffer = PendingTraceBuffer.discarding(); traceCollectorFactory = new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics); @@ -732,8 +739,7 @@ private CoreTracer( if (config.isCiVisibilityAgentlessEnabled()) { addTraceInterceptor(DDIntakeTraceInterceptor.INSTANCE); } else { - DDAgentFeaturesDiscovery featuresDiscovery = - sharedCommunicationObjects.featuresDiscovery(config); + featuresDiscovery.discoverIfOutdated(); if (!featuresDiscovery.supportsEvpProxy()) { // CI Test Cycle protocol is not available addTraceInterceptor(CiVisibilityApmProtocolInterceptor.INSTANCE); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy index e3d9ce9c427..926ec81907a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy @@ -1,5 +1,7 @@ package datadog.trace.common.writer +import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE + import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.Config @@ -10,10 +12,16 @@ import datadog.trace.common.writer.ddintake.DDEvpProxyApi import datadog.trace.common.writer.ddintake.DDIntakeApi import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification - +import groovy.json.JsonBuilder import java.util.stream.Collectors - -import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE +import okhttp3.Call +import okhttp3.HttpUrl +import okhttp3.MediaType +import okhttp3.OkHttpClient +import okhttp3.Protocol +import okhttp3.Request +import okhttp3.Response +import okhttp3.ResponseBody class WriterFactoryTest extends DDSpecification { @@ -27,19 +35,30 @@ class WriterFactoryTest extends DDSpecification { config.isCiVisibilityEnabled() >> true config.isCiVisibilityCodeCoverageEnabled() >> false - def agentFeaturesDiscovery = Mock(DDAgentFeaturesDiscovery) - agentFeaturesDiscovery.getEvpProxyEndpoint() >> DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT - agentFeaturesDiscovery.supportsContentEncodingHeadersWithEvpProxy() >> evpProxySupportsCompression + // Mock agent info response + def response = buildHttpResponse(hasEvpProxy, evpProxySupportsCompression, HttpUrl.parse(config.agentUrl + "/info")) + + // Mock HTTP client that simulates delayed response for async feature discovery + def mockCall = Mock(Call) + def mockHttpClient = Mock(OkHttpClient) + mockCall.execute() >> { + // Add a delay + sleep(400) + return response + } + mockHttpClient.newCall(_ as Request) >> mockCall + // Create SharedCommunicationObjects with mocked HTTP client def sharedComm = new SharedCommunicationObjects() - sharedComm.setFeaturesDiscovery(agentFeaturesDiscovery) + sharedComm.okHttpClient = mockHttpClient + sharedComm.agentUrl = HttpUrl.parse(config.agentUrl) sharedComm.createRemaining(config) def sampler = Mock(Sampler) when: - agentFeaturesDiscovery.supportsEvpProxy() >> hasEvpProxy config.ciVisibilityAgentlessEnabled >> isCiVisibilityAgentlessEnabled + def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, configuredType) def apis @@ -77,4 +96,28 @@ class WriterFactoryTest extends DDSpecification { "not-found" | false | false | true | DDIntakeWriter | [DDIntakeApi] | true "not-found" | false | false | false | DDAgentWriter | [DDAgentApi] | false } + + Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) { + def endpoints = [] + if (hasEvpProxy && evpProxySupportsCompression) { + endpoints = [DDAgentFeaturesDiscovery.V4_EVP_PROXY_ENDPOINT] + } else if (hasEvpProxy) { + endpoints = [DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT] + } else { + endpoints = [DDAgentFeaturesDiscovery.V4_ENDPOINT] + } + + def response = [ + "version" : "7.40.0", + "endpoints" : endpoints, + ] + + def builder = new Response.Builder() + .code(200) + .message("OK") + .protocol(Protocol.HTTP_1_1) + .request(new Request.Builder().url(agentUrl.resolve("/info")).build()) + .body(ResponseBody.create(MediaType.parse("application/json"), new JsonBuilder(response).toString())) + return builder.build() + } }