From b336bf5dec5b5d56d35f0230bb86998c6d9f44f5 Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 18 Jan 2022 09:58:55 -0800 Subject: [PATCH] address comments --- .../opentelemetry-extensions/pom.xml | 27 ---- .../OpenTelemetryMetricsProtobufReader.java | 150 ++++++++---------- ...penTelemetryProtobufExtensionsModule.java} | 2 +- .../OpenTelemetryMetricsInputFormatTest.java | 2 +- ...penTelemetryMetricsProtobufReaderTest.java | 38 ++--- 5 files changed, 83 insertions(+), 136 deletions(-) rename extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/{OpenTelemetryMetricsProtobufExtensionsModule.java => OpenTelemetryProtobufExtensionsModule.java} (95%) diff --git a/extensions-contrib/opentelemetry-extensions/pom.xml b/extensions-contrib/opentelemetry-extensions/pom.xml index fbb405ea4185..d50efbc40a12 100644 --- a/extensions-contrib/opentelemetry-extensions/pom.xml +++ b/extensions-contrib/opentelemetry-extensions/pom.xml @@ -74,32 +74,5 @@ junit test - - - org.openjdk.jmh - jmh-core - 1.27 - test - - - org.openjdk.jmh - jmh-generator-annprocess - 1.27 - test - - - - - org.apache.maven.plugins - maven-resources-plugin - 3.0.2 - - - desc - - - - - diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java index b90647b924fb..4340678cfe0f 100644 --- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java @@ -27,7 +27,6 @@ import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; @@ -42,18 +41,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; public class OpenTelemetryMetricsProtobufReader implements InputEntityReader { private static final String VALUE_COLUMN = "value"; - private final DimensionsSpec dimensionsSpec; private final ByteEntity source; private final String metricDimension; private final String metricAttributePrefix; private final String resourceAttributePrefix; + private final Predicate attributeMembership; public OpenTelemetryMetricsProtobufReader( DimensionsSpec dimensionsSpec, @@ -63,11 +62,17 @@ public OpenTelemetryMetricsProtobufReader( String resourceLabelPrefix ) { - this.dimensionsSpec = dimensionsSpec; this.source = source; this.metricDimension = metricDimension; this.metricAttributePrefix = metricLabelPrefix; this.resourceAttributePrefix = resourceLabelPrefix; + + List schemaDimensions = dimensionsSpec.getDimensionNames(); + if (!schemaDimensions.isEmpty()) { + this.attributeMembership = schemaDimensions::contains; + } else { + this.attributeMembership = att -> !dimensionsSpec.getDimensionExclusions().contains(att); + } } @Override @@ -79,123 +84,92 @@ public CloseableIterator read() List readAsList() { try { - return parseRequest(MetricsData.parseFrom(source.getBuffer())); + return parseMetricsData(MetricsData.parseFrom(source.getBuffer())); } catch (InvalidProtocolBufferException e) { throw new ParseException(e, "Protobuf message could not be parsed"); } } - private List parseRequest(final MetricsData request) + private List parseMetricsData(final MetricsData metricsData) { - - List resourceMetricsList = request.getResourceMetricsList(); - return resourceMetricsList.stream() - .flatMap( - resourceMetrics -> { - List resourceAttributes = resourceMetrics.getResource().getAttributesList(); - return resourceMetrics.getInstrumentationLibraryMetricsList() - .stream() - .flatMap(libraryMetrics -> libraryMetrics.getMetricsList() - .stream() - .map(metric -> parseMetric(metric, resourceAttributes)) - .flatMap(List::stream) - ); - } - ).collect(Collectors.toList()); + return metricsData.getResourceMetricsList() + .stream() + .flatMap(resourceMetrics -> resourceMetrics.getInstrumentationLibraryMetricsList() + .stream() + .flatMap(libraryMetrics -> libraryMetrics.getMetricsList() + .stream() + .flatMap(metric -> parseMetric(metric, resourceMetrics.getResource().getAttributesList()) + .stream() + ) + ) + ).collect(Collectors.toList()); } private List parseMetric(Metric metric, List resourceAttributes) { - - List schemaDimensions = dimensionsSpec.getDimensionNames(); - if (!schemaDimensions.isEmpty()) { - return createRows(metric, resourceAttributes, schemaDimensions::contains); - } else { - return createRows(metric, resourceAttributes, att -> !dimensionsSpec.getDimensionExclusions().contains(att)); + switch (metric.getDataCase()) { + case INT_SUM: { + List dataPoints = metric.getIntSum() + .getDataPointsList() + .stream() + .map(OpenTelemetryMetricsProtobufReader::intDataPointToNumDataPoint) + .collect(Collectors.toList()); + return parseNumDataPoints(dataPoints, metric.getName(), resourceAttributes); + } + case INT_GAUGE: { + List dataPoints = metric.getIntGauge() + .getDataPointsList() + .stream() + .map(OpenTelemetryMetricsProtobufReader::intDataPointToNumDataPoint) + .collect(Collectors.toList()); + return parseNumDataPoints(dataPoints, metric.getName(), resourceAttributes); + } + case SUM: { + return parseNumDataPoints(metric.getSum().getDataPointsList(), metric.getName(), resourceAttributes); + } + case GAUGE: { + return parseNumDataPoints(metric.getGauge().getDataPointsList(), metric.getName(), resourceAttributes); + } + // TODO Support HISTOGRAM and SUMMARY + default: + throw new IllegalStateException("Unexpected value: " + metric.getDataCase()); } } - private List createRows(Metric metric, - List resourceAttributes, - Function attributeMembershipFn) + private List parseNumDataPoints(List dataPoints, + String metricName, + List resourceAttributes) { - - List dataPoints = getDataPoints(metric); List rows = new ArrayList<>(); - for (NumberDataPoint dataPoint : dataPoints) { - final long timeUnixMilli = TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()); - final List metricAttributes = dataPoint.getAttributesList(); - final Number value = getValue(dataPoint); + long timeUnixMilli = TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()); + List metricAttributes = dataPoint.getAttributesList(); + Number value = getValue(dataPoint); - final int capacity = resourceAttributes.size() + int capacity = resourceAttributes.size() + metricAttributes.size() + 2; // metric name + value columns Map event = Maps.newHashMapWithExpectedSize(capacity); - event.put(metricDimension, metric.getName()); + event.put(metricDimension, metricName); event.put(VALUE_COLUMN, value); - List dimensions = new ArrayList<>(); - dimensions.add(metricDimension); - dimensions.add(VALUE_COLUMN); - resourceAttributes.stream() - .filter(ra -> attributeMembershipFn.apply(this.resourceAttributePrefix + ra.getKey())) - .forEach(ra -> { - dimensions.add(this.resourceAttributePrefix + ra.getKey()); - event.put(this.resourceAttributePrefix + ra.getKey(), ra.getValue().getStringValue()); - }); + .filter(ra -> attributeMembership.test(this.resourceAttributePrefix + ra.getKey())) + .forEach(ra -> event.put(this.resourceAttributePrefix + ra.getKey(), ra.getValue().getStringValue())); metricAttributes.stream() - .filter(ma -> attributeMembershipFn.apply(this.metricAttributePrefix + ma.getKey())) - .forEach(ma -> { - dimensions.add(this.metricAttributePrefix + ma.getKey()); - event.put(this.metricAttributePrefix + ma.getKey(), ma.getValue().getStringValue()); - }); + .filter(ma -> attributeMembership.test(this.metricAttributePrefix + ma.getKey())) + .forEach(ma -> event.put(this.metricAttributePrefix + ma.getKey(), ma.getValue().getStringValue())); - rows.add(new MapBasedInputRow(timeUnixMilli, dimensions, event)); + rows.add(new MapBasedInputRow(timeUnixMilli, new ArrayList<>(event.keySet()), event)); } return rows; } - private List getDataPoints(Metric metric) - { - List dataPoints; - switch (metric.getDataCase()) { - case INT_SUM: { - dataPoints = metric.getIntSum() - .getDataPointsList() - .stream() - .map(this::dataPointConverter) - .collect(Collectors.toList()); - break; - } - case INT_GAUGE: { - dataPoints = metric.getIntGauge() - .getDataPointsList() - .stream() - .map(this::dataPointConverter) - .collect(Collectors.toList()); - break; - } - case SUM: { - dataPoints = metric.getSum().getDataPointsList(); - break; - } - case GAUGE: { - dataPoints = metric.getGauge().getDataPointsList(); - break; - } - default: - throw new IllegalStateException("Unexpected value: " + metric.getDataCase()); - } - return dataPoints; - } - - private NumberDataPoint dataPointConverter(IntDataPoint dataPoint) + private static NumberDataPoint intDataPointToNumDataPoint(IntDataPoint dataPoint) { NumberDataPoint.Builder builder = NumberDataPoint.newBuilder() .setTimeUnixNano(dataPoint.getTimeUnixNano()) diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufExtensionsModule.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java similarity index 95% rename from extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufExtensionsModule.java rename to extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java index 84677611cfa8..6062f7ed3a6d 100644 --- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufExtensionsModule.java +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java @@ -28,7 +28,7 @@ import java.util.Collections; import java.util.List; -public class OpenTelemetryMetricsProtobufExtensionsModule implements DruidModule +public class OpenTelemetryProtobufExtensionsModule implements DruidModule { @Override diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java index 52eb5738f4ec..1230a378fcc1 100644 --- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java @@ -32,7 +32,7 @@ public void testSerde() throws Exception OpenTelemetryMetricsProtobufInputFormat inputFormat = new OpenTelemetryMetricsProtobufInputFormat("metric.name", "descriptor.", "custom."); final ObjectMapper jsonMapper = new ObjectMapper(); - jsonMapper.registerModules(new OpenTelemetryMetricsProtobufExtensionsModule().getJacksonModules()); + jsonMapper.registerModules(new OpenTelemetryProtobufExtensionsModule().getJacksonModules()); final OpenTelemetryMetricsProtobufInputFormat actual = (OpenTelemetryMetricsProtobufInputFormat) jsonMapper.readValue( jsonMapper.writeValueAsString(inputFormat), diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java index bb208b88ac19..62ad82e12ddf 100644 --- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java @@ -56,9 +56,9 @@ public class OpenTelemetryMetricsProtobufReaderTest public static final String METRIC_ATTRIBUTE_FOO_KEY = "foo_key"; public static final String METRIC_ATTRIBUTE_FOO_VAL = "foo_value"; - private final MetricsData.Builder requestBuilder = MetricsData.newBuilder(); + private final MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); - private final Metric.Builder metricBuilder = requestBuilder.addResourceMetricsBuilder() + private final Metric.Builder metricBuilder = metricsDataBuilder.addResourceMetricsBuilder() .addInstrumentationLibraryMetricsBuilder() .addMetricsBuilder(); @@ -75,7 +75,7 @@ public class OpenTelemetryMetricsProtobufReaderTest @Before public void setUp() { - requestBuilder + metricsDataBuilder .getResourceMetricsBuilder(0) .getResourceBuilder() .addAttributes( @@ -84,7 +84,7 @@ public void setUp() .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_USA)) .build()); - requestBuilder + metricsDataBuilder .getResourceMetricsBuilder(0) .getInstrumentationLibraryMetricsBuilder(0) .getInstrumentationLibraryBuilder() @@ -103,11 +103,11 @@ public void testIntSumParse() .setTimeUnixNano(TIMESTAMP) .addLabelsBuilder().setKey(METRIC_ATTRIBUTE_COLOR).setValue(METRIC_ATTRIBUTE_VALUE_RED); - MetricsData request = requestBuilder.build(); + MetricsData metricsData = metricsDataBuilder.build(); CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( dimensionsSpec, - new ByteEntity(request.toByteArray()), + new ByteEntity(metricsData.toByteArray()), "metric.name", "descriptor.", "custom." @@ -134,11 +134,11 @@ public void testIntGaugeParse() .setTimeUnixNano(TIMESTAMP) .addLabelsBuilder().setKey(METRIC_ATTRIBUTE_COLOR).setValue(METRIC_ATTRIBUTE_VALUE_RED); - MetricsData request = requestBuilder.build(); + MetricsData metricsData = metricsDataBuilder.build(); CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( dimensionsSpec, - new ByteEntity(request.toByteArray()), + new ByteEntity(metricsData.toByteArray()), "metric.name", "descriptor.", "custom." @@ -168,11 +168,11 @@ public void testSumWithAttributes() .setKey(METRIC_ATTRIBUTE_COLOR) .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); - MetricsData request = requestBuilder.build(); + MetricsData metricsData = metricsDataBuilder.build(); CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( dimensionsSpec, - new ByteEntity(request.toByteArray()), + new ByteEntity(metricsData.toByteArray()), "metric.name", "descriptor.", "custom." @@ -201,11 +201,11 @@ public void testGaugeWithAttributes() .setKey(METRIC_ATTRIBUTE_COLOR) .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); - MetricsData request = requestBuilder.build(); + MetricsData metricsData = metricsDataBuilder.build(); CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( dimensionsSpec, - new ByteEntity(request.toByteArray()), + new ByteEntity(metricsData.toByteArray()), "metric.name", "descriptor.", "custom." @@ -235,11 +235,11 @@ public void testBatchedMetricParse() .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); // Create Second Metric - Metric.Builder gaugeMetricBuilder = requestBuilder.addResourceMetricsBuilder() + Metric.Builder gaugeMetricBuilder = metricsDataBuilder.addResourceMetricsBuilder() .addInstrumentationLibraryMetricsBuilder() .addMetricsBuilder(); - requestBuilder.getResourceMetricsBuilder(1) + metricsDataBuilder.getResourceMetricsBuilder(1) .getResourceBuilder() .addAttributes( KeyValue.newBuilder() @@ -247,7 +247,7 @@ public void testBatchedMetricParse() .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_DEVEL)) .build()); - requestBuilder.getResourceMetricsBuilder(1) + metricsDataBuilder.getResourceMetricsBuilder(1) .getInstrumentationLibraryMetricsBuilder(0) .getInstrumentationLibraryBuilder() .setName(INSTRUMENTATION_LIBRARY_NAME) @@ -262,11 +262,11 @@ public void testBatchedMetricParse() .setKey(METRIC_ATTRIBUTE_FOO_KEY) .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_FOO_VAL).build()); - MetricsData request = requestBuilder.build(); + MetricsData metricsData = metricsDataBuilder.build(); CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( dimensionsSpec, - new ByteEntity(request.toByteArray()), + new ByteEntity(metricsData.toByteArray()), "metric.name", "descriptor.", "custom." @@ -302,7 +302,7 @@ public void testDimensionSpecExclusions() .setKey(METRIC_ATTRIBUTE_COLOR) .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); - MetricsData request = requestBuilder.build(); + MetricsData metricsData = metricsDataBuilder.build(); DimensionsSpec dimensionsSpecWithExclusions = new DimensionsSpec(null, ImmutableList.of( @@ -312,7 +312,7 @@ public void testDimensionSpecExclusions() CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( dimensionsSpecWithExclusions, - new ByteEntity(request.toByteArray()), + new ByteEntity(metricsData.toByteArray()), "metric.name", "descriptor.", "custom."