diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java new file mode 100644 index 00000000000..e15d771fc0b --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -0,0 +1,102 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.monitor.Monitoring; +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.util.Strings; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@State(Scope.Benchmark) +@Warmup(iterations = 1, time = 30, timeUnit = SECONDS) +@Measurement(iterations = 3, time = 30, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class ConflatingMetricsAggregatorBenchmark { + private final DDAgentFeaturesDiscovery featuresDiscovery = + new FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()); + private final ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + featuresDiscovery, + new NullSink(), + 2048, + 2048); + private final List> spans = generateTrace(64); + + static List> generateTrace(int len) { + final List> trace = new ArrayList<>(); + for (int i = 0; i < len; i++) { + SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1); + span.setTag("peer.hostname", Strings.random(10)); + trace.add(span); + } + return trace; + } + + static class NullSink implements Sink { + + @Override + public void register(EventListener listener) {} + + @Override + public void accept(int messageCount, ByteBuffer buffer) {} + } + + static class FixedAgentFeaturesDiscovery extends DDAgentFeaturesDiscovery { + private final Set peerTags; + private final Set spanKinds; + + public FixedAgentFeaturesDiscovery(Set peerTags, Set spanKinds) { + // create a fixed discovery with metrics enabled + super(null, Monitoring.DISABLED, null, false, true); + this.peerTags = peerTags; + this.spanKinds = spanKinds; + } + + @Override + public void discover() { + // do nothing + } + + @Override + public boolean supportsMetrics() { + return true; + } + + @Override + public Set peerTags() { + return peerTags; + } + + @Override + public Set spanKindsToComputedStats() { + return spanKinds; + } + } + + @Benchmark + public void benchmark(Blackhole blackhole) { + blackhole.consume(aggregator.publish(spans)); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 9cf2058239a..dddf526d01b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -15,6 +15,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; @@ -24,8 +25,8 @@ import datadog.trace.core.CoreSpan; import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.util.AgentTaskScheduler; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -33,6 +34,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.maps.NonBlockingHashMap; import org.jctools.queues.MpscCompoundQueue; import org.jctools.queues.SpmcArrayQueue; @@ -49,6 +51,21 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); + private static final DDCache SPAN_KINDS = + DDCaches.newFixedSizeCache(16); + private static final DDCache< + String, Pair, Function>> + PEER_TAGS_CACHE = + DDCaches.newFixedSizeCache( + 64); // it can be unbounded since those values are returned by the agent and should be + // under control. 64 entries is enough in this case to contain all the peer tags. + private static final Function< + String, Pair, Function>> + PEER_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private final Set ignoredResources; @@ -252,7 +269,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getHttpStatusCode(), isSynthetic(span), span.isTopLevel(), - span.getTag(SPAN_KIND, ""), + SPAN_KINDS.computeIfAbsent(span.getTag(SPAN_KIND, ""), UTF8BytesString::create), getPeerTags(span)); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); @@ -288,12 +305,14 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } - private Map getPeerTags(CoreSpan span) { - Map peerTags = new HashMap<>(); + private List getPeerTags(CoreSpan span) { + List peerTags = new ArrayList<>(); for (String peerTag : features.peerTags()) { Object value = span.getTag(peerTag); if (value != null) { - peerTags.put(peerTag, value.toString()); + final Pair, Function> pair = + PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); + peerTags.add(pair.getLeft().computeIfAbsent(value.toString(), pair.getRight())); } } return peerTags; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java index de1d55b804b..73aca7d6daf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java @@ -4,7 +4,7 @@ import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import java.util.Collections; -import java.util.Map; +import java.util.List; /** The aggregation key for tracked metrics. */ public final class MetricKey { @@ -17,7 +17,7 @@ public final class MetricKey { private final int hash; private final boolean isTraceRoot; private final UTF8BytesString spanKind; - private final Map peerTags; + private final List peerTags; public MetricKey( CharSequence resource, @@ -28,7 +28,7 @@ public MetricKey( boolean synthetics, boolean isTraceRoot, CharSequence spanKind, - Map peerTags) { + List peerTags) { this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource); this.service = null == service ? EMPTY : UTF8BytesString.create(service); this.operationName = null == operationName ? EMPTY : UTF8BytesString.create(operationName); @@ -37,7 +37,7 @@ public MetricKey( this.synthetics = synthetics; this.isTraceRoot = isTraceRoot; this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind); - this.peerTags = peerTags == null ? Collections.emptyMap() : peerTags; + this.peerTags = peerTags == null ? Collections.emptyList() : peerTags; // Unrolled polynomial hashcode to avoid varargs allocation // and eliminate data dependency between iterations as in Arrays.hashCode. @@ -90,7 +90,7 @@ public UTF8BytesString getSpanKind() { return spanKind; } - public Map getPeerTags() { + public List getPeerTags() { return peerTags; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 7c20d158f2c..964c51b2cbf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -1,7 +1,6 @@ package datadog.trace.common.metrics; import static java.nio.charset.StandardCharsets.ISO_8859_1; -import static java.nio.charset.StandardCharsets.UTF_8; import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.WritableFormatter; @@ -9,8 +8,7 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import datadog.trace.util.TraceUtils; -import java.util.Map; +import java.util.List; public final class SerializingMetricWriter implements MetricWriter { @@ -126,19 +124,11 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(key.getSpanKind()); writer.writeUTF8(PEER_TAGS); - Map peerTags = key.getPeerTags(); + final List peerTags = key.getPeerTags(); writer.startArray(peerTags.size()); - StringBuilder peerTagBuilder = new StringBuilder(); - for (Map.Entry peerTag : peerTags.entrySet()) { - peerTagBuilder.setLength(0); - String toWrite = - peerTagBuilder - .append(peerTag.getKey()) - .append(':') - .append(TraceUtils.normalizeTag(peerTag.getValue())) - .toString(); - writer.writeUTF8(toWrite.getBytes(UTF_8)); + for (UTF8BytesString peerTag : peerTags) { + writer.writeUTF8(peerTag); } writer.writeUTF8(HITS); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy index 14754d93886..3c7a247cae3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy @@ -1,5 +1,6 @@ package datadog.trace.common.metrics +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.test.util.DDSpecification import java.util.concurrent.BlockingDeque @@ -51,7 +52,7 @@ class AggregateMetricTest extends DDSpecification { given: AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG)) - Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"])) + Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")])) batch.add(0L, 10) batch.add(0L, 10) batch.add(0L, 10) @@ -126,7 +127,7 @@ class AggregateMetricTest extends DDSpecification { def "consistent under concurrent attempts to read and write"() { given: AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"]) + MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")]) BlockingDeque queue = new LinkedBlockingDeque<>(1000) ExecutorService reader = Executors.newSingleThreadExecutor() int writerCount = 10 diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 1210c13762f..490e89ce68b 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -125,7 +125,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 } @@ -168,7 +168,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, kind, - [:] + [] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -221,7 +221,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "grault", - ["country":"france"] + [UTF8BytesString.create("country:france")] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -235,7 +235,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "grault", - ["country":"france", "georegion":"europe"] + [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")] ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) @@ -277,7 +277,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, topLevel, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 }) @@ -333,7 +333,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration }) @@ -346,7 +346,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration * 2 }) @@ -396,7 +396,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration } @@ -410,7 +410,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) 1 * writer.finishBucket() >> { latch.countDown() } @@ -456,7 +456,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -487,7 +487,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ),{ AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -501,7 +501,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "baz", - [:] + [] ), _) 1 * writer.finishBucket() >> { latch.countDown() } @@ -547,7 +547,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "quux", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) @@ -603,7 +603,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, true, "garply", - [:] + [] ), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 8e64253942c..a36db50b441 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -4,6 +4,7 @@ import datadog.trace.api.Config import datadog.trace.api.ProcessTags import datadog.trace.api.WellKnownTags import datadog.trace.api.Pair +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.test.util.DDSpecification import org.msgpack.core.MessagePack import org.msgpack.core.MessageUnpacker @@ -53,7 +54,11 @@ class SerializingMetricWriterTest extends DDSpecification { false, false, "client", - ["country":"canada", "georegion":"amer", "peer.service":"remote-service"] + [ + UTF8BytesString.create("country:canada"), + UTF8BytesString.create("georegion:amer"), + UTF8BytesString.create("peer.service:remote-service") + ] ), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ), @@ -67,7 +72,11 @@ class SerializingMetricWriterTest extends DDSpecification { true, false, "producer", - ["country":"canada", "georegion":"amer", "peer.service":"remote-service"] + [ + UTF8BytesString.create("country:canada"), + UTF8BytesString.create("georegion:amer"), + UTF8BytesString.create("peer.service:remote-service") + ], ), new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)) ) @@ -83,7 +92,7 @@ class SerializingMetricWriterTest extends DDSpecification { false, false, "producer", - ["messaging.destination" : "dest" + i] + [UTF8BytesString.create("messaging.destination:dest" + i)] ), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ) @@ -177,10 +186,8 @@ class SerializingMetricWriterTest extends DDSpecification { int peerTagsLength = unpacker.unpackArrayHeader() assert peerTagsLength == key.getPeerTags().size() for (int i = 0; i < peerTagsLength; i++) { - def string = unpacker.unpackString() - def separatorPos = string.indexOf(':') - def tagVal = key.getPeerTags()[string.substring(0, separatorPos)] - assert tagVal == string.substring(separatorPos + 1) + def unpackedPeerTag = unpacker.unpackString() + assert unpackedPeerTag == key.getPeerTags()[i].toString() } ++elementCount assert unpacker.unpackString() == "Hits" diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index b92ed985ca1..9984b9700d0 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -2,6 +2,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.communication.http.OkHttpUtils import datadog.trace.api.Config import datadog.trace.api.WellKnownTags +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.common.metrics.AggregateMetric import datadog.trace.common.metrics.EventListener import datadog.trace.common.metrics.MetricKey @@ -34,11 +35,11 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) writer.add( - new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", ["grault":"quux"]), + new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")]), new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) ) writer.add( - new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", ["grault":"quux"]), + new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")]), new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) ) writer.finishBucket()