diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/AvroSchemaExtractor.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/AvroSchemaExtractor.java index d3de61564d3..8e74e1757d4 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/AvroSchemaExtractor.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/AvroSchemaExtractor.java @@ -1,6 +1,7 @@ package datadog.trace.instrumentation.kafka_clients; import datadog.trace.api.DDTags; +import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.util.FNV64Hash; @@ -9,7 +10,11 @@ public class AvroSchemaExtractor { public static void tryExtractProducer(ProducerRecord record, AgentSpan span) { - Integer prio = span.getSamplingPriority(); + AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring(); + if (!dsm.canSampleSchema(record.topic())) { + return; + } + Integer prio = span.forceSamplingDecision(); if (prio == null || prio <= 0) { // don't extract schema if span is not sampled return; diff --git a/dd-java-agent/instrumentation/protobuf/src/main/java/datadog/trace/instrumentation/protobuf_java/AbstractMessageInstrumentation.java b/dd-java-agent/instrumentation/protobuf/src/main/java/datadog/trace/instrumentation/protobuf_java/AbstractMessageInstrumentation.java index b377ab0ea1e..57c7209dd4d 100644 --- a/dd-java-agent/instrumentation/protobuf/src/main/java/datadog/trace/instrumentation/protobuf_java/AbstractMessageInstrumentation.java +++ b/dd-java-agent/instrumentation/protobuf/src/main/java/datadog/trace/instrumentation/protobuf_java/AbstractMessageInstrumentation.java @@ -36,7 +36,9 @@ public String hierarchyMarkerType() { public ElementMatcher hierarchyMatcher() { return declaresMethod(named("writeTo")) .and(extendsClass(named(hierarchyMarkerType()))) - .and(not(nameStartsWith("com.google.protobuf"))); + .and( + not(nameStartsWith("com.google.protobuf")) + .or(named("com.google.protobuf.DynamicMessage"))); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/SchemaSampler.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/SchemaSampler.java index f6f0a8d5610..71301d90098 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/SchemaSampler.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/SchemaSampler.java @@ -13,14 +13,11 @@ public SchemaSampler() { } public int trySample(long currentTimeMillis) { - weight.incrementAndGet(); if (currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS) { synchronized (this) { if (currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS) { lastSampleMillis = currentTimeMillis; - int currentWeight = weight.get(); - weight.set(0); - return currentWeight; + return weight.getAndSet(0); } } } @@ -28,6 +25,7 @@ public int trySample(long currentTimeMillis) { } public boolean canSample(long currentTimeMillis) { + weight.incrementAndGet(); return currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS; } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index c67dbe8f786..5313f1eb412 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -77,11 +77,18 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) then: + // the first received schema is sampled, with a weight of one. + dataStreams.canSampleSchema("schema1") dataStreams.trySampleSchema("schema1") == 1 + // the sampling is done by topic, so a schema on a different topic will also be sampled at once, also with a weight of one. + dataStreams.canSampleSchema("schema2") dataStreams.trySampleSchema("schema2") == 1 - dataStreams.trySampleSchema("schema1") == 0 - dataStreams.trySampleSchema("schema1") == 0 + // no time has passed from the last sampling, so the same schema is not sampled again (two times in a row). + !dataStreams.canSampleSchema("schema1") + !dataStreams.canSampleSchema("schema1") timeSource.advance(30*1e9 as long) + // now, 30 seconds have passed, so the schema is sampled again, with a weight of 3 (so it includes the two times the schema was not sampled). + dataStreams.canSampleSchema("schema1") dataStreams.trySampleSchema("schema1") == 3 } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/SchemaSamplerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/SchemaSamplerTest.groovy index 4fb81b8307c..41615e95542 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/SchemaSamplerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/SchemaSamplerTest.groovy @@ -13,24 +13,18 @@ class SchemaSamplerTest extends DDCoreSpecification { boolean canSample1 = sampler.canSample(currentTimeMillis) int weight1 = sampler.trySample(currentTimeMillis) boolean canSample2= sampler.canSample(currentTimeMillis + 1000) - int weight2 = sampler.trySample(currentTimeMillis + 1000) boolean canSample3 = sampler.canSample(currentTimeMillis + 2000) - int weight3 = sampler.trySample(currentTimeMillis + 2000) boolean canSample4 = sampler.canSample(currentTimeMillis + 30000) int weight4 = sampler.trySample(currentTimeMillis + 30000) boolean canSample5 = sampler.canSample(currentTimeMillis + 30001) - int weight5 = sampler.trySample(currentTimeMillis + 30001) then: canSample1 weight1 == 1 !canSample2 - weight2 == 0 !canSample3 - weight3 == 0 canSample4 weight4 == 3 !canSample5 - weight5 == 0 } }