diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java index 2a496d733824..868e1a529495 100644 --- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java @@ -36,8 +36,10 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -94,8 +96,14 @@ private List parseMetricsData(final MetricsData metricsData) Map resourceAttributes = resourceMetrics.getResource() .getAttributesList() .stream() - .collect(Collectors.toMap(kv -> resourceAttributePrefix + kv.getKey(), - kv -> parseAnyValue(kv.getValue()))); + .collect(HashMap::new, + (m, kv) -> { + Object value = parseAnyValue(kv.getValue()); + if (value != null) { + m.put(resourceAttributePrefix + kv.getKey(), value); + } + }, + HashMap::putAll); return resourceMetrics.getInstrumentationLibraryMetricsList() .stream() .flatMap(libraryMetrics -> libraryMetrics.getMetricsList() @@ -142,8 +150,8 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint, { int capacity = resourceAttributes.size() - + dataPoint.getAttributesCount() - + 2; // metric name + value columns + + dataPoint.getAttributesCount() + + 2; // metric name + value columns Map event = Maps.newHashMapWithExpectedSize(capacity); event.put(metricDimension, metricName); @@ -154,12 +162,17 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint, } event.putAll(resourceAttributes); - dataPoint.getAttributesList().forEach(att -> event.put(metricAttributePrefix + att.getKey(), - parseAnyValue(att.getValue()))); + dataPoint.getAttributesList().forEach(att -> { + Object value = parseAnyValue(att.getValue()); + if (value != null) { + event.put(metricAttributePrefix + att.getKey(), value); + } + }); return createRow(TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()), event); } + @Nullable private static Object parseAnyValue(AnyValue value) { switch (value.getValueCase()) { @@ -167,19 +180,16 @@ private static Object parseAnyValue(AnyValue value) return value.getIntValue(); case BOOL_VALUE: return value.getBoolValue(); - case ARRAY_VALUE: - return value.getArrayValue(); - case BYTES_VALUE: - return value.getBytesValue(); case DOUBLE_VALUE: return value.getDoubleValue(); - case KVLIST_VALUE: - return value.getKvlistValue(); case STRING_VALUE: return value.getStringValue(); + + // TODO: Support KVLIST_VALUE, ARRAY_VALUE and BYTES_VALUE + default: - // VALUE_NOT_SET: - return ""; + // VALUE_NOT_SET + return null; } } diff --git a/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule index 54b4400fd2cf..46a29b6f1b78 100755 --- a/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ b/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.druid.data.input.opencensus.protobuf.OpenCensusProtobufExtensionsModule \ No newline at end of file +org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufInputFormat diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java index 80bb2b84017e..8a8edafab562 100644 --- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.MetricsData; import org.apache.druid.data.input.InputRow; @@ -282,6 +283,66 @@ public void testDimensionSpecExclusions() Assert.assertFalse(row.getDimensions().contains("descriptor.color")); } + @Test + public void testUnsupportedValueTypes() + { + KeyValueList kvList = KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("foo") + .setValue(AnyValue.newBuilder().setStringValue("bar").build())) + .build(); + + metricsDataBuilder.getResourceMetricsBuilder(0) + .getResourceBuilder() + .addAttributesBuilder() + .setKey(RESOURCE_ATTRIBUTE_ENV) + .setValue(AnyValue.newBuilder().setKvlistValue(kvList).build()); + + metricBuilder + .setName("example_sum") + .getSumBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAllAttributes(ImmutableList.of( + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()).build(), + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_FOO_KEY) + .setValue(AnyValue.newBuilder().setKvlistValue(kvList).build()).build())); + + MetricsData metricsData = metricsDataBuilder.build(); + + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + new ByteEntity(metricsData.toByteArray()), + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + List rowList = new ArrayList<>(); + rows.forEachRemaining(rowList::add); + Assert.assertEquals(1, rowList.size()); + + InputRow row = rowList.get(0); + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_sum"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + + // Unsupported resource attribute type is omitted + Assert.assertEquals(0, row.getDimension("custom.env").size()); + + // Unsupported metric attribute type is omitted + Assert.assertEquals(0, row.getDimension("descriptor.foo_key").size()); + + assertDimensionEquals(row, "raw.value", "6"); + } + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { List values = row.getDimension(dimension);