Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marcusgreer committed Jan 18, 2022
1 parent 636d703 commit b336bf5
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 136 deletions.
27 changes: 0 additions & 27 deletions extensions-contrib/opentelemetry-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,32 +74,5 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- jmh -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.27</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.27</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<nonFilteredFileExtensions>
<nonFilteredFileExtension>desc</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
Expand All @@ -42,18 +41,18 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class OpenTelemetryMetricsProtobufReader implements InputEntityReader
{
private static final String VALUE_COLUMN = "value";

private final DimensionsSpec dimensionsSpec;
private final ByteEntity source;
private final String metricDimension;
private final String metricAttributePrefix;
private final String resourceAttributePrefix;
private final Predicate<String> attributeMembership;

public OpenTelemetryMetricsProtobufReader(
DimensionsSpec dimensionsSpec,
Expand All @@ -63,11 +62,17 @@ public OpenTelemetryMetricsProtobufReader(
String resourceLabelPrefix
)
{
this.dimensionsSpec = dimensionsSpec;
this.source = source;
this.metricDimension = metricDimension;
this.metricAttributePrefix = metricLabelPrefix;
this.resourceAttributePrefix = resourceLabelPrefix;

List<String> schemaDimensions = dimensionsSpec.getDimensionNames();
if (!schemaDimensions.isEmpty()) {
this.attributeMembership = schemaDimensions::contains;
} else {
this.attributeMembership = att -> !dimensionsSpec.getDimensionExclusions().contains(att);
}
}

@Override
Expand All @@ -79,123 +84,92 @@ public CloseableIterator<InputRow> read()
List<InputRow> readAsList()
{
try {
return parseRequest(MetricsData.parseFrom(source.getBuffer()));
return parseMetricsData(MetricsData.parseFrom(source.getBuffer()));
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Protobuf message could not be parsed");
}
}

private List<InputRow> parseRequest(final MetricsData request)
private List<InputRow> parseMetricsData(final MetricsData metricsData)
{

List<ResourceMetrics> resourceMetricsList = request.getResourceMetricsList();
return resourceMetricsList.stream()
.flatMap(
resourceMetrics -> {
List<KeyValue> resourceAttributes = resourceMetrics.getResource().getAttributesList();
return resourceMetrics.getInstrumentationLibraryMetricsList()
.stream()
.flatMap(libraryMetrics -> libraryMetrics.getMetricsList()
.stream()
.map(metric -> parseMetric(metric, resourceAttributes))
.flatMap(List::stream)
);
}
).collect(Collectors.toList());
return metricsData.getResourceMetricsList()
.stream()
.flatMap(resourceMetrics -> resourceMetrics.getInstrumentationLibraryMetricsList()
.stream()
.flatMap(libraryMetrics -> libraryMetrics.getMetricsList()
.stream()
.flatMap(metric -> parseMetric(metric, resourceMetrics.getResource().getAttributesList())
.stream()
)
)
).collect(Collectors.toList());
}

private List<InputRow> parseMetric(Metric metric, List<KeyValue> resourceAttributes)
{

List<String> schemaDimensions = dimensionsSpec.getDimensionNames();
if (!schemaDimensions.isEmpty()) {
return createRows(metric, resourceAttributes, schemaDimensions::contains);
} else {
return createRows(metric, resourceAttributes, att -> !dimensionsSpec.getDimensionExclusions().contains(att));
switch (metric.getDataCase()) {
case INT_SUM: {
List<NumberDataPoint> dataPoints = metric.getIntSum()
.getDataPointsList()
.stream()
.map(OpenTelemetryMetricsProtobufReader::intDataPointToNumDataPoint)
.collect(Collectors.toList());
return parseNumDataPoints(dataPoints, metric.getName(), resourceAttributes);
}
case INT_GAUGE: {
List<NumberDataPoint> dataPoints = metric.getIntGauge()
.getDataPointsList()
.stream()
.map(OpenTelemetryMetricsProtobufReader::intDataPointToNumDataPoint)
.collect(Collectors.toList());
return parseNumDataPoints(dataPoints, metric.getName(), resourceAttributes);
}
case SUM: {
return parseNumDataPoints(metric.getSum().getDataPointsList(), metric.getName(), resourceAttributes);
}
case GAUGE: {
return parseNumDataPoints(metric.getGauge().getDataPointsList(), metric.getName(), resourceAttributes);
}
// TODO Support HISTOGRAM and SUMMARY
default:
throw new IllegalStateException("Unexpected value: " + metric.getDataCase());
}
}

private List<InputRow> createRows(Metric metric,
List<KeyValue> resourceAttributes,
Function<String, Boolean> attributeMembershipFn)
private List<InputRow> parseNumDataPoints(List<NumberDataPoint> dataPoints,
String metricName,
List<KeyValue> resourceAttributes)
{

List<NumberDataPoint> dataPoints = getDataPoints(metric);
List<InputRow> rows = new ArrayList<>();

for (NumberDataPoint dataPoint : dataPoints) {

final long timeUnixMilli = TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano());
final List<KeyValue> metricAttributes = dataPoint.getAttributesList();
final Number value = getValue(dataPoint);
long timeUnixMilli = TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano());
List<KeyValue> metricAttributes = dataPoint.getAttributesList();
Number value = getValue(dataPoint);

final int capacity = resourceAttributes.size()
int capacity = resourceAttributes.size()
+ metricAttributes.size()
+ 2; // metric name + value columns

Map<String, Object> event = Maps.newHashMapWithExpectedSize(capacity);
event.put(metricDimension, metric.getName());
event.put(metricDimension, metricName);
event.put(VALUE_COLUMN, value);

List<String> dimensions = new ArrayList<>();
dimensions.add(metricDimension);
dimensions.add(VALUE_COLUMN);

resourceAttributes.stream()
.filter(ra -> attributeMembershipFn.apply(this.resourceAttributePrefix + ra.getKey()))
.forEach(ra -> {
dimensions.add(this.resourceAttributePrefix + ra.getKey());
event.put(this.resourceAttributePrefix + ra.getKey(), ra.getValue().getStringValue());
});
.filter(ra -> attributeMembership.test(this.resourceAttributePrefix + ra.getKey()))
.forEach(ra -> event.put(this.resourceAttributePrefix + ra.getKey(), ra.getValue().getStringValue()));

metricAttributes.stream()
.filter(ma -> attributeMembershipFn.apply(this.metricAttributePrefix + ma.getKey()))
.forEach(ma -> {
dimensions.add(this.metricAttributePrefix + ma.getKey());
event.put(this.metricAttributePrefix + ma.getKey(), ma.getValue().getStringValue());
});
.filter(ma -> attributeMembership.test(this.metricAttributePrefix + ma.getKey()))
.forEach(ma -> event.put(this.metricAttributePrefix + ma.getKey(), ma.getValue().getStringValue()));

rows.add(new MapBasedInputRow(timeUnixMilli, dimensions, event));
rows.add(new MapBasedInputRow(timeUnixMilli, new ArrayList<>(event.keySet()), event));
}
return rows;
}

private List<NumberDataPoint> getDataPoints(Metric metric)
{
List<NumberDataPoint> dataPoints;
switch (metric.getDataCase()) {
case INT_SUM: {
dataPoints = metric.getIntSum()
.getDataPointsList()
.stream()
.map(this::dataPointConverter)
.collect(Collectors.toList());
break;
}
case INT_GAUGE: {
dataPoints = metric.getIntGauge()
.getDataPointsList()
.stream()
.map(this::dataPointConverter)
.collect(Collectors.toList());
break;
}
case SUM: {
dataPoints = metric.getSum().getDataPointsList();
break;
}
case GAUGE: {
dataPoints = metric.getGauge().getDataPointsList();
break;
}
default:
throw new IllegalStateException("Unexpected value: " + metric.getDataCase());
}
return dataPoints;
}

private NumberDataPoint dataPointConverter(IntDataPoint dataPoint)
private static NumberDataPoint intDataPointToNumDataPoint(IntDataPoint dataPoint)
{
NumberDataPoint.Builder builder = NumberDataPoint.newBuilder()
.setTimeUnixNano(dataPoint.getTimeUnixNano())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Collections;
import java.util.List;

public class OpenTelemetryMetricsProtobufExtensionsModule implements DruidModule
public class OpenTelemetryProtobufExtensionsModule implements DruidModule
{

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testSerde() throws Exception
OpenTelemetryMetricsProtobufInputFormat inputFormat = new OpenTelemetryMetricsProtobufInputFormat("metric.name", "descriptor.", "custom.");

final ObjectMapper jsonMapper = new ObjectMapper();
jsonMapper.registerModules(new OpenTelemetryMetricsProtobufExtensionsModule().getJacksonModules());
jsonMapper.registerModules(new OpenTelemetryProtobufExtensionsModule().getJacksonModules());

final OpenTelemetryMetricsProtobufInputFormat actual = (OpenTelemetryMetricsProtobufInputFormat) jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),
Expand Down
Loading

0 comments on commit b336bf5

Please sign in to comment.