Skip to content

Commit 9a79db3

Browse files
authored
Surface process tags in dsm payloads and use them for base hash calculation (#8836)
* Surface process tags in dsm payloads and use them for base hash calculation * Use right map size * fix tests
1 parent df76e30 commit 9a79db3

File tree

7 files changed

+120
-32
lines changed

7 files changed

+120
-32
lines changed

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.datadoghq.sketch.ddsketch.encoding.VarEncodingHelper;
1010
import datadog.context.propagation.CarrierVisitor;
1111
import datadog.trace.api.Config;
12+
import datadog.trace.api.ProcessTags;
1213
import datadog.trace.api.WellKnownTags;
1314
import datadog.trace.api.datastreams.DataStreamsContext;
1415
import datadog.trace.api.datastreams.PathwayContext;
@@ -363,6 +364,10 @@ public static long getBaseHash(WellKnownTags wellKnownTags) {
363364
if (primaryTag != null) {
364365
builder.append(primaryTag);
365366
}
367+
CharSequence processTags = ProcessTags.getTagsForSerialization();
368+
if (processTags != null) {
369+
builder.append(processTags);
370+
}
366371
return FNV64Hash.generateHash(builder.toString(), FNV64Hash.Version.v1);
367372
}
368373

dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import datadog.communication.serialization.WritableFormatter;
88
import datadog.communication.serialization.msgpack.MsgPackWriter;
99
import datadog.trace.api.Config;
10+
import datadog.trace.api.ProcessTags;
1011
import datadog.trace.api.WellKnownTags;
12+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
1113
import datadog.trace.common.metrics.Sink;
1214
import java.util.Collection;
1315
import java.util.List;
@@ -33,6 +35,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
3335
private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1);
3436
private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1);
3537
private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1);
38+
private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1);
3639

3740
private static final int INITIAL_CAPACITY = 512 * 1024;
3841

