From 4077ee0ac0ee24ac4a5a529f7e62c743e04e93be Mon Sep 17 00:00:00 2001 From: Thomas Pierce Date: Thu, 10 Oct 2024 11:42:52 -0700 Subject: [PATCH] Added ForceFlush Implementation (#885) ### Description: Passed the forceflush function from SdkMeteringProvider to the AwsSpanMetricProcessor to forceFlush remaining metrics on shutdown to the cwAgent/Collector. ### Tesing: Increased the metricExporter interval and the BatchSpanProcessor delay to 10 minutes using: ``` OTEL_METRIC_EXPORT_INTERVAL=600000 \ OTEL_BSP_SCHEDULE_DELAY=600000 \ ``` Without the force flush change, exiting the sample app only flushed the traces without the metrics. With the forceFlush change, both traces and metrics were flushed to the collector. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --------- Co-authored-by: Mohamed Asaker --- ...sApplicationSignalsCustomizerProvider.java | 7 ++++--- .../providers/AwsSpanMetricsProcessor.java | 17 ++++++++++++--- .../AwsSpanMetricsProcessorBuilder.java | 21 +++++++++++++++---- .../AwsSpanMetricsProcessorTest.java | 9 +++++++- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java index c21e484e39..4cebc0a48e 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java @@ -17,7 +17,6 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler; import io.opentelemetry.contrib.awsxray.ResourceHolder; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; @@ -164,14 +163,16 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder( MetricReader metricReader = PeriodicMetricReader.builder(metricsExporter).setInterval(exportInterval).build(); - MeterProvider meterProvider = + SdkMeterProvider meterProvider = SdkMeterProvider.builder() .setResource(ResourceHolder.getResource()) .registerMetricReader(metricReader) .build(); + // Construct and set application signals metrics processor SpanProcessor spanMetricsProcessor = - AwsSpanMetricsProcessorBuilder.create(meterProvider, ResourceHolder.getResource()) + AwsSpanMetricsProcessorBuilder.create( + meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush) .build(); tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor); } diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java index 371f28ee01..a2a7a017f9 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java @@ -22,12 +22,14 @@ import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.Map; +import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; /** @@ -64,6 +66,7 @@ public final class AwsSpanMetricsProcessor implements SpanProcessor { private final MetricAttributeGenerator generator; private final Resource resource; + private final Supplier forceFlushAction; /** Use {@link AwsSpanMetricsProcessorBuilder} to construct this processor. */ static AwsSpanMetricsProcessor create( @@ -71,9 +74,10 @@ static AwsSpanMetricsProcessor create( LongHistogram faultHistogram, DoubleHistogram latencyHistogram, MetricAttributeGenerator generator, - Resource resource) { + Resource resource, + Supplier forceFlushAction) { return new AwsSpanMetricsProcessor( - errorHistogram, faultHistogram, latencyHistogram, generator, resource); + errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction); } private AwsSpanMetricsProcessor( @@ -81,12 +85,19 @@ private AwsSpanMetricsProcessor( LongHistogram faultHistogram, DoubleHistogram latencyHistogram, MetricAttributeGenerator generator, - Resource resource) { + Resource resource, + Supplier forceFlushAction) { this.errorHistogram = errorHistogram; this.faultHistogram = faultHistogram; this.latencyHistogram = latencyHistogram; this.generator = generator; this.resource = resource; + this.forceFlushAction = forceFlushAction; + } + + @Override + public CompletableResultCode forceFlush() { + return forceFlushAction.get(); } @Override diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java index 6e9d30d39c..25ae0bd46e 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java @@ -22,7 +22,9 @@ import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; +import java.util.function.Supplier; /** A builder for {@link AwsSpanMetricsProcessor} */ public final class AwsSpanMetricsProcessorBuilder { @@ -42,18 +44,29 @@ public final class AwsSpanMetricsProcessorBuilder { private final MeterProvider meterProvider; private final Resource resource; + // ForceFlush action provided from {@link SdkMeterProvider#forceFlush()} so that when the + // application exits The spanMetricProcessor calls the meterProvder.forceFlush to flush + // any remaining metrics before shutdown + private final Supplier forceFlushAction; + // Optional builder elements private MetricAttributeGenerator generator = DEFAULT_GENERATOR; private String scopeName = DEFAULT_SCOPE_NAME; public static AwsSpanMetricsProcessorBuilder create( - MeterProvider meterProvider, Resource resource) { - return new AwsSpanMetricsProcessorBuilder(meterProvider, resource); + MeterProvider meterProvider, + Resource resource, + Supplier forceFlushAction) { + return new AwsSpanMetricsProcessorBuilder(meterProvider, resource, forceFlushAction); } - private AwsSpanMetricsProcessorBuilder(MeterProvider meterProvider, Resource resource) { + private AwsSpanMetricsProcessorBuilder( + MeterProvider meterProvider, + Resource resource, + Supplier forceFlushAction) { this.meterProvider = meterProvider; this.resource = resource; + this.forceFlushAction = forceFlushAction; } /** @@ -86,6 +99,6 @@ public AwsSpanMetricsProcessor build() { meter.histogramBuilder(LATENCY).setUnit(LATENCY_UNITS).build(); return AwsSpanMetricsProcessor.create( - errorHistogram, faultHistogram, latencyHistogram, generator, resource); + errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction); } } diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java index bd16f81673..214c43ec95 100644 --- a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java @@ -77,6 +77,12 @@ private enum ExpectedStatusMetric { private MetricAttributeGenerator generatorMock; private AwsSpanMetricsProcessor awsSpanMetricsProcessor; + // Mock forceFlush function that returns success when invoked similar + // to the default implementation of forceFlush. + private CompletableResultCode forceFlushAction() { + return CompletableResultCode.ofSuccess(); + } + @BeforeEach public void setUpMocks() { errorHistogramMock = mock(LongHistogram.class); @@ -90,7 +96,8 @@ public void setUpMocks() { faultHistogramMock, latencyHistogramMock, generatorMock, - testResource); + testResource, + this::forceFlushAction); } @Test