Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.instrumentation.kafka_clients;

import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.util.FNV64Hash;
Expand All @@ -9,7 +10,11 @@

public class AvroSchemaExtractor {
public static void tryExtractProducer(ProducerRecord record, AgentSpan span) {
Integer prio = span.getSamplingPriority();
AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
if (!dsm.canSampleSchema(record.topic())) {
return;
}
Integer prio = span.forceSamplingDecision();
if (prio == null || prio <= 0) {
// don't extract schema if span is not sampled
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public String hierarchyMarkerType() {
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return declaresMethod(named("writeTo"))
.and(extendsClass(named(hierarchyMarkerType())))
.and(not(nameStartsWith("com.google.protobuf")));
.and(
not(nameStartsWith("com.google.protobuf"))
.or(named("com.google.protobuf.DynamicMessage")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ public SchemaSampler() {
}

public int trySample(long currentTimeMillis) {
weight.incrementAndGet();
if (currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS) {
synchronized (this) {
if (currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS) {
lastSampleMillis = currentTimeMillis;
int currentWeight = weight.get();
weight.set(0);
return currentWeight;
return weight.getAndSet(0);
}
}
}
return 0;
}

public boolean canSample(long currentTimeMillis) {
weight.incrementAndGet();
return currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)

then:
// the first received schema is sampled, with a weight of one.
dataStreams.canSampleSchema("schema1")
dataStreams.trySampleSchema("schema1") == 1
// the sampling is done by topic, so a schema on a different topic will also be sampled at once, also with a weight of one.
dataStreams.canSampleSchema("schema2")
dataStreams.trySampleSchema("schema2") == 1
dataStreams.trySampleSchema("schema1") == 0
dataStreams.trySampleSchema("schema1") == 0
// no time has passed from the last sampling, so the same schema is not sampled again (two times in a row).
!dataStreams.canSampleSchema("schema1")
!dataStreams.canSampleSchema("schema1")
timeSource.advance(30*1e9 as long)
// now, 30 seconds have passed, so the schema is sampled again, with a weight of 3 (so it includes the two times the schema was not sampled).
dataStreams.canSampleSchema("schema1")
dataStreams.trySampleSchema("schema1") == 3
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,18 @@ class SchemaSamplerTest extends DDCoreSpecification {
boolean canSample1 = sampler.canSample(currentTimeMillis)
int weight1 = sampler.trySample(currentTimeMillis)
boolean canSample2= sampler.canSample(currentTimeMillis + 1000)
int weight2 = sampler.trySample(currentTimeMillis + 1000)
boolean canSample3 = sampler.canSample(currentTimeMillis + 2000)
int weight3 = sampler.trySample(currentTimeMillis + 2000)
boolean canSample4 = sampler.canSample(currentTimeMillis + 30000)
int weight4 = sampler.trySample(currentTimeMillis + 30000)
boolean canSample5 = sampler.canSample(currentTimeMillis + 30001)
int weight5 = sampler.trySample(currentTimeMillis + 30001)

then:
canSample1
weight1 == 1
!canSample2
weight2 == 0
!canSample3
weight3 == 0
canSample4
weight4 == 3
!canSample5
weight5 == 0
}
}