@@ -80,7 +83,9 @@ public long getProductsMask() {
8083

8184
@Override
8285
public void writePayload(Collection<StatsBucket> data, String serviceNameOverride) {
83-
writer.startMap(8);
86+
final List<UTF8BytesString> processTags = ProcessTags.getTagsAsUTF8ByteStringList();
87+
final boolean hasProcessTags = processTags != null;
88+
writer.startMap(8 + (hasProcessTags ? 1 : 0));
8489
/* 1 */
8590
writer.writeUTF8(ENV);
8691
writer.writeUTF8(wellKnownTags.getEnv());
@@ -139,6 +144,13 @@ public void writePayload(Collection<StatsBucket> data, String serviceNameOverrid
139144
writer.writeUTF8(PRODUCTS_MASK);
140145
writer.writeLong(getProductsMask());
141146

147+
/* 9 */
148+
if (hasProcessTags) {
149+
writer.writeUTF8(PROCESS_TAGS);
150+
writer.startArray(processTags.size());
151+
processTags.forEach(writer::writeUTF8);
152+
}
153+
142154
buffer.mark();
143155
sink.accept(buffer.messageCount(), buffer.slice());
144156
buffer.reset();

dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery
44
import datadog.communication.ddagent.SharedCommunicationObjects
55
import datadog.communication.http.OkHttpUtils
66
import datadog.trace.api.Config
7+
import datadog.trace.api.ProcessTags
78
import datadog.trace.api.TraceConfig
89
import datadog.trace.api.WellKnownTags
910
import datadog.trace.api.time.ControllableTimeSource
@@ -21,6 +22,7 @@ import spock.lang.Shared
2122
import spock.util.concurrent.PollingConditions
2223

2324
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
25+
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
2426
import static java.util.concurrent.TimeUnit.SECONDS
2527

2628
/**
@@ -103,8 +105,11 @@ class DataStreamsWritingTest extends DDCoreSpecification {
103105
assert unpacker.unpackString() == serviceNameOverride
104106
}
105107

106-
def "Write bucket to mock server"() {
107-
given:
108+
def "Write bucket to mock server with process tags enabled #processTagsEnabled"() {
109+
setup:
110+
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "$processTagsEnabled")
111+
ProcessTags.reset()
112+
108113
def conditions = new PollingConditions(timeout: 2)
109114

110115
def testOkhttpClient = OkHttpUtils.buildHttpClient(HttpUrl.get(server.address), 5000L)
@@ -152,16 +157,23 @@ class DataStreamsWritingTest extends DDCoreSpecification {
152157
assert requestBodies.size() == 1
153158
}
154159

155-
validateMessage(requestBodies[0])
160+
validateMessage(requestBodies[0], processTagsEnabled)
161+
162+
cleanup:
163+
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
164+
ProcessTags.reset()
165+
166+
where:
167+
processTagsEnabled << [true, false]
156168
}
157169

158-
def validateMessage(byte[] message) {
170+
def validateMessage(byte[] message, boolean processTagsEnabled) {
159171
GzipSource gzipSource = new GzipSource(Okio.source(new ByteArrayInputStream(message)))
160172

161173
BufferedSource bufferedSource = Okio.buffer(gzipSource)
162174
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream())
163175

164-
assert unpacker.unpackMapHeader() == 8
176+
assert unpacker.unpackMapHeader() == 8 + (processTagsEnabled ? 1 : 0)
165177
assert unpacker.unpackString() == "Env"
166178
assert unpacker.unpackString() == "test"
167179
assert unpacker.unpackString() == "Service"
@@ -265,6 +277,16 @@ class DataStreamsWritingTest extends DDCoreSpecification {
265277
assert unpacker.unpackString() == "ProductMask"
266278
assert unpacker.unpackLong() == 1
267279

280+
def processTags = ProcessTags.getTagsAsStringList()
281+
assert unpacker.hasNext() == (processTags != null)
282+
if (processTags != null) {
283+
assert unpacker.unpackString() == "ProcessTags"
284+
assert unpacker.unpackArrayHeader() == processTags.size()
285+
processTags.each {
286+
assert unpacker.unpackString() == it
287+
}
288+
}
289+
268290
return true
269291
}
270292
}

dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package datadog.trace.core.datastreams
33
import datadog.communication.ddagent.DDAgentFeaturesDiscovery
44
import datadog.trace.api.Config
55
import datadog.trace.api.DDTraceId
6+
import datadog.trace.api.ProcessTags
67
import datadog.trace.api.TraceConfig
78
import datadog.trace.api.WellKnownTags
89
import datadog.trace.api.datastreams.StatsPoint
@@ -17,6 +18,7 @@ import java.util.function.Consumer
1718

1819
import static datadog.context.Context.root
1920
import static datadog.trace.api.TracePropagationStyle.DATADOG
21+
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
2022
import static datadog.trace.api.config.GeneralConfig.PRIMARY_TAG
2123
import static datadog.trace.api.datastreams.DataStreamsContext.create
2224
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags
@@ -428,6 +430,23 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
428430
firstBaseHash != secondBaseHash
429431
}
430432
433+
def "Process Tags used in hash calculation"() {
434+
when:
435+
def firstBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags)
436+
437+
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "true")
438+
ProcessTags.reset()
439+
ProcessTags.addTag("000", "first")
440+
def secondBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags)
441+
442+
then:
443+
firstBaseHash != secondBaseHash
444+
assert ProcessTags.getTagsForSerialization().startsWithAny("000:first,")
445+
cleanup:
446+
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
447+
ProcessTags.reset()
448+
}
449+
431450
def "Check context extractor decorator behavior"() {
432451
given:
433452
def sink = Mock(Sink)

internal-api/src/main/java/datadog/trace/api/ProcessTags.java

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
import java.nio.file.Path;
77
import java.nio.file.Paths;
88
import java.util.Collections;
9-
import java.util.LinkedHashMap;
109
import java.util.List;
1110
import java.util.Map;
11+
import java.util.SortedMap;
12+
import java.util.TreeMap;
1213
import java.util.stream.Collectors;
1314
import java.util.stream.Stream;
1415
import org.slf4j.Logger;
@@ -19,12 +20,14 @@ public class ProcessTags {
1920
private static boolean enabled = Config.get().isExperimentalPropagateProcessTagsEnabled();
2021

2122
private static class Lazy {
22-
static final Map<String, String> TAGS = loadTags();
23+
// the tags are used to compute a hash for dsm hence that map must be sorted.
24+
static final SortedMap<String, String> TAGS = loadTags();
2325
static volatile UTF8BytesString serializedForm;
24-
static volatile List<String> listForm;
26+
static volatile List<UTF8BytesString> utf8ListForm;
27+
static volatile List<String> stringListForm;
2528

26-
private static Map<String, String> loadTags() {
27-
Map<String, String> tags = new LinkedHashMap<>();
29+
private static SortedMap<String, String> loadTags() {
30+
SortedMap<String, String> tags = new TreeMap<>();
2831
if (enabled) {
2932
try {
3033
fillBaseTags(tags);
@@ -86,15 +89,21 @@ private static void fillJbossTags(Map<String, String> tags) {
8689
}
8790

8891
static void calculate() {
89-
if (listForm != null || TAGS.isEmpty()) {
92+
if (serializedForm != null || TAGS.isEmpty()) {
9093
return;
9194
}
9295
synchronized (Lazy.TAGS) {
93-
final Stream<String> tagStream =
96+
final Stream<UTF8BytesString> tagStream =
9497
TAGS.entrySet().stream()
95-
.map(entry -> entry.getKey() + ":" + TraceUtils.normalizeTag(entry.getValue()));
96-
listForm = Collections.unmodifiableList(tagStream.collect(Collectors.toList()));
97-
serializedForm = UTF8BytesString.create(String.join(",", listForm));
98+
.map(
99+
entry ->
100+
UTF8BytesString.create(
101+
entry.getKey() + ":" + TraceUtils.normalizeTag(entry.getValue())));
102+
utf8ListForm = Collections.unmodifiableList(tagStream.collect(Collectors.toList()));
103+
stringListForm =
104+
Collections.unmodifiableList(
105+
utf8ListForm.stream().map(UTF8BytesString::toString).collect(Collectors.toList()));
106+
serializedForm = UTF8BytesString.create(String.join(",", utf8ListForm));
98107
}
99108
}
100109
}
@@ -107,21 +116,34 @@ public static void addTag(String key, String value) {
107116
synchronized (Lazy.TAGS) {
108117
Lazy.TAGS.put(key, value);
109118
Lazy.serializedForm = null;
110-
Lazy.listForm = null;
119+
Lazy.stringListForm = null;
120+
Lazy.utf8ListForm = null;
111121
}
112122
}
113123
}
114124

115-
public static List<String> getTagsAsList() {
125+
public static List<UTF8BytesString> getTagsAsUTF8ByteStringList() {
116126
if (!enabled) {
117127
return null;
118128
}
119-
final List<String> listForm = Lazy.listForm;
129+
final List<UTF8BytesString> listForm = Lazy.utf8ListForm;
120130
if (listForm != null) {
121131
return listForm;
122132
}
123133
Lazy.calculate();
124-
return Lazy.listForm;
134+
return Lazy.utf8ListForm;
135+
}
136+
137+
public static List<String> getTagsAsStringList() {
138+
if (!enabled) {
139+
return null;
140+
}
141+
final List<String> listForm = Lazy.stringListForm;
142+
if (listForm != null) {
143+
return listForm;
144+
}
145+
Lazy.calculate();
146+
return Lazy.stringListForm;
125147
}
126148

127149
public static UTF8BytesString getTagsForSerialization() {
@@ -141,7 +163,8 @@ static void empty() {
141163
synchronized (Lazy.TAGS) {
142164
Lazy.TAGS.clear();
143165
Lazy.serializedForm = null;
144-
Lazy.listForm = null;
166+
Lazy.stringListForm = null;
167+
Lazy.utf8ListForm = null;
145168
}
146169
}
147170

internal-api/src/test/groovy/datadog/trace/api/ProcessTagsForkedTest.groovy

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class ProcessTagsForkedTest extends DDSpecification {
3030
tags =~ expected
3131
where:
3232
jar | cls | expected
33-
Paths.get("my test", "my.jar").toFile() | null | "entrypoint.name:my,entrypoint.basedir:my_test,entrypoint.workdir:[^,]+"
33+
Paths.get("my test", "my.jar").toFile() | null | "entrypoint.basedir:my_test,entrypoint.name:my,entrypoint.workdir:[^,]+"
3434
Paths.get("my.jar").toFile() | null | "entrypoint.name:my,entrypoint.workdir:[^,]+"
3535
null | "com.test.Main" | "entrypoint.name:com.test.main,entrypoint.workdir:[^,]+"
3636
null | null | "entrypoint.workdir:[^,]+"
@@ -56,9 +56,9 @@ class ProcessTagsForkedTest extends DDSpecification {
5656
System.clearProperty("jboss.server.name")
5757
where:
5858
jbossHome | mode | serverName | expected
59-
"/opt/jboss/myserver" | "[Standalone]" | "standalone" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:.+,jboss.home:myserver,server.name:standalone,jboss.mode:standalone"
60-
"/opt/jboss/myserver" | "[server1:12345]" | "server1" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:.+,jboss.home:myserver,server.name:server1,jboss.mode:domain"
61-
null | "[Standalone]" | "standalone" | "entrypoint.name:jboss-modules,entrypoint.basedir:somewhere,entrypoint.workdir:[^,]+" // don't expect jboss tags since home is missing
59+
"/opt/jboss/myserver" | "[Standalone]" | "standalone" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:.+,jboss.home:myserver,jboss.mode:standalone,server.name:standalone"
60+
"/opt/jboss/myserver" | "[server1:12345]" | "server1" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:.+,jboss.home:myserver,jboss.mode:domain,server.name:server1"
61+
null | "[Standalone]" | "standalone" | "entrypoint.basedir:somewhere,entrypoint.name:jboss-modules,entrypoint.workdir:[^,]+" // don't expect jboss tags since home is missing
6262
}
6363

6464
def 'should not calculate process tags by default'() {
@@ -72,7 +72,8 @@ class ProcessTagsForkedTest extends DDSpecification {
7272
ProcessTags.addTag("test", "value")
7373
then:
7474
assert ProcessTags.tagsForSerialization == null
75-
assert ProcessTags.tagsAsList == null
75+
assert ProcessTags.tagsAsStringList == null
76+
assert ProcessTags.tagsAsUTF8ByteStringList == null
7677
}
7778

7879
def 'should lazily recalculate when a tag is added'() {
@@ -81,18 +82,24 @@ class ProcessTagsForkedTest extends DDSpecification {
8182
ProcessTags.reset()
8283
when:
8384
def processTags = ProcessTags.tagsForSerialization
84-
def tagsAsList = ProcessTags.tagsAsList
85+
def tagsAsList = ProcessTags.tagsAsStringList
86+
def tagsAsUtf8List = ProcessTags.tagsAsUTF8ByteStringList
8587
then:
8688
assert ProcessTags.enabled
8789
assert processTags != null
8890
assert tagsAsList != null
8991
assert tagsAsList.size() > 0
92+
assert tagsAsUtf8List != null
93+
assert tagsAsUtf8List.size() == tagsAsList.size()
9094
when:
91-
ProcessTags.addTag("test", "value")
95+
// add it as first pos since 0 < any other a-z
96+
ProcessTags.addTag("0test", "value")
9297
then:
93-
assert ProcessTags.tagsForSerialization.toString() == "$processTags,test:value"
94-
def size = ProcessTags.tagsAsList.size()
98+
assert ProcessTags.tagsForSerialization.toString() == "0test:value,$processTags"
99+
def size = ProcessTags.tagsAsStringList.size()
95100
assert size == tagsAsList.size() + 1
96-
assert ProcessTags.tagsAsList[size - 1] == "test:value"
101+
assert size == ProcessTags.tagsAsUTF8ByteStringList.size()
102+
assert ProcessTags.tagsAsStringList[0] == "0test:value"
103+
assert ProcessTags.tagsAsUTF8ByteStringList[0].toString() == "0test:value"
97104
}
98105
}

remote-config/remote-config-core/src/main/java/datadog/remoteconfig/tuf/RemoteConfigRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static RemoteConfigRequest newRequest(
3434
serviceEnv,
3535
serviceVersion,
3636
tags,
37-
ProcessTags.getTagsAsList());
37+
ProcessTags.getTagsAsStringList());
3838

3939
ClientInfo clientInfo =
4040
new RemoteConfigRequest.ClientInfo(

0 commit comments

Comments
 (0)