diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 9190a8b8c09..3ea972ba075 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -28,7 +28,7 @@ /internal-api/src/test/groovy/datadog/trace/api/sampling @DataDog/apm-sdk-api-java # @DataDog/apm-serverless -/dd-trace-core/src/main/java/datadog/trace/lambda/ @DataDog/apm-serverless +/dd-trace-core/src/main/java/datadog/trace/lambda/ @DataDog/apm-serverless /dd-trace-core/src/test/groovy/datadog/trace/lambda/ @DataDog/apm-serverless # @DataDog/apm-lang-platform-java @@ -131,3 +131,9 @@ dd-trace-api/src/main/java/datadog/trace/api/llmobs/ @DataDog/ml-observability dd-java-agent/agent-llmobs/ @DataDog/ml-observability dd-trace-core/src/main/java/datadog/trace/llmobs/ @DataDog/ml-observability dd-trace-core/src/test/groovy/datadog/trace/llmobs/ @DataDog/ml-observability + +# @DataDog/rum +/internal-api/src/main/java/datadog/trace/api/rum/ @DataDog/rum +/internal-api/src/test/groovy/datadog/trace/api/rum/ @DataDog/rum +/telemetry/src/main/java/datadog/telemetry/rum/ @DataDog/rum +/telemetry/src/test/groovy/datadog/telemetry/rum/ @DataDog/rum diff --git a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy index b207cbd5f48..b812d5a4187 100644 --- a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy +++ b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy @@ -2246,26 +2246,20 @@ abstract class HttpServerTest extends WithHttpServer { def "test rum injection in head for mime #mime"() { setup: assumeTrue(testRumInjection()) - def telemetryCollector = RumInjector.getTelemetryCollector() def request = new Request.Builder().url(server.address().resolve("gimme-$mime").toURL()) .get().build() when: def response = client.newCall(request).execute() def responseBody = response.body().string() - def finalSummary = telemetryCollector.summary() then: assert response.code() == 200 assert responseBody.contains(new String(RumInjector.get().getSnippetBytes("UTF-8"), "UTF-8")) == expected assert response.header("x-datadog-rum-injected") == (expected ? "1" : null) - // Check a few telemetry metrics if (expected) { - assert finalSummary.contains("injectionSucceed=") assert responseBody.length() > 0 - } else { - assert finalSummary.contains("injectionSkipped=") } where: diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index d77dd7f8b68..f1ca6fb3137 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -30,7 +30,6 @@ import datadog.trace.api.DynamicConfig; import datadog.trace.api.EndpointTracker; import datadog.trace.api.IdGenerationStrategy; -import datadog.trace.api.InstrumenterConfig; import datadog.trace.api.StatsDClient; import datadog.trace.api.TagMap; import datadog.trace.api.TraceConfig; @@ -714,11 +713,6 @@ private CoreTracer( : HealthMetrics.NO_OP); this.healthMetrics.start(); - // Start RUM injector telemetry - if (InstrumenterConfig.get().isRumEnabled()) { - RumInjector.enableTelemetry(this.statsDClient); - } - performanceMonitoring = config.isPerfMetricsEnabled() ? new MonitoringImpl(this.statsDClient, 10, SECONDS) diff --git a/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java b/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java index 343c9450aa6..ef77cff0e47 100644 --- a/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java +++ b/internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java @@ -125,19 +125,11 @@ public byte[] getMarkerBytes(String encoding) { return this.markerCache.computeIfAbsent(encoding, MARKER_BYTES); } - /** - * Starts telemetry collection and reports metrics via StatsDClient. - * - * @param statsDClient The StatsDClient to report metrics to. - */ - public static void enableTelemetry(datadog.trace.api.StatsDClient statsDClient) { - if (statsDClient != null) { - RumInjectorMetrics metrics = new RumInjectorMetrics(statsDClient); - telemetryCollector = metrics; - - if (INSTANCE.isEnabled()) { - telemetryCollector.onInitializationSucceed(); - } + /** Starts telemetry collection if RUM injection is enabled. */ + public static void enableTelemetry() { + if (INSTANCE.isEnabled()) { + telemetryCollector = new RumInjectorMetrics(); + telemetryCollector.onInitializationSucceed(); } else { telemetryCollector = RumTelemetryCollector.NO_OP; } diff --git a/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java b/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java index f97742af3d2..54ddd499e82 100644 --- a/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java +++ b/internal-api/src/main/java/datadog/trace/api/rum/RumInjectorMetrics.java @@ -1,14 +1,20 @@ package datadog.trace.api.rum; import datadog.trace.api.Config; -import datadog.trace.api.StatsDClient; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; -import java.util.concurrent.atomic.AtomicLong; +import datadog.trace.api.telemetry.MetricCollector; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; /** * This class implements the RumTelemetryCollector interface, which is used to collect telemetry - * from the RumInjector. Metrics are then reported via StatsDClient with tagging. + * from the RumInjector. Metrics are then reported via the Datadog telemetry intake system. * * @see common @@ -16,30 +22,22 @@ */ public class RumInjectorMetrics implements RumTelemetryCollector { - private final AtomicLong injectionSucceed = new AtomicLong(); - private final AtomicLong injectionFailed = new AtomicLong(); - private final AtomicLong injectionSkipped = new AtomicLong(); - private final AtomicLong contentSecurityPolicyDetected = new AtomicLong(); - private final AtomicLong initializationSucceed = new AtomicLong(); + private final Queue metrics = new LinkedBlockingQueue<>(1024); + private final Queue distributions = + new LinkedBlockingQueue<>(1024); - private final StatsDClient statsd; + private final DDCache> succeedTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache> skippedTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache> cspTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache> responseTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache> timeTagsCache = DDCaches.newFixedSizeCache(8); + private final DDCache> failedTagsCache = DDCaches.newFixedSizeCache(16); + private final DDCache> initTagsCache = DDCaches.newFixedSizeCache(1); private final String applicationId; private final String remoteConfigUsed; - // Cache dependent on servlet version and content encoding - private final DDCache succeedTagsCache = DDCaches.newFixedSizeCache(8); - private final DDCache skippedTagsCache = DDCaches.newFixedSizeCache(8); - private final DDCache cspTagsCache = DDCaches.newFixedSizeCache(8); - private final DDCache responseTagsCache = DDCaches.newFixedSizeCache(8); - private final DDCache timeTagsCache = DDCaches.newFixedSizeCache(8); - private final DDCache failedTagsCache = DDCaches.newFixedSizeCache(16); - - private static final String[] INIT_TAGS = - new String[] {"integration_name:servlet", "integration_version:N/A"}; - - public RumInjectorMetrics(final StatsDClient statsd) { - this.statsd = statsd; + public RumInjectorMetrics() { // Get RUM config values (applicationId and remoteConfigUsed) for tagging RumInjector rumInjector = RumInjector.get(); @@ -55,146 +53,171 @@ public RumInjectorMetrics(final StatsDClient statsd) { @Override public void onInjectionSucceed(String servletVersion) { - injectionSucceed.incrementAndGet(); - - String[] tags = + List tags = succeedTagsCache.computeIfAbsent( servletVersion, version -> - new String[] { - "application_id:" + applicationId, - "integration_name:servlet", - "integration_version:" + version, - "remote_config_used:" + remoteConfigUsed - }); - - statsd.count("rum.injection.succeed", 1, tags); + Arrays.asList( + "application_id:" + applicationId, + "integration_name:servlet", + "integration_version:" + version, + "remote_config_used:" + remoteConfigUsed)); + + MetricCollector.Metric metric = + new MetricCollector.Metric("rum", true, "injection.succeed", "count", 1, tags); + metrics.offer(metric); } @Override public void onInjectionFailed(String servletVersion, String contentEncoding) { - injectionFailed.incrementAndGet(); - String cacheKey = servletVersion + ":" + contentEncoding; - String[] tags = + List tags = failedTagsCache.computeIfAbsent( cacheKey, key -> { if (contentEncoding != null) { - return new String[] { - "application_id:" + applicationId, - "content_encoding:" + contentEncoding, - "integration_name:servlet", - "integration_version:" + servletVersion, - "reason:failed_to_return_response_wrapper", - "remote_config_used:" + remoteConfigUsed - }; + return Arrays.asList( + "application_id:" + applicationId, + "content_encoding:" + contentEncoding, + "integration_name:servlet", + "integration_version:" + servletVersion, + "reason:failed_to_return_response_wrapper", + "remote_config_used:" + remoteConfigUsed); } else { - return new String[] { - "application_id:" + applicationId, - "integration_name:servlet", - "integration_version:" + servletVersion, - "reason:failed_to_return_response_wrapper", - "remote_config_used:" + remoteConfigUsed - }; + return Arrays.asList( + "application_id:" + applicationId, + "integration_name:servlet", + "integration_version:" + servletVersion, + "reason:failed_to_return_response_wrapper", + "remote_config_used:" + remoteConfigUsed); } }); - statsd.count("rum.injection.failed", 1, tags); + MetricCollector.Metric metric = + new MetricCollector.Metric("rum", true, "injection.failed", "count", 1, tags); + metrics.offer(metric); } @Override public void onInjectionSkipped(String servletVersion) { - injectionSkipped.incrementAndGet(); - - String[] tags = + List tags = skippedTagsCache.computeIfAbsent( servletVersion, version -> - new String[] { - "application_id:" + applicationId, - "integration_name:servlet", - "integration_version:" + version, - "reason:should_not_inject", - "remote_config_used:" + remoteConfigUsed - }); - - statsd.count("rum.injection.skipped", 1, tags); + Arrays.asList( + "application_id:" + applicationId, + "integration_name:servlet", + "integration_version:" + version, + "reason:should_not_inject", + "remote_config_used:" + remoteConfigUsed)); + + MetricCollector.Metric metric = + new MetricCollector.Metric("rum", true, "injection.skipped", "count", 1, tags); + metrics.offer(metric); } @Override public void onInitializationSucceed() { - initializationSucceed.incrementAndGet(); - statsd.count("rum.injection.initialization.succeed", 1, INIT_TAGS); + List tags = + initTagsCache.computeIfAbsent( + "init", key -> Arrays.asList("integration_name:servlet", "integration_version:N/A")); + + MetricCollector.Metric metric = + new MetricCollector.Metric( + "rum", true, "injection.initialization.succeed", "count", 1, tags); + metrics.offer(metric); } @Override public void onContentSecurityPolicyDetected(String servletVersion) { - contentSecurityPolicyDetected.incrementAndGet(); - - String[] tags = + List tags = cspTagsCache.computeIfAbsent( servletVersion, version -> - new String[] { - "integration_name:servlet", - "integration_version:" + version, - "kind:header", - "reason:csp_header_found", - "status:seen" - }); - statsd.count("rum.injection.content_security_policy", 1, tags); + Arrays.asList( + "integration_name:servlet", + "integration_version:" + version, + "kind:header", + "reason:csp_header_found", + "status:seen")); + + MetricCollector.Metric metric = + new MetricCollector.Metric( + "rum", true, "injection.content_security_policy", "count", 1, tags); + metrics.offer(metric); } @Override public void onInjectionResponseSize(String servletVersion, long bytes) { - String[] tags = + List tags = responseTagsCache.computeIfAbsent( servletVersion, version -> - new String[] { - "integration_name:servlet", - "integration_version:" + version, - "response_kind:header" - }); - statsd.distribution("rum.injection.response.bytes", bytes, tags); + Arrays.asList( + "integration_name:servlet", + "integration_version:" + version, + "response_kind:header")); + + MetricCollector.DistributionSeriesPoint distribution = + new MetricCollector.DistributionSeriesPoint( + "injection.response.bytes", true, "rum", (int) bytes, tags); + distributions.offer(distribution); } @Override public void onInjectionTime(String servletVersion, long milliseconds) { - String[] tags = + List tags = timeTagsCache.computeIfAbsent( servletVersion, - version -> new String[] {"integration_name:servlet", "integration_version:" + version}); - statsd.distribution("rum.injection.ms", milliseconds, tags); + version -> Arrays.asList("integration_name:servlet", "integration_version:" + version)); + + MetricCollector.DistributionSeriesPoint distribution = + new MetricCollector.DistributionSeriesPoint( + "injection.ms", true, "rum", (int) milliseconds, tags); + distributions.offer(distribution); } @Override public void close() { - injectionSucceed.set(0); - injectionFailed.set(0); - injectionSkipped.set(0); - contentSecurityPolicyDetected.set(0); - initializationSucceed.set(0); - + metrics.clear(); + distributions.clear(); succeedTagsCache.clear(); skippedTagsCache.clear(); cspTagsCache.clear(); responseTagsCache.clear(); timeTagsCache.clear(); failedTagsCache.clear(); + initTagsCache.clear(); } - public String summary() { - return "\ninitializationSucceed=" - + initializationSucceed.get() - + "\ninjectionSucceed=" - + injectionSucceed.get() - + "\ninjectionFailed=" - + injectionFailed.get() - + "\ninjectionSkipped=" - + injectionSkipped.get() - + "\ncontentSecurityPolicyDetected=" - + contentSecurityPolicyDetected.get(); + /** + * Drains all count metrics. + * + * @return Collection of metrics sent via telemetry + */ + public synchronized Collection drain() { + if (metrics.isEmpty()) { + return Collections.emptyList(); + } + + List drained = new ArrayList<>(metrics); + metrics.clear(); + return drained; + } + + /** + * Drains all distribution metrics. + * + * @return Collection of distribution points sent via telemetry + */ + public synchronized Collection + drainDistributionSeries() { + if (distributions.isEmpty()) { + return Collections.emptyList(); + } + + List drained = new ArrayList<>(distributions); + distributions.clear(); + return drained; } } diff --git a/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java b/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java index 74638bdffec..21e19a4cd82 100644 --- a/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java +++ b/internal-api/src/main/java/datadog/trace/api/rum/RumTelemetryCollector.java @@ -1,5 +1,9 @@ package datadog.trace.api.rum; +import datadog.trace.api.telemetry.MetricCollector; +import java.util.Collection; +import java.util.Collections; + /** * Collect RUM injection telemetry from the RumInjector This is implemented by the * RumInjectorMetrics class @@ -33,8 +37,13 @@ public void onInjectionTime(String integrationVersion, long milliseconds) {} public void close() {} @Override - public String summary() { - return ""; + public Collection drain() { + return Collections.emptyList(); + } + + @Override + public Collection drainDistributionSeries() { + return Collections.emptyList(); } }; @@ -89,8 +98,17 @@ public String summary() { /** Closes the telemetry collector. */ default void close() {} - /** Returns a human-readable summary of the telemetry collected. */ - default String summary() { - return ""; - } + /** + * Drains all count metrics to be sent via telemetry. + * + * @return Collection of count metrics to be sent via telemetry. + */ + Collection drain(); + + /** + * Drains all distribution metrics to be sent via telemetry. + * + * @return Collection of distribution points to be sent via telemetry. + */ + Collection drainDistributionSeries(); } diff --git a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy index eb7ba338bc0..1be02217fdc 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorMetricsTest.groovy @@ -1,20 +1,12 @@ package datadog.trace.api.rum -import datadog.trace.api.StatsDClient import spock.lang.Specification import spock.lang.Subject class RumInjectorMetricsTest extends Specification { - def statsD = Mock(StatsDClient) @Subject - def metrics = new RumInjectorMetrics(statsD) - - void assertTags(String[] args, String... expectedTags) { - expectedTags.each { expectedTag -> - assert args.contains(expectedTag), "Expected tag '$expectedTag' not found in tags: ${args as List}" - } - } + def metrics = new RumInjectorMetrics() // Note: application_id and remote_config_used tags need dynamic runtime values that depend on // the RUM configuration state, so we do not test them here. @@ -25,19 +17,35 @@ class RumInjectorMetricsTest extends Specification { metrics.onInjectionSucceed("6") then: - 1 * statsD.count('rum.injection.succeed', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:3") - } - 1 * statsD.count('rum.injection.succeed', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:5") - } - 1 * statsD.count('rum.injection.succeed', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:6") - } - 0 * _ + def drained = metrics.drain() + drained.size() == 3 + + def servlet3 = drained[0] + servlet3.namespace == "rum" + servlet3.metricName == "injection.succeed" + servlet3.type == "count" + servlet3.value == 1 + servlet3.common == true + servlet3.tags.contains("integration_name:servlet") + servlet3.tags.contains("integration_version:3") + + def servlet5 = drained[1] + servlet5.namespace == "rum" + servlet5.metricName == "injection.succeed" + servlet5.type == "count" + servlet5.value == 1 + servlet5.common == true + servlet5.tags.contains("integration_name:servlet") + servlet5.tags.contains("integration_version:5") + + def servlet6 = drained[2] + servlet6.namespace == "rum" + servlet6.metricName == "injection.succeed" + servlet6.type == "count" + servlet6.value == 1 + servlet6.common == true + servlet6.tags.contains("integration_name:servlet") + servlet6.tags.contains("integration_version:6") } def "test onInjectionFailed"() { @@ -47,20 +55,41 @@ class RumInjectorMetricsTest extends Specification { metrics.onInjectionFailed("6", "gzip") then: - 1 * statsD.count('rum.injection.failed', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "content_encoding:gzip", "integration_name:servlet", "integration_version:3", "reason:failed_to_return_response_wrapper") - } - 1 * statsD.count('rum.injection.failed', 1, _) >> { args -> - def tags = args[2] as String[] - assert !tags.any { it.startsWith("content_encoding:") } - assertTags(tags, "integration_name:servlet", "integration_version:5", "reason:failed_to_return_response_wrapper") - } - 1 * statsD.count('rum.injection.failed', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "content_encoding:gzip", "integration_name:servlet", "integration_version:6", "reason:failed_to_return_response_wrapper") - } - 0 * _ + def drained = metrics.drain() + drained.size() == 3 + + def servlet3 = drained[0] + servlet3.namespace == "rum" + servlet3.metricName == "injection.failed" + servlet3.type == "count" + servlet3.value == 1 + servlet3.common == true + servlet3.tags.contains("content_encoding:gzip") + servlet3.tags.contains("integration_name:servlet") + servlet3.tags.contains("integration_version:3") + servlet3.tags.contains("reason:failed_to_return_response_wrapper") + + def servlet5 = drained[1] + servlet5.namespace == "rum" + servlet5.metricName == "injection.failed" + servlet5.type == "count" + servlet5.value == 1 + servlet5.common == true + !servlet5.tags.any { it.startsWith("content_encoding:") } + servlet5.tags.contains("integration_name:servlet") + servlet5.tags.contains("integration_version:5") + servlet5.tags.contains("reason:failed_to_return_response_wrapper") + + def servlet6 = drained[2] + servlet6.namespace == "rum" + servlet6.metricName == "injection.failed" + servlet6.type == "count" + servlet6.value == 1 + servlet6.common == true + servlet6.tags.contains("content_encoding:gzip") + servlet6.tags.contains("integration_name:servlet") + servlet6.tags.contains("integration_version:6") + servlet6.tags.contains("reason:failed_to_return_response_wrapper") } def "test onInjectionSkipped"() { @@ -70,173 +99,182 @@ class RumInjectorMetricsTest extends Specification { metrics.onInjectionSkipped("6") then: - 1 * statsD.count('rum.injection.skipped', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:3", "reason:should_not_inject") - } - 1 * statsD.count('rum.injection.skipped', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:5", "reason:should_not_inject") - } - 1 * statsD.count('rum.injection.skipped', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:6", "reason:should_not_inject") - } - 0 * _ + def drained = metrics.drain() + drained.size() == 3 + + def servlet3 = drained[0] + servlet3.namespace == "rum" + servlet3.metricName == "injection.skipped" + servlet3.type == "count" + servlet3.value == 1 + servlet3.common == true + servlet3.tags.contains("integration_name:servlet") + servlet3.tags.contains("integration_version:3") + servlet3.tags.contains("reason:should_not_inject") + + def servlet5 = drained[1] + servlet5.namespace == "rum" + servlet5.metricName == "injection.skipped" + servlet5.type == "count" + servlet5.value == 1 + servlet5.common == true + servlet5.tags.contains("integration_name:servlet") + servlet5.tags.contains("integration_version:5") + servlet5.tags.contains("reason:should_not_inject") + + def servlet6 = drained[2] + servlet6.namespace == "rum" + servlet6.metricName == "injection.skipped" + servlet6.type == "count" + servlet6.value == 1 + servlet6.common == true + servlet6.tags.contains("integration_name:servlet") + servlet6.tags.contains("integration_version:6") + servlet6.tags.contains("reason:should_not_inject") } - def "test onContentSecurityPolicyDetected"() { + def "test onInitializationSucceed"() { when: - metrics.onContentSecurityPolicyDetected("3") - metrics.onContentSecurityPolicyDetected("5") - metrics.onContentSecurityPolicyDetected("6") + metrics.onInitializationSucceed() then: - 1 * statsD.count('rum.injection.content_security_policy', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:3", "kind:header", "reason:csp_header_found", "status:seen") - } - 1 * statsD.count('rum.injection.content_security_policy', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:5", "kind:header", "reason:csp_header_found", "status:seen") - } - 1 * statsD.count('rum.injection.content_security_policy', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:6", "kind:header", "reason:csp_header_found", "status:seen") - } - 0 * _ + def drained = metrics.drain() + drained.size() == 1 + + def metric = drained[0] + metric.namespace == "rum" + metric.metricName == "injection.initialization.succeed" + metric.type == "count" + metric.value == 1 + metric.common == true + metric.tags.contains("integration_name:servlet") + metric.tags.contains("integration_version:N/A") } - def "test onInitializationSucceed"() { + def "test onContentSecurityPolicyDetected"() { when: - metrics.onInitializationSucceed() + metrics.onContentSecurityPolicyDetected("5") then: - 1 * statsD.count('rum.injection.initialization.succeed', 1, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:N/A") - } - 0 * _ + def drained = metrics.drain() + drained.size() == 1 + + def servlet5 = drained[0] + servlet5.namespace == "rum" + servlet5.metricName == "injection.content_security_policy" + servlet5.type == "count" + servlet5.value == 1 + servlet5.common == true + servlet5.tags.contains("integration_name:servlet") + servlet5.tags.contains("integration_version:5") + servlet5.tags.contains("kind:header") + servlet5.tags.contains("reason:csp_header_found") + servlet5.tags.contains("status:seen") } - def "test onInjectionResponseSize with multiple sizes"() { + def "test onInjectionResponseSize"() { when: - metrics.onInjectionResponseSize("3", 256) - metrics.onInjectionResponseSize("5", 512) - metrics.onInjectionResponseSize("6", 1024) + metrics.onInjectionResponseSize("3", 1024) + metrics.onInjectionResponseSize("5", 2048) then: - 1 * statsD.distribution('rum.injection.response.bytes', 256, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:3", "response_kind:header") - } - 1 * statsD.distribution('rum.injection.response.bytes', 512, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:5", "response_kind:header") - } - 1 * statsD.distribution('rum.injection.response.bytes', 1024, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:6", "response_kind:header") - } - 0 * _ + def drained = metrics.drainDistributionSeries() + drained.size() == 2 + + def servlet3 = drained[0] + servlet3.namespace == "rum" + servlet3.metricName == "injection.response.bytes" + servlet3.value == 1024 + servlet3.common == true + servlet3.tags.contains("integration_name:servlet") + servlet3.tags.contains("integration_version:3") + servlet3.tags.contains("response_kind:header") + + def servlet5 = drained[1] + servlet5.namespace == "rum" + servlet5.metricName == "injection.response.bytes" + servlet5.value == 2048 + servlet5.common == true + servlet5.tags.contains("integration_name:servlet") + servlet5.tags.contains("integration_version:5") + servlet5.tags.contains("response_kind:header") } - def "test onInjectionTime with multiple durations"() { + def "test onInjectionTime"() { when: - metrics.onInjectionTime("5", 5L) - metrics.onInjectionTime("3", 10L) - metrics.onInjectionTime("6", 15L) + metrics.onInjectionTime("3", 15L) + metrics.onInjectionTime("5", 25L) then: - 1 * statsD.distribution('rum.injection.ms', 5L, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:5") - } - 1 * statsD.distribution('rum.injection.ms', 10L, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:3") - } - 1 * statsD.distribution('rum.injection.ms', 15L, _) >> { args -> - def tags = args[2] as String[] - assertTags(tags, "integration_name:servlet", "integration_version:6") - } - 0 * _ + def drained = metrics.drainDistributionSeries() + drained.size() == 2 + + def servlet3 = drained[0] + servlet3.namespace == "rum" + servlet3.metricName == "injection.ms" + servlet3.value == 15 + servlet3.common == true + servlet3.tags.contains("integration_name:servlet") + servlet3.tags.contains("integration_version:3") + + def servlet5 = drained[1] + servlet5.namespace == "rum" + servlet5.metricName == "injection.ms" + servlet5.value == 25 + servlet5.common == true + servlet5.tags.contains("integration_name:servlet") + servlet5.tags.contains("integration_version:5") } - def "test summary with multiple events in different order"() { + def "test drain methods"() { when: - metrics.onInitializationSucceed() - metrics.onContentSecurityPolicyDetected("3") - metrics.onInjectionSkipped("5") - metrics.onInjectionFailed("3", "gzip") metrics.onInjectionSucceed("3") - metrics.onInjectionFailed("6", null) - metrics.onInjectionSucceed("6") - metrics.onInjectionSkipped("3") - metrics.onContentSecurityPolicyDetected("6") - metrics.onInjectionResponseSize("3", 256) - metrics.onInjectionTime("5", 5L) - def summary = metrics.summary() + metrics.onInjectionTime("3", 10L) + + def drainedData = metrics.drain() + def data = metrics.drainDistributionSeries() + + def drainedEmpty = metrics.drain() + def empty = metrics.drainDistributionSeries() then: - summary.contains("initializationSucceed=1") - summary.contains("injectionSucceed=2") - summary.contains("injectionFailed=2") - summary.contains("injectionSkipped=2") - summary.contains("contentSecurityPolicyDetected=2") - 1 * statsD.count('rum.injection.initialization.succeed', 1, _) - 2 * statsD.count('rum.injection.succeed', 1, _) - 2 * statsD.count('rum.injection.failed', 1, _) - 2 * statsD.count('rum.injection.skipped', 1, _) - 2 * statsD.count('rum.injection.content_security_policy', 1, _) - 1 * statsD.distribution('rum.injection.response.bytes', 256, _) - 1 * statsD.distribution('rum.injection.ms', 5L, _) - 0 * _ + drainedData.size() == 1 + data.size() == 1 + drainedEmpty.size() == 0 + empty.size() == 0 } - def "test metrics start at zero in summary"() { + def "test mixed metrics"() { when: - def summary = metrics.summary() + metrics.onInjectionSucceed("3") + metrics.onInjectionFailed("4", "gzip") + metrics.onInjectionResponseSize("5", 512) + metrics.onInjectionTime("6", 20L) + metrics.onContentSecurityPolicyDetected("3") + + def counts = metrics.drain() + def distributions = metrics.drainDistributionSeries() then: - summary.contains("initializationSucceed=0") - summary.contains("injectionSucceed=0") - summary.contains("injectionFailed=0") - summary.contains("injectionSkipped=0") - summary.contains("contentSecurityPolicyDetected=0") - 0 * _ + counts.size() == 3 + distributions.size() == 2 } - def "test close resets counters in summary"() { + def "test close clears queues"() { when: - metrics.onInitializationSucceed() metrics.onInjectionSucceed("3") - metrics.onInjectionFailed("3", "gzip") - metrics.onInjectionSkipped("3") - metrics.onContentSecurityPolicyDetected("3") + metrics.onInjectionTime("5", 10L) + + def hasData = !metrics.drain().isEmpty() || !metrics.drainDistributionSeries().isEmpty() - def summaryBeforeClose = metrics.summary() metrics.close() - def summaryAfterClose = metrics.summary() + + def emptyCounts = metrics.drain() + def emptyDistributions = metrics.drainDistributionSeries() then: - summaryBeforeClose.contains("initializationSucceed=1") - summaryBeforeClose.contains("injectionSucceed=1") - summaryBeforeClose.contains("injectionFailed=1") - summaryBeforeClose.contains("injectionSkipped=1") - summaryBeforeClose.contains("contentSecurityPolicyDetected=1") - - summaryAfterClose.contains("initializationSucceed=0") - summaryAfterClose.contains("injectionSucceed=0") - summaryAfterClose.contains("injectionFailed=0") - summaryAfterClose.contains("injectionSkipped=0") - summaryAfterClose.contains("contentSecurityPolicyDetected=0") - - 1 * statsD.count('rum.injection.initialization.succeed', 1, _) - 1 * statsD.count('rum.injection.succeed', 1, _) - 1 * statsD.count('rum.injection.failed', 1, _) - 1 * statsD.count('rum.injection.skipped', 1, _) - 1 * statsD.count('rum.injection.content_security_policy', 1, _) - 0 * _ + hasData == true + emptyCounts.isEmpty() + emptyDistributions.isEmpty() } } diff --git a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy index 1b2fd4783fb..b65648036c1 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/rum/RumInjectorTest.groovy @@ -88,28 +88,27 @@ class RumInjectorTest extends DDSpecification { RumInjector.getTelemetryCollector() == RumTelemetryCollector.NO_OP } - void 'enable telemetry with StatsDClient'() { + // enableTelemetry() checks that INSTANCE.isEnabled() before starting telemetry collection. + // However, INSTANCE is a static final field created at class loading, so we test whatever the actual RUM configuration is. + void 'enable telemetry'() { when: - RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + RumInjector.enableTelemetry() + def collector = RumInjector.getTelemetryCollector() + def isRumEnabled = RumInjector.get().isEnabled() then: - RumInjector.getTelemetryCollector() instanceof datadog.trace.api.rum.RumInjectorMetrics - - cleanup: - RumInjector.shutdownTelemetry() - } - - void 'enabling telemetry with a null StatsDClient sets the telemetry collector to NO_OP'() { - when: - RumInjector.enableTelemetry(null) - - then: - RumInjector.getTelemetryCollector() == RumTelemetryCollector.NO_OP + collector != null + if (isRumEnabled) { + collector instanceof RumInjectorMetrics + collector != RumTelemetryCollector.NO_OP + } else { + collector == RumTelemetryCollector.NO_OP + } } void 'shutdown telemetry'() { setup: - RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) + RumInjector.enableTelemetry() when: RumInjector.shutdownTelemetry() @@ -117,112 +116,4 @@ class RumInjectorTest extends DDSpecification { then: RumInjector.getTelemetryCollector() == RumTelemetryCollector.NO_OP } - - void 'initialize rum injector'() { - when: - RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) - def telemetryCollector = RumInjector.getTelemetryCollector() - telemetryCollector.onInitializationSucceed() - def summary = telemetryCollector.summary() - - then: - summary.contains("initializationSucceed=1") - - cleanup: - RumInjector.shutdownTelemetry() - } - - void 'telemetry integration works end-to-end'() { - when: - RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) - - def telemetryCollector = RumInjector.getTelemetryCollector() - telemetryCollector.onInjectionSucceed("3") - telemetryCollector.onInjectionFailed("3", "gzip") - telemetryCollector.onInjectionSkipped("3") - telemetryCollector.onContentSecurityPolicyDetected("3") - telemetryCollector.onInjectionResponseSize("3", 256) - telemetryCollector.onInjectionTime("3", 5L) - - def summary = telemetryCollector.summary() - - then: - summary.contains("injectionSucceed=1") - summary.contains("injectionFailed=1") - summary.contains("injectionSkipped=1") - summary.contains("contentSecurityPolicyDetected=1") - - cleanup: - RumInjector.shutdownTelemetry() - } - - void 'response size telemetry does not throw an exception'() { - setup: - def mockStatsDClient = mock(datadog.trace.api.StatsDClient) - - when: - RumInjector.enableTelemetry(mockStatsDClient) - - def telemetryCollector = RumInjector.getTelemetryCollector() - telemetryCollector.onInjectionResponseSize("3", 256) - telemetryCollector.onInjectionResponseSize("3", 512) - telemetryCollector.onInjectionResponseSize("5", 2048) - - then: - noExceptionThrown() - - cleanup: - RumInjector.shutdownTelemetry() - } - - void 'injection time telemetry does not throw an exception'() { - setup: - def mockStatsDClient = mock(datadog.trace.api.StatsDClient) - - when: - RumInjector.enableTelemetry(mockStatsDClient) - - def telemetryCollector = RumInjector.getTelemetryCollector() - telemetryCollector.onInjectionTime("5", 5L) - telemetryCollector.onInjectionTime("5", 10L) - telemetryCollector.onInjectionTime("3", 20L) - - then: - noExceptionThrown() - - cleanup: - RumInjector.shutdownTelemetry() - } - - void 'concurrent telemetry calls return an accurate summary'() { - setup: - RumInjector.enableTelemetry(mock(datadog.trace.api.StatsDClient)) - def telemetryCollector = RumInjector.getTelemetryCollector() - def threads = [] - - when: - // simulate multiple threads calling telemetry methods - (1..50).each { i -> - threads << Thread.start { - telemetryCollector.onInjectionSucceed("3") - telemetryCollector.onInjectionFailed("3", "gzip") - telemetryCollector.onInjectionSkipped("3") - telemetryCollector.onContentSecurityPolicyDetected("3") - telemetryCollector.onInjectionResponseSize("3", 256) - telemetryCollector.onInjectionTime("3", 5L) - } - } - threads*.join() - - def summary = telemetryCollector.summary() - - then: - summary.contains("injectionSucceed=50") - summary.contains("injectionFailed=50") - summary.contains("injectionSkipped=50") - summary.contains("contentSecurityPolicyDetected=50") - - cleanup: - RumInjector.shutdownTelemetry() - } } diff --git a/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy index 19c423635a7..80857307148 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/rum/RumTelemetryCollectorTest.groovy @@ -25,14 +25,6 @@ class RumTelemetryCollectorTest extends Specification { noExceptionThrown() } - def "test default NO_OP summary returns an empty string"() { - when: - def summary = RumTelemetryCollector.NO_OP.summary() - - then: - summary == "" - } - def "test default NO_OP close method does not throw exception"() { when: RumTelemetryCollector.NO_OP.close() @@ -71,15 +63,23 @@ class RumTelemetryCollectorTest extends Specification { @Override void onInjectionTime(String integrationVersion, long milliseconds) { } + + @Override + java.util.Collection drain() { + return [] + } + + @Override + java.util.Collection drainDistributionSeries() { + return [] + } } when: customCollector.close() - def summary = customCollector.summary() then: noExceptionThrown() - summary == "" } def "test multiple close calls do not throw exception"() { @@ -92,15 +92,26 @@ class RumTelemetryCollectorTest extends Specification { noExceptionThrown() } - def "test multiple summary calls return the same empty string"() { + def "test default NO_OP drain methods return empty collections"() { + when: + def drainedCounts = RumTelemetryCollector.NO_OP.drain() + def drainedDistributions = RumTelemetryCollector.NO_OP.drainDistributionSeries() + + then: + drainedCounts != null + drainedCounts.isEmpty() + drainedDistributions != null + drainedDistributions.isEmpty() + } + + def "test default NO_OP drain methods do not throw exception"() { when: - def summary1 = RumTelemetryCollector.NO_OP.summary() - def summary2 = RumTelemetryCollector.NO_OP.summary() - def summary3 = RumTelemetryCollector.NO_OP.summary() + RumTelemetryCollector.NO_OP.drain() + RumTelemetryCollector.NO_OP.drain() + RumTelemetryCollector.NO_OP.drainDistributionSeries() + RumTelemetryCollector.NO_OP.drainDistributionSeries() then: - summary1 == "" - summary1 == summary2 - summary2 == summary3 + noExceptionThrown() } } diff --git a/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java b/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java index 8edac4935e2..9325899e75d 100644 --- a/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java +++ b/telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java @@ -16,8 +16,11 @@ import datadog.telemetry.metric.OtelEnvMetricPeriodicAction; import datadog.telemetry.metric.WafMetricPeriodicAction; import datadog.telemetry.products.ProductChangeAction; +import datadog.telemetry.rum.RumPeriodicAction; import datadog.trace.api.Config; +import datadog.trace.api.InstrumenterConfig; import datadog.trace.api.iast.telemetry.Verbosity; +import datadog.trace.api.rum.RumInjector; import datadog.trace.util.AgentThreadFactory; import java.lang.instrument.Instrumentation; import java.util.ArrayList; @@ -69,6 +72,10 @@ static Thread createTelemetryRunnable( actions.add(new LogPeriodicAction()); log.debug("Telemetry log collection enabled"); } + if (InstrumenterConfig.get().isRumEnabled()) { + RumInjector.enableTelemetry(); + actions.add(new RumPeriodicAction(RumInjector.getTelemetryCollector())); + } actions.add(new ProductChangeAction()); if (Config.get().isApiSecurityEndpointCollectionEnabled()) { actions.add(new EndpointPeriodicAction()); diff --git a/telemetry/src/main/java/datadog/telemetry/rum/RumPeriodicAction.java b/telemetry/src/main/java/datadog/telemetry/rum/RumPeriodicAction.java new file mode 100644 index 00000000000..afcd8872933 --- /dev/null +++ b/telemetry/src/main/java/datadog/telemetry/rum/RumPeriodicAction.java @@ -0,0 +1,58 @@ +package datadog.telemetry.rum; + +import datadog.telemetry.TelemetryRunnable; +import datadog.telemetry.TelemetryService; +import datadog.telemetry.api.DistributionSeries; +import datadog.telemetry.api.Metric; +import datadog.trace.api.rum.RumTelemetryCollector; +import datadog.trace.api.telemetry.MetricCollector; +import java.util.Arrays; +import java.util.Collection; + +/** RUM version of IntegrationPeriodicAction that sends RUM telemetry metrics. */ +public class RumPeriodicAction implements TelemetryRunnable.TelemetryPeriodicAction { + + private final RumTelemetryCollector telemetryCollector; + + public RumPeriodicAction(RumTelemetryCollector telemetryCollector) { + this.telemetryCollector = telemetryCollector; + } + + @Override + public void doIteration(TelemetryService service) { + Collection counts = telemetryCollector.drain(); + for (MetricCollector.Metric metric : counts) { + Metric telemetryMetric = convertToTelemetryMetric(metric); + service.addMetric(telemetryMetric); + } + + Collection distributions = + telemetryCollector.drainDistributionSeries(); + for (MetricCollector.DistributionSeriesPoint distribution : distributions) { + DistributionSeries telemetryDistribution = convertToDistributionSeries(distribution); + service.addDistributionSeries(telemetryDistribution); + } + } + + private Metric convertToTelemetryMetric(MetricCollector.Metric raw) { + return new Metric() + .namespace(raw.namespace) + .metric(raw.metricName) + .type(Metric.TypeEnum.COUNT) + .common(raw.common) + .tags(raw.tags) + .addPointsItem(Arrays.asList(raw.timestamp, raw.value)); + } + + private DistributionSeries convertToDistributionSeries( + MetricCollector.DistributionSeriesPoint point) { + DistributionSeries distribution = + new DistributionSeries() + .namespace(point.namespace) + .metric(point.metricName) + .common(point.common) + .tags(point.tags); + distribution.addPoint(point.value); + return distribution; + } +} diff --git a/telemetry/src/test/groovy/datadog/telemetry/rum/RumPeriodicActionTest.groovy b/telemetry/src/test/groovy/datadog/telemetry/rum/RumPeriodicActionTest.groovy new file mode 100644 index 00000000000..08b06a7d064 --- /dev/null +++ b/telemetry/src/test/groovy/datadog/telemetry/rum/RumPeriodicActionTest.groovy @@ -0,0 +1,58 @@ +package datadog.telemetry.rum + +import datadog.telemetry.TelemetryService +import datadog.telemetry.api.DistributionSeries +import datadog.telemetry.api.Metric +import datadog.trace.api.rum.RumInjectorMetrics +import datadog.trace.api.rum.RumTelemetryCollector +import spock.lang.Specification + +class RumPeriodicActionTest extends Specification { + TelemetryService telemetryService = Mock() + + void 'push RUM metrics into the telemetry service'() { + setup: + def metricsCollector = new RumInjectorMetrics() + metricsCollector.onInjectionSucceed("3") + metricsCollector.onInjectionFailed("5", "gzip") + metricsCollector.onInjectionResponseSize("3", 1024) + + def periodicAction = new RumPeriodicAction(metricsCollector) + + when: + periodicAction.doIteration(telemetryService) + + then: + 1 * telemetryService.addMetric({ Metric metric -> + metric.namespace == "rum" && + metric.metric == "injection.succeed" && + metric.type == Metric.TypeEnum.COUNT + }) + + 1 * telemetryService.addMetric({ Metric metric -> + metric.namespace == "rum" && + metric.metric == "injection.failed" && + metric.type == Metric.TypeEnum.COUNT + }) + + 1 * telemetryService.addDistributionSeries({ DistributionSeries dist -> + dist.namespace == "rum" && + dist.metric == "injection.response.bytes" + }) + + 0 * _ + } + + void 'push nothing when no metrics collector is set'() { + setup: + def periodicAction = new RumPeriodicAction(RumTelemetryCollector.NO_OP) + + when: + periodicAction.doIteration(telemetryService) + + then: + 0 * telemetryService.addMetric(_) + 0 * telemetryService.addDistributionSeries(_) + 0 * _ + } +}