Skip to content

Commit

Permalink
Make the OpenTelemetry InputFormat More Flexible to Metric, Value and…
Browse files Browse the repository at this point in the history
… Attribute Types (#67)
  • Loading branch information
marcusgreer authored and m-ghazanfar committed May 29, 2023
1 parent 64e4b44 commit ae40c48
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.java.util.common.parsers.ParseException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,7 +95,7 @@ private List<InputRow> parseMetricsData(final MetricsData metricsData)
.getAttributesList()
.stream()
.collect(Collectors.toMap(kv -> resourceAttributePrefix + kv.getKey(),
kv -> getStringValue(kv.getValue())));
kv -> parseAnyValue(kv.getValue())));
return resourceMetrics.getInstrumentationLibraryMetricsList()
.stream()
.flatMap(libraryMetrics -> libraryMetrics.getMetricsList()
Expand Down Expand Up @@ -124,6 +125,11 @@ private List<InputRow> parseMetric(Metric metric, Map<String, Object> resourceAt
break;
}
// TODO Support HISTOGRAM and SUMMARY metrics
case HISTOGRAM:
case SUMMARY: {
inputRows = Collections.emptyList();
break;
}
default:
throw new IllegalStateException("Unexpected value: " + metric.getDataCase());
}
Expand All @@ -143,25 +149,38 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint,

if (dataPoint.hasAsInt()) {
event.put(valueDimension, dataPoint.getAsInt());
} else if (dataPoint.hasAsDouble()) {
event.put(valueDimension, dataPoint.getAsDouble());
} else {
throw new IllegalStateException("Unexpected dataPoint value type. Expected Int or Double");
event.put(valueDimension, dataPoint.getAsDouble());
}

event.putAll(resourceAttributes);
dataPoint.getAttributesList().forEach(att -> event.put(metricAttributePrefix + att.getKey(),
getStringValue(att.getValue())));
parseAnyValue(att.getValue())));

return createRow(TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()), event);
}

private static String getStringValue(AnyValue value)
private static Object parseAnyValue(AnyValue value)
{
if (value.getValueCase() == AnyValue.ValueCase.STRING_VALUE) {
return value.getStringValue();
switch (value.getValueCase()) {
case INT_VALUE:
return value.getIntValue();
case BOOL_VALUE:
return value.getBoolValue();
case ARRAY_VALUE:
return value.getArrayValue();
case BYTES_VALUE:
return value.getBytesValue();
case DOUBLE_VALUE:
return value.getDoubleValue();
case KVLIST_VALUE:
return value.getKvlistValue();
case STRING_VALUE:
return value.getStringValue();
default:
// VALUE_NOT_SET:
return "";
}
throw new IllegalStateException("Unexpected value: " + value.getValueCase());
}

InputRow createRow(long timeUnixMilli, Map<String, Object> event)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.druid.data.input.opencensus.protobuf.OpenCensusProtobufExtensionsModule

0 comments on commit ae40c48

Please sign in to comment.