Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
ret.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(ret::discover);
AgentTaskScheduler.INSTANCE.execute(ret::discoverIfOutdated);
}
}
featuresDiscovery = ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public static Writer createWriter(
// The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is
// enabled, check if we can use the IntakeWriter instead.
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) {
featuresDiscovery.discoverIfOutdated();
if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) {
configuredType = DD_INTAKE_WRITER_TYPE;
} else {
Expand All @@ -94,6 +95,7 @@ public static Writer createWriter(
}
}
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isLlmObsEnabled())) {
featuresDiscovery.discoverIfOutdated();
if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) {
configuredType = DD_INTAKE_WRITER_TYPE;
} else {
Expand Down Expand Up @@ -186,6 +188,7 @@ private static RemoteApi createDDIntakeRemoteApi(
SharedCommunicationObjects commObjects,
DDAgentFeaturesDiscovery featuresDiscovery,
TrackType trackType) {
featuresDiscovery.discoverIfOutdated();
boolean evpProxySupported = featuresDiscovery.supportsEvpProxy();
boolean useProxyApi =
(evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled())
Expand Down
14 changes: 10 additions & 4 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,16 @@ private CoreTracer(
this.writer = writer;
}

DDAgentFeaturesDiscovery featuresDiscovery =
sharedCommunicationObjects.featuresDiscovery(config);

if (config.isCiVisibilityEnabled()) {
// ensure updated discovery and sync if the another discovery currently being done
featuresDiscovery.discoverIfOutdated();
}

if (config.isCiVisibilityEnabled()
&& (config.isCiVisibilityAgentlessEnabled()
|| sharedCommunicationObjects.featuresDiscovery(config).supportsEvpProxy())) {
&& (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) {
pendingTraceBuffer = PendingTraceBuffer.discarding();
traceCollectorFactory =
new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics);
Expand Down Expand Up @@ -732,8 +739,7 @@ private CoreTracer(
if (config.isCiVisibilityAgentlessEnabled()) {
addTraceInterceptor(DDIntakeTraceInterceptor.INSTANCE);
} else {
DDAgentFeaturesDiscovery featuresDiscovery =
sharedCommunicationObjects.featuresDiscovery(config);
featuresDiscovery.discoverIfOutdated();
if (!featuresDiscovery.supportsEvpProxy()) {
// CI Test Cycle protocol is not available
addTraceInterceptor(CiVisibilityApmProtocolInterceptor.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.trace.common.writer

import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE

import datadog.communication.ddagent.DDAgentFeaturesDiscovery
import datadog.communication.ddagent.SharedCommunicationObjects
import datadog.trace.api.Config
Expand All @@ -10,10 +12,16 @@ import datadog.trace.common.writer.ddintake.DDEvpProxyApi
import datadog.trace.common.writer.ddintake.DDIntakeApi
import datadog.trace.core.monitor.HealthMetrics
import datadog.trace.test.util.DDSpecification

import groovy.json.JsonBuilder
import java.util.stream.Collectors

import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE
import okhttp3.Call
import okhttp3.HttpUrl
import okhttp3.MediaType
import okhttp3.OkHttpClient
import okhttp3.Protocol
import okhttp3.Request
import okhttp3.Response
import okhttp3.ResponseBody

class WriterFactoryTest extends DDSpecification {

Expand All @@ -27,19 +35,30 @@ class WriterFactoryTest extends DDSpecification {
config.isCiVisibilityEnabled() >> true
config.isCiVisibilityCodeCoverageEnabled() >> false

def agentFeaturesDiscovery = Mock(DDAgentFeaturesDiscovery)
agentFeaturesDiscovery.getEvpProxyEndpoint() >> DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
agentFeaturesDiscovery.supportsContentEncodingHeadersWithEvpProxy() >> evpProxySupportsCompression
// Mock agent info response
def response = buildHttpResponse(hasEvpProxy, evpProxySupportsCompression, HttpUrl.parse(config.agentUrl + "/info"))

// Mock HTTP client that simulates delayed response for async feature discovery
def mockCall = Mock(Call)
def mockHttpClient = Mock(OkHttpClient)
mockCall.execute() >> {
// Add a delay
sleep(400)
return response
}
mockHttpClient.newCall(_ as Request) >> mockCall

// Create SharedCommunicationObjects with mocked HTTP client
def sharedComm = new SharedCommunicationObjects()
sharedComm.setFeaturesDiscovery(agentFeaturesDiscovery)
sharedComm.okHttpClient = mockHttpClient
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
sharedComm.createRemaining(config)

def sampler = Mock(Sampler)

when:
agentFeaturesDiscovery.supportsEvpProxy() >> hasEvpProxy
config.ciVisibilityAgentlessEnabled >> isCiVisibilityAgentlessEnabled

def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, configuredType)

def apis
Expand Down Expand Up @@ -77,4 +96,28 @@ class WriterFactoryTest extends DDSpecification {
"not-found" | false | false | true | DDIntakeWriter | [DDIntakeApi] | true
"not-found" | false | false | false | DDAgentWriter | [DDAgentApi] | false
}

Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) {
def endpoints = []
if (hasEvpProxy && evpProxySupportsCompression) {
endpoints = [DDAgentFeaturesDiscovery.V4_EVP_PROXY_ENDPOINT]
} else if (hasEvpProxy) {
endpoints = [DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT]
} else {
endpoints = [DDAgentFeaturesDiscovery.V4_ENDPOINT]
}

def response = [
"version" : "7.40.0",
"endpoints" : endpoints,
]

def builder = new Response.Builder()
.code(200)
.message("OK")
.protocol(Protocol.HTTP_1_1)
.request(new Request.Builder().url(agentUrl.resolve("/info")).build())
.body(ResponseBody.create(MediaType.parse("application/json"), new JsonBuilder(response).toString()))
return builder.build()
}
}
Loading