Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MetricReader and Multi-export features #3578

Merged
merged 35 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d654410
Start of multi-exporter codebase.
jsuereth Sep 3, 2021
9b77bc5
Restore InMemoryExporter + fix javadocs.
jsuereth Sep 6, 2021
6b6357e
Quick javadoc fix
jsuereth Sep 6, 2021
f8af124
Fix some javadoc build failures and missing things.
jsuereth Sep 6, 2021
fd448a9
Fix bug with autoconfigure
jsuereth Sep 6, 2021
e6b5281
Modify InMemoryMetricReader + register methods to allow for immutable…
jsuereth Sep 12, 2021
abea187
Merge remote-tracking branch 'otel/main' into wip-multi-exporters
jsuereth Sep 18, 2021
559cec5
Make metric-readers 'immutable' post-construction of SdkMeterProvider…
jsuereth Sep 18, 2021
b054e70
Fix integration test.
jsuereth Sep 18, 2021
7c8de33
Fix jmh benchmarks for trace.
jsuereth Sep 18, 2021
b30d600
Fix jmh build.
jsuereth Sep 18, 2021
8533e83
Fix broken in-memory exporter things, and Noop API issue.
jsuereth Sep 19, 2021
646faff
Merge remote-tracking branch 'origin/main' into wip-multi-exporters
jsuereth Sep 23, 2021
a1a296f
Migrate MetricReader.Factory to MetricReaderFactory.
jsuereth Sep 23, 2021
bfba057
Rename PeriodMetricReader.Factory to PeriodicMetricReaderFactory.
jsuereth Sep 23, 2021
bf31313
Apply suggestions from code review
jsuereth Sep 23, 2021
779c569
Minor cleanup of ColectionHandle.
jsuereth Sep 23, 2021
b0b56e0
Fixes from review.
jsuereth Sep 23, 2021
e97907f
Extract delta accumulation and add tests.
jsuereth Sep 23, 2021
f2aa483
Fragment the abstraction in SynchronousMetricStorage for better separ…
jsuereth Sep 23, 2021
baa6f0b
Add tests (and minor optimisation) for delta metric storage.
jsuereth Sep 23, 2021
20c7397
Add test for temporal metric storage.
jsuereth Sep 24, 2021
4f5338a
Update temporal metric storage to fix issue w/ async instruments.
jsuereth Sep 24, 2021
9fec8fe
Migrate Async storage to use temporal metric storage (with better tes…
jsuereth Sep 24, 2021
0b94478
Another fix from review.
jsuereth Sep 24, 2021
799af98
Remove start epoch nano parameter from storage constructors.
jsuereth Sep 24, 2021
44833fc
Fixes from review.
jsuereth Sep 24, 2021
a697826
Migrate flush -> forceFlush as per review.
jsuereth Sep 24, 2021
c259d86
Add test coverage for flush.
jsuereth Sep 24, 2021
0a02048
Merge remote-tracking branch 'origin/main' into wip-multi-exporters
jsuereth Sep 24, 2021
45238b5
Fixes from review.
jsuereth Sep 27, 2021
730e144
Fix public API breakage.
jsuereth Sep 27, 2021
f181723
Fixes from review.
jsuereth Sep 27, 2021
19db3a1
Merge branch 'main' of github.com:open-telemetry/opentelemetry-java i…
Sep 28, 2021
e46f459
Fix threadsafe annotations
jsuereth Sep 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ void testExportGzipCompressed() {
assertThat(parseRequestBody(gzipDecompress(request.content().array()))).isEqualTo(payload);
}

@Test
void testExport_flush() {
OtlpHttpMetricExporter exporter = OtlpHttpMetricExporter.builder().build();
try {
assertThat(exporter.flush().isSuccess()).isTrue();
} finally {
exporter.shutdown();
}
}

private static void assertRequestCommon(AggregatedHttpRequest request) {
assertThat(request.method()).isEqualTo(HttpMethod.POST);
assertThat(request.path()).isEqualTo("/v1/metrics");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.testing.InMemoryMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -49,8 +50,10 @@ public class GrpcGzipBenchmark {
private static final Codec IDENTITY_CODEC = Codec.Identity.NONE;

static {
InMemoryMetricReader metricReader = new InMemoryMetricReader();
SdkMeterProvider meterProvider =
SdkMeterProvider.builder()
.registerMetricReader(metricReader)
.setResource(
Resource.create(
Attributes.builder()
Expand Down Expand Up @@ -118,7 +121,7 @@ public class GrpcGzipBenchmark {
histogram.record(3.0);
histogram.record(4.0);
histogram.record(5.0);
Collection<MetricData> metricData = meterProvider.collectAllMetrics();
Collection<MetricData> metricData = metricReader.collectAllMetrics();

List<ResourceMetrics> resourceMetrics =
Arrays.stream(ResourceMetricsMarshaler.create(metricData))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.testing.InMemoryMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -40,8 +41,10 @@ public class MetricsRequestMarshalerBenchmark {
private static final Collection<MetricData> METRICS;

static {
InMemoryMetricReader metricReader = new InMemoryMetricReader();
SdkMeterProvider meterProvider =
SdkMeterProvider.builder()
.registerMetricReader(metricReader)
.setResource(
Resource.create(
Attributes.builder()
Expand Down Expand Up @@ -110,7 +113,7 @@ public class MetricsRequestMarshalerBenchmark {
histogram.record(4.0);
histogram.record(5.0);

METRICS = meterProvider.collectAllMetrics();
METRICS = metricReader.collectAllMetrics();
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,24 @@

package io.opentelemetry.exporter.prometheus;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.MetricReaderFactory;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public final class PrometheusCollector extends Collector {
/**
* A reader of OpenTelemetry metrics that exports into Prometheus as a Collector.
*
* <p>Usage: <code>sdkMeterProvider.registerMetricReader(PrometheusCollector.create());</code>
*/
public final class PrometheusCollector extends Collector implements MetricReader {
private final MetricProducer metricProducer;

PrometheusCollector(MetricProducer metricProducer) {
Expand All @@ -26,15 +36,39 @@ public List<MetricFamilySamples> collect() {
for (MetricData metricData : allMetrics) {
allSamples.add(MetricAdapter.toMetricFamilySamples(metricData));
}
return allSamples;
return Collections.unmodifiableList(allSamples);
}

/**
* Returns a new builder instance for this exporter.
*
* @return a new builder instance for this exporter.
* Returns a new collector to be registered with a {@link
* io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder}.
*/
public static PrometheusCollectorBuilder builder() {
return new PrometheusCollectorBuilder();
public static MetricReaderFactory create() {
return new Factory();
}

// Prometheus cannot flush.
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
CollectorRegistry.defaultRegistry.unregister(this);
return CompletableResultCode.ofSuccess();
}

/** Our implementation of the metric reader factory. */
// NOTE: This should be updated to (optionally) start the simple Http server exposing the metrics
// path.
private static class Factory implements MetricReaderFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big deal but looks like can be singleton

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a refactoring (I can do in this PR or later) where we instantiate the HTTP server as part of the factory (configurable). Would you like that in this PR? I'll add a note in the meantime.

@Override
public MetricReader apply(MetricProducer producer) {
PrometheusCollector collector = new PrometheusCollector(producer);
// When SdkMeterProvider constructs us, we register with prometheus.
collector.register();
return collector;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ class PrometheusCollectorTest {

@BeforeEach
void setUp() {
prometheusCollector =
PrometheusCollector.builder().setMetricProducer(metricProducer).buildAndRegister();
// Apply the SDK metric producer registers with prometheus.
prometheusCollector = new PrometheusCollector(metricProducer);
prometheusCollector.register();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.Span.Link;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricReaderFactory;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.IdGenerator;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
Expand Down Expand Up @@ -131,9 +132,7 @@ void beforeEach() {
}

@AfterEach
void afterEach() {
IntervalMetricReader.resetGlobalForTest();
}
void afterEach() {}

@Test
void testOtlpGrpcTraceExport() {
Expand Down Expand Up @@ -263,23 +262,22 @@ void testOtlpHttpMetricExport() {
}

private static void testMetricExport(MetricExporter metricExporter) {
SdkMeterProvider meterProvider = SdkMeterProvider.builder().setResource(RESOURCE).build();
IntervalMetricReader intervalMetricReader =
IntervalMetricReader.builder()
.setMetricExporter(metricExporter)
.setMetricProducers(Collections.singletonList(meterProvider))
.setExportIntervalMillis(5000)
.build();
intervalMetricReader.startAndRegisterGlobal();
MetricReaderFactory reader = PeriodicMetricReader.create(metricExporter, Duration.ofSeconds(5));
SdkMeterProvider meterProvider =
SdkMeterProvider.builder().setResource(RESOURCE).registerMetricReader(reader).build();

Meter meter = meterProvider.meterBuilder(OtlpExporterIntegrationTest.class.getName()).build();

LongCounter longCounter = meter.counterBuilder("my-counter").build();
longCounter.add(100, Attributes.builder().put("key", "value").build());

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.until(() -> grpcServer.metricRequests.size() == 1);
try {
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.until(() -> grpcServer.metricRequests.size() == 1);
} finally {
meterProvider.close();
}

ExportMetricsServiceRequest request = grpcServer.metricRequests.get(0);
assertThat(request.getResourceMetricsCount()).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public CompletableResultCode export(Collection<MetricData> metrics) {

@Override
public CompletableResultCode flush() {
return null;
return CompletableResultCode.ofSuccess();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.testing.InMemoryMetricExporter;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import java.io.IOException;
import java.util.Collections;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -105,7 +105,7 @@ public class OtlpPipelineStressTest {

private SdkTracerProvider sdkTracerProvider;
private OpenTelemetry openTelemetry;
private IntervalMetricReader intervalMetricReader;
private SdkMeterProvider meterProvider;
private Proxy collectorProxy;
private ToxiproxyClient toxiproxyClient;

Expand All @@ -131,7 +131,7 @@ void setUp() throws IOException {

@AfterEach
void tearDown() throws IOException {
intervalMetricReader.shutdown();
meterProvider.close();
sdkTracerProvider.shutdown();

toxiproxyClient.reset();
Expand Down Expand Up @@ -185,7 +185,7 @@ void oltpExportWithFlakyCollector() throws IOException, InterruptedException {

Thread.sleep(10000);
List<MetricData> finishedMetricItems = metricExporter.getFinishedMetricItems();
intervalMetricReader.shutdown();
meterProvider.close();
Thread.sleep(1000);
reportMetrics(finishedMetricItems);
Thread.sleep(10000);
Expand Down Expand Up @@ -247,15 +247,12 @@ private void setupSdk() {
.build());

// set up the metric exporter and wire it into the SDK and a timed reader.
SdkMeterProvider meterProvider =
SdkMeterProvider.builder().setResource(resource).buildAndRegisterGlobal();

intervalMetricReader =
IntervalMetricReader.builder()
.setMetricExporter(metricExporter)
.setMetricProducers(Collections.singleton(meterProvider))
.setExportIntervalMillis(1000)
.buildAndStart();
meterProvider =
SdkMeterProvider.builder()
.setResource(resource)
.registerMetricReader(
PeriodicMetricReader.create(metricExporter, Duration.ofSeconds(1)))
.buildAndRegisterGlobal();

// set up the span exporter and wire it into the SDK
OtlpGrpcSpanExporter spanExporter =
Expand Down
Loading