diff --git a/extensions-contrib/opentelemetry-extensions/pom.xml b/extensions-contrib/opentelemetry-extensions/pom.xml new file mode 100644 index 000000000000..7b02d9539b68 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/pom.xml @@ -0,0 +1,96 @@ + + + + 4.0.0 + + org.apache.druid.extensions.contrib + druid-opentelemetry-extensions + druid-opentelemetry-extensions + druid-opentelemetry-extensions + + + 0.11.0-alpha + + + + druid + org.apache.druid + 0.21.0 + ../../pom.xml + + + + com.google.protobuf + protobuf-java + + + io.opentelemetry.proto + opentelemetry-proto + ${opentelemetry.proto.version} + + + com.google.guava + guava + provided + + + com.google.inject + guice + provided + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + com.fasterxml.jackson.core + jackson-databind + provided + + + org.apache.druid + druid-core + ${project.parent.version} + provided + + + + junit + junit + test + + + + org.openjdk.jmh + jmh-core + 1.27 + test + + + org.openjdk.jmh + jmh-generator-annprocess + 1.27 + test + + + diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java new file mode 100644 index 000000000000..2089c2c7d676 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.java.util.common.StringUtils; + +import java.io.File; +import java.util.Objects; + +public class OpenTelemetryMetricsProtobufInputFormat implements InputFormat +{ + private static final String DEFAULT_METRIC_DIMENSION = "metric"; + private static final String DEFAULT_VALUE_DIMENSION = "value"; + private static final String DEFAULT_RESOURCE_PREFIX = "resource."; + + private final String metricDimension; + private final String valueDimension; + private final String metricAttributePrefix; + private final String resourceAttributePrefix; + + public OpenTelemetryMetricsProtobufInputFormat( + @JsonProperty("metricDimension") String metricDimension, + @JsonProperty("valueDimension") String valueDimension, + @JsonProperty("metricAttributePrefix") String metricAttributePrefix, + @JsonProperty("resourceAttributePrefix") String resourceAttributePrefix + ) + { + this.metricDimension = metricDimension != null ? metricDimension : DEFAULT_METRIC_DIMENSION; + this.valueDimension = valueDimension != null ? valueDimension : DEFAULT_VALUE_DIMENSION; + this.metricAttributePrefix = StringUtils.nullToEmptyNonDruidDataString(metricAttributePrefix); + this.resourceAttributePrefix = resourceAttributePrefix != null ? resourceAttributePrefix : DEFAULT_RESOURCE_PREFIX; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new OpenTelemetryMetricsProtobufReader( + inputRowSchema.getDimensionsSpec(), + (ByteEntity) source, + metricDimension, + valueDimension, + metricAttributePrefix, + resourceAttributePrefix + ); + } + + @JsonProperty + public String getMetricDimension() + { + return metricDimension; + } + + @JsonProperty + public String getValueDimension() + { + return valueDimension; + } + + @JsonProperty + public String getMetricAttributePrefix() + { + return metricAttributePrefix; + } + + @JsonProperty + public String getResourceAttributePrefix() + { + return resourceAttributePrefix; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof OpenTelemetryMetricsProtobufInputFormat)) { + return false; + } + OpenTelemetryMetricsProtobufInputFormat that = (OpenTelemetryMetricsProtobufInputFormat) o; + return Objects.equals(metricDimension, that.metricDimension) + && Objects.equals(valueDimension, that.valueDimension) + && Objects.equals(metricAttributePrefix, that.metricAttributePrefix) + && Objects.equals(resourceAttributePrefix, that.resourceAttributePrefix); + } + + @Override + public int hashCode() + { + return Objects.hash(metricDimension, valueDimension, metricAttributePrefix, resourceAttributePrefix); + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java new file mode 100644 index 000000000000..fe360e2eac0f --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class OpenTelemetryMetricsProtobufReader implements InputEntityReader +{ + + private final ByteEntity source; + private final String metricDimension; + private final String valueDimension; + private final String metricAttributePrefix; + private final String resourceAttributePrefix; + private final DimensionsSpec dimensionsSpec; + + public OpenTelemetryMetricsProtobufReader( + DimensionsSpec dimensionsSpec, + ByteEntity source, + String metricDimension, + String valueDimension, + String metricAttributePrefix, + String resourceAttributePrefix + ) + { + this.dimensionsSpec = dimensionsSpec; + this.source = source; + this.metricDimension = metricDimension; + this.valueDimension = valueDimension; + this.metricAttributePrefix = metricAttributePrefix; + this.resourceAttributePrefix = resourceAttributePrefix; + } + + @Override + public CloseableIterator read() + { + return CloseableIterators.withEmptyBaggage(readAsList().iterator()); + } + + List readAsList() + { + try { + return parseMetricsData(MetricsData.parseFrom(source.getBuffer())); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(e, "Protobuf message could not be parsed"); + } + } + + private List parseMetricsData(final MetricsData metricsData) + { + return metricsData.getResourceMetricsList() + .stream() + .flatMap(resourceMetrics -> { + Map resourceAttributes = resourceMetrics.getResource() + .getAttributesList() + .stream() + .collect(Collectors.toMap(kv -> resourceAttributePrefix + kv.getKey(), + kv -> getStringValue(kv.getValue()))); + return resourceMetrics.getInstrumentationLibraryMetricsList() + .stream() + .flatMap(libraryMetrics -> libraryMetrics.getMetricsList() + .stream() + .flatMap(metric -> parseMetric(metric, resourceAttributes).stream())); + }) + .collect(Collectors.toList()); + } + + private List parseMetric(Metric metric, Map resourceAttributes) + { + final List inputRows; + String metricName = metric.getName(); + switch (metric.getDataCase()) { + case SUM: { + inputRows = new ArrayList<>(metric.getSum().getDataPointsCount()); + metric.getSum() + .getDataPointsList() + .forEach(dataPoint -> inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName))); + break; + } + case GAUGE: { + inputRows = new ArrayList<>(metric.getGauge().getDataPointsCount()); + metric.getGauge() + .getDataPointsList() + .forEach(dataPoint -> inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName))); + break; + } + // TODO Support HISTOGRAM and SUMMARY metrics + default: + throw new IllegalStateException("Unexpected value: " + metric.getDataCase()); + } + return inputRows; + } + + private InputRow parseNumberDataPoint(NumberDataPoint dataPoint, + Map resourceAttributes, + String metricName) + { + + int capacity = resourceAttributes.size() + + dataPoint.getAttributesCount() + + 2; // metric name + value columns + Map event = Maps.newHashMapWithExpectedSize(capacity); + event.put(metricDimension, metricName); + + if (dataPoint.hasAsInt()) { + event.put(valueDimension, dataPoint.getAsInt()); + } else if (dataPoint.hasAsDouble()) { + event.put(valueDimension, dataPoint.getAsDouble()); + } else { + throw new IllegalStateException("Unexpected dataPoint value type. Expected Int or Double"); + } + + event.putAll(resourceAttributes); + dataPoint.getAttributesList().forEach(att -> event.put(metricAttributePrefix + att.getKey(), + getStringValue(att.getValue()))); + + return createRow(TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()), event); + } + + private static String getStringValue(AnyValue value) + { + if (value.getValueCase() == AnyValue.ValueCase.STRING_VALUE) { + return value.getStringValue(); + } + throw new IllegalStateException("Unexpected value: " + value.getValueCase()); + } + + InputRow createRow(long timeUnixMilli, Map event) + { + final List dimensions; + if (!dimensionsSpec.getDimensionNames().isEmpty()) { + dimensions = dimensionsSpec.getDimensionNames(); + } else { + dimensions = new ArrayList<>(Sets.difference(event.keySet(), dimensionsSpec.getDimensionExclusions())); + } + return new MapBasedInputRow(timeUnixMilli, dimensions, event); + } + + @Override + public CloseableIterator sample() + { + return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent())); + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java new file mode 100644 index 000000000000..4c027c31248c --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class OpenTelemetryProtobufExtensionsModule implements DruidModule +{ + + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("OpenTelemetryProtobufInputFormat") + .registerSubtypes( + new NamedType(OpenTelemetryMetricsProtobufInputFormat.class, "opentelemetry-metrics-protobuf") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java new file mode 100644 index 000000000000..bbb77f1c1a1f --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.resource.v1.Resource; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +@Fork(1) +@State(Scope.Benchmark) +public class OpenTelemetryBenchmark +{ + + private static ByteBuffer BUFFER; + + @Param(value = {"1", "2", "4", "8" }) + private int resourceMetricCount = 1; + + @Param(value = {"1"}) + private int instrumentationLibraryCount = 1; + + @Param(value = {"1", "2", "4", "8" }) + private int metricsCount = 1; + + @Param(value = {"1", "2", "4", "8" }) + private int dataPointCount; + + private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli()); + + private static final InputRowSchema ROW_SCHEMA = new InputRowSchema(null, + new DimensionsSpec(ImmutableList.of( + new StringDimensionSchema("name"), + new StringDimensionSchema("value"), + new StringDimensionSchema("foo_key")), + null, null), + null); + + private static final OpenTelemetryMetricsProtobufInputFormat INPUT_FORMAT = + new OpenTelemetryMetricsProtobufInputFormat("name", + "value", + "", + "resource."); + + private ByteBuffer createMetricBuffer() + { + MetricsData.Builder metricsData = MetricsData.newBuilder(); + for (int i = 0; i < resourceMetricCount; i++) { + ResourceMetrics.Builder resourceMetricsBuilder = metricsData.addResourceMetricsBuilder(); + Resource.Builder resourceBuilder = resourceMetricsBuilder.getResourceBuilder(); + + for (int resourceAttributeI = 0; resourceAttributeI < 5; resourceAttributeI++) { + KeyValue.Builder resourceAttributeBuilder = resourceBuilder.addAttributesBuilder(); + resourceAttributeBuilder.setKey("resource.label_key_" + resourceAttributeI); + resourceAttributeBuilder.setValue(AnyValue.newBuilder().setStringValue("resource.label_value")); + } + + for (int j = 0; j < instrumentationLibraryCount; j++) { + InstrumentationLibraryMetrics.Builder instrumentationLibraryMetricsBuilder = + resourceMetricsBuilder.addInstrumentationLibraryMetricsBuilder(); + + for (int k = 0; k < metricsCount; k++) { + Metric.Builder metricBuilder = instrumentationLibraryMetricsBuilder.addMetricsBuilder(); + metricBuilder.setName("io.confluent.domain/such/good/metric/wow"); + + for (int l = 0; l < dataPointCount; l++) { + NumberDataPoint.Builder dataPointBuilder = metricBuilder.getSumBuilder().addDataPointsBuilder(); + dataPointBuilder.setAsDouble(42.0).setTimeUnixNano(TIMESTAMP); + + for (int metricAttributeI = 0; metricAttributeI < 10; metricAttributeI++) { + KeyValue.Builder attributeBuilder = dataPointBuilder.addAttributesBuilder(); + attributeBuilder.setKey("foo_key_" + metricAttributeI); + attributeBuilder.setValue(AnyValue.newBuilder().setStringValue("foo-value")); + } + } + } + } + } + return ByteBuffer.wrap(metricsData.build().toByteArray()); + } + + @Setup + public void init() + { + BUFFER = createMetricBuffer(); + } + + @Benchmark() + public void measureSerde(Blackhole blackhole) throws IOException + { + for (CloseableIterator it = INPUT_FORMAT.createReader(ROW_SCHEMA, new ByteEntity(BUFFER), null).read(); it.hasNext(); ) { + InputRow row = it.next(); + blackhole.consume(row); + } + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java new file mode 100644 index 000000000000..536247ab5716 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputFormat; +import org.junit.Assert; +import org.junit.Test; + +public class OpenTelemetryMetricsInputFormatTest +{ + @Test + public void testSerde() throws Exception + { + OpenTelemetryMetricsProtobufInputFormat inputFormat = new OpenTelemetryMetricsProtobufInputFormat( + "metric.name", + "raw.value", + "descriptor.", + "custom." + ); + + final ObjectMapper jsonMapper = new ObjectMapper(); + jsonMapper.registerModules(new OpenTelemetryProtobufExtensionsModule().getJacksonModules()); + + final OpenTelemetryMetricsProtobufInputFormat actual = (OpenTelemetryMetricsProtobufInputFormat) jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + InputFormat.class + ); + Assert.assertEquals(inputFormat, actual); + Assert.assertEquals("metric.name", actual.getMetricDimension()); + Assert.assertEquals("raw.value", actual.getValueDimension()); + Assert.assertEquals("descriptor.", actual.getMetricAttributePrefix()); + Assert.assertEquals("custom.", actual.getResourceAttributePrefix()); + } + + @Test + public void testDefaults() + { + OpenTelemetryMetricsProtobufInputFormat inputFormat = new OpenTelemetryMetricsProtobufInputFormat( + null, + null, + null, + null + ); + + Assert.assertEquals("metric", inputFormat.getMetricDimension()); + Assert.assertEquals("value", inputFormat.getValueDimension()); + Assert.assertEquals("", inputFormat.getMetricAttributePrefix()); + Assert.assertEquals("resource.", inputFormat.getResourceAttributePrefix()); + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java new file mode 100644 index 000000000000..80bb2b84017e --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class OpenTelemetryMetricsProtobufReaderTest +{ + private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli()); + public static final String RESOURCE_ATTRIBUTE_COUNTRY = "country"; + public static final String RESOURCE_ATTRIBUTE_VALUE_USA = "usa"; + + public static final String RESOURCE_ATTRIBUTE_ENV = "env"; + public static final String RESOURCE_ATTRIBUTE_VALUE_DEVEL = "devel"; + + public static final String INSTRUMENTATION_LIBRARY_NAME = "mock-instr-lib"; + public static final String INSTRUMENTATION_LIBRARY_VERSION = "1.0"; + + public static final String METRIC_ATTRIBUTE_COLOR = "color"; + public static final String METRIC_ATTRIBUTE_VALUE_RED = "red"; + + public static final String METRIC_ATTRIBUTE_FOO_KEY = "foo_key"; + public static final String METRIC_ATTRIBUTE_FOO_VAL = "foo_value"; + + private final MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); + + private final Metric.Builder metricBuilder = metricsDataBuilder.addResourceMetricsBuilder() + .addInstrumentationLibraryMetricsBuilder() + .addMetricsBuilder(); + + private final DimensionsSpec dimensionsSpec = new DimensionsSpec(ImmutableList.of( + new StringDimensionSchema("descriptor." + METRIC_ATTRIBUTE_COLOR), + new StringDimensionSchema("descriptor." + METRIC_ATTRIBUTE_FOO_KEY), + new StringDimensionSchema("custom." + RESOURCE_ATTRIBUTE_ENV), + new StringDimensionSchema("custom." + RESOURCE_ATTRIBUTE_COUNTRY) + ), null, null); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() + { + metricsDataBuilder + .getResourceMetricsBuilder(0) + .getResourceBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey(RESOURCE_ATTRIBUTE_COUNTRY) + .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_USA))); + + metricsDataBuilder + .getResourceMetricsBuilder(0) + .getInstrumentationLibraryMetricsBuilder(0) + .getInstrumentationLibraryBuilder() + .setName(INSTRUMENTATION_LIBRARY_NAME) + .setVersion(INSTRUMENTATION_LIBRARY_VERSION); + + } + + @Test + public void testSumWithAttributes() + { + metricBuilder + .setName("example_sum") + .getSumBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); + + MetricsData metricsData = metricsDataBuilder.build(); + + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + new ByteEntity(metricsData.toByteArray()), + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + List rowList = new ArrayList<>(); + rows.forEachRemaining(rowList::add); + Assert.assertEquals(1, rowList.size()); + + InputRow row = rowList.get(0); + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_sum"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + assertDimensionEquals(row, "raw.value", "6"); + } + + @Test + public void testGaugeWithAttributes() + { + metricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); + + MetricsData metricsData = metricsDataBuilder.build(); + + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + new ByteEntity(metricsData.toByteArray()), + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertTrue(rows.hasNext()); + InputRow row = rows.next(); + + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_gauge"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + assertDimensionEquals(row, "raw.value", "6"); + } + + @Test + public void testBatchedMetricParse() + { + metricBuilder.setName("example_sum") + .getSumBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); + + // Create Second Metric + Metric.Builder gaugeMetricBuilder = metricsDataBuilder.addResourceMetricsBuilder() + .addInstrumentationLibraryMetricsBuilder() + .addMetricsBuilder(); + + metricsDataBuilder.getResourceMetricsBuilder(1) + .getResourceBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey(RESOURCE_ATTRIBUTE_ENV) + .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_DEVEL)) + .build()); + + metricsDataBuilder.getResourceMetricsBuilder(1) + .getInstrumentationLibraryMetricsBuilder(0) + .getInstrumentationLibraryBuilder() + .setName(INSTRUMENTATION_LIBRARY_NAME) + .setVersion(INSTRUMENTATION_LIBRARY_VERSION); + + gaugeMetricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(8) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_FOO_KEY) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_FOO_VAL).build()); + + MetricsData metricsData = metricsDataBuilder.build(); + + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + new ByteEntity(metricsData.toByteArray()), + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertTrue(rows.hasNext()); + InputRow row = rows.next(); + + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_sum"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + assertDimensionEquals(row, "raw.value", "6"); + + Assert.assertTrue(rows.hasNext()); + row = rows.next(); + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_gauge"); + assertDimensionEquals(row, "custom.env", "devel"); + assertDimensionEquals(row, "descriptor.foo_key", "foo_value"); + assertDimensionEquals(row, "raw.value", "8"); + + } + + @Test + public void testDimensionSpecExclusions() + { + metricsDataBuilder.getResourceMetricsBuilder(0) + .getResourceBuilder() + .addAttributesBuilder() + .setKey(RESOURCE_ATTRIBUTE_ENV) + .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_DEVEL).build()); + + metricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAllAttributes(ImmutableList.of( + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()).build(), + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_FOO_KEY) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_FOO_VAL).build()).build())); + + MetricsData metricsData = metricsDataBuilder.build(); + + DimensionsSpec dimensionsSpecWithExclusions = new DimensionsSpec(null, + ImmutableList.of( + "descriptor." + METRIC_ATTRIBUTE_COLOR, + "custom." + RESOURCE_ATTRIBUTE_COUNTRY + ), null); + + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpecWithExclusions, + new ByteEntity(metricsData.toByteArray()), + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertTrue(rows.hasNext()); + InputRow row = rows.next(); + + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_gauge"); + assertDimensionEquals(row, "raw.value", "6"); + assertDimensionEquals(row, "custom.env", "devel"); + assertDimensionEquals(row, "descriptor.foo_key", "foo_value"); + Assert.assertFalse(row.getDimensions().contains("custom.country")); + Assert.assertFalse(row.getDimensions().contains("descriptor.color")); + } + + private void assertDimensionEquals(InputRow row, String dimension, Object expected) + { + List values = row.getDimension(dimension); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(expected, values.get(0)); + } + +} diff --git a/pom.xml b/pom.xml index ba322ffb9513..47a25f80e8e7 100644 --- a/pom.xml +++ b/pom.xml @@ -230,6 +230,7 @@ extensions-contrib/opencensus-extensions extensions-contrib/confluent-extensions extensions-contrib/opentelemetry-emitter + extensions-contrib/opentelemetry-extensions distribution