OBSDATA-483: Adapt OpenCensus and OpenTelemetry extensions to the introduction of SettableByteEntity (#113)

* OBSDATA-483: Adapt opencensus extension to the introduction of SettableByteEntity

* OBSDATA-483: Adapt opentelemetry extension to the introduction of SettableByteEntity

* OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry

* OBSDATA-483: Add logger config in opencensus tests

* OBSDATA-483: Fix issue with opening the byte entity

* OBSDATA-483: Instantiate the right iterator in every read request

* OBSDATA-483: Add comments

* OBSDATA-483: Address Xavier's comments

* OBSDATA-483: Remove unused member fields

* OBSDATA-483: Rename enum

* OBSDATA-483: Fix trace log to actually print the argument

* OBSDATA-483: Keep passing the underlying byte buffer and move its position explicitly

* OBSDATA-483: Fix checkstyle issues

* OBSDATA-483: Add back handling of InvalidProtocolBufferException

* OBSDATA-483: Extend the semaphore workflow execution time to 2 hours

* Revert "OBSDATA-483: Extend the semaphore workflow execution time to 2 hours"

* OBSDATA-483: Don't close iterator in sample
kkonstantine authored and m-ghazanfar committed May 29, 2023
1 parent dcedf5d commit cced118
Showing 11 changed files with 372 additions and 158 deletions.
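
A minimal sketch (not part of this commit) of the SettableByteEntity idiom the diffs below rely on: the Kafka payload is wrapped in a SettableByteEntity, and after parsing, the reader advances the underlying buffer's position explicitly because the protobuf parser does not move it. Only the setEntity/getEntity/getBuffer calls visible in the diffs are assumed; the class name and payload are illustrative.

import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;

import java.nio.ByteBuffer;

class SettableByteEntitySketch
{
  public static void main(String[] args)
  {
    // Illustrative sketch: wrap the raw payload once; the streaming indexer
    // resets the wrapped entity for each record instead of allocating anew.
    SettableByteEntity<ByteEntity> entity = new SettableByteEntity<>();
    entity.setEntity(new ByteEntity(ByteBuffer.wrap(new byte[]{1, 2, 3})));

    ByteBuffer buffer = entity.getEntity().getBuffer();
    // ... a protobuf parser would consume `buffer` here ...

    // The parser does not update the position, so mark all remaining bytes
    // as consumed explicitly, as the reader in this commit does.
    buffer.position(buffer.limit());
  }
}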
6 changes: 6 additions & 0 deletions extensions-contrib/opencensus-extensions/pom.xml
@@ -55,6 +55,12 @@
      <version>${project.parent.version}</version>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
    <dependency>
      <groupId>io.opentelemetry.proto</groupId>
      <artifactId>opentelemetry-proto</artifactId>
137 changes: 137 additions & 0 deletions extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/HybridProtobufReader.java
@@ -0,0 +1,137 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.data.input.opencensus.protobuf;

import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.KafkaUtils;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class HybridProtobufReader implements InputEntityReader
{
  private static final String VERSION_HEADER_KEY = "v";
  private static final int OPENTELEMETRY_FORMAT_VERSION = 1;

  private final DimensionsSpec dimensionsSpec;
  private final SettableByteEntity<? extends ByteEntity> source;
  private final String metricDimension;
  private final String valueDimension;
  private final String metricLabelPrefix;
  private final String resourceLabelPrefix;

  private volatile MethodHandle getHeaderMethod = null;

  enum ProtobufReader
  {
    OPENCENSUS,
    OPENTELEMETRY
  }

  public HybridProtobufReader(
      DimensionsSpec dimensionsSpec,
      SettableByteEntity<? extends ByteEntity> source,
      String metricDimension,
      String valueDimension,
      String metricLabelPrefix,
      String resourceLabelPrefix
  )
  {
    this.dimensionsSpec = dimensionsSpec;
    this.source = source;
    this.metricDimension = metricDimension;
    this.valueDimension = valueDimension;
    this.metricLabelPrefix = metricLabelPrefix;
    this.resourceLabelPrefix = resourceLabelPrefix;
  }

  @Override
  public CloseableIterator<InputRow> read() throws IOException
  {
    return newReader(whichReader()).read();
  }

  public InputEntityReader newReader(ProtobufReader which)
  {
    switch (which) {
      case OPENTELEMETRY:
        return new OpenTelemetryMetricsProtobufReader(
            dimensionsSpec,
            source,
            metricDimension,
            valueDimension,
            metricLabelPrefix,
            resourceLabelPrefix
        );
      case OPENCENSUS:
      default:
        return new OpenCensusProtobufReader(
            dimensionsSpec,
            source,
            metricDimension,
            metricLabelPrefix,
            resourceLabelPrefix
        );
    }
  }

  public ProtobufReader whichReader()
  {
    // assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader)
    // so we only have to look it up once. To be completely correct we should cache the method based on classloader
    if (getHeaderMethod == null) {
      getHeaderMethod = KafkaUtils.lookupGetHeaderMethod(
          source.getEntity().getClass().getClassLoader(),
          VERSION_HEADER_KEY
      );
    }

    try {
      byte[] versionHeader = (byte[]) getHeaderMethod.invoke(source.getEntity());
      if (versionHeader != null) {
        int version =
            ByteBuffer.wrap(versionHeader).order(ByteOrder.LITTLE_ENDIAN).getInt();
        if (version == OPENTELEMETRY_FORMAT_VERSION) {
          return ProtobufReader.OPENTELEMETRY;
        }
      }
    }
    catch (Throwable t) {
      // assume input is opencensus if something went wrong
    }
    return ProtobufReader.OPENCENSUS;
  }

  @Override
  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
  {
    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
  }
}
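
To illustrate whichReader() above: the "v" Kafka header carries a little-endian int, and the value 1 selects the OpenTelemetry reader; anything else, or a missing or unreadable header, falls back to OpenCensus. A hypothetical producer-side encode/decode round trip using only JDK classes (class and method names are illustrative):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class VersionHeaderSketch
{
  // Encode the 4-byte little-endian "v" header value that whichReader() checks.
  static byte[] encodeVersion(int version)
  {
    return ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(version).array();
  }

  public static void main(String[] args)
  {
    byte[] header = encodeVersion(1);
    // Decoding mirrors whichReader(): 1 means OpenTelemetry, everything else OpenCensus.
    int version = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN).getInt();
    System.out.println(version == 1 ? "OPENTELEMETRY" : "OPENCENSUS");
  }
}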
extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
@@ -24,33 +24,25 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.KafkaUtils;
import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.StringUtils;

import javax.annotation.Nullable;
import java.io.File;
-import java.lang.invoke.MethodHandle;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.Objects;

public class OpenCensusProtobufInputFormat implements InputFormat
{
  private static final String DEFAULT_METRIC_DIMENSION = "name";
  private static final String DEFAULT_RESOURCE_PREFIX = "resource.";
  private static final String DEFAULT_VALUE_DIMENSION = "value";
-  private static final String VERSION_HEADER_KEY = "v";
-  private static final int OPENTELEMETRY_FORMAT_VERSION = 1;

  private final String metricDimension;
  private final String valueDimension;
  private final String metricLabelPrefix;
  private final String resourceLabelPrefix;

-  private volatile MethodHandle getHeaderMethod = null;
-
  public OpenCensusProtobufInputFormat(
      @JsonProperty("metricDimension") String metricDimension,
      @JsonProperty("valueDimension") @Nullable String valueDimension,
@@ -73,41 +65,21 @@ public boolean isSplittable()
  @Override
  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
  {
-    // assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader)
-    // so we only have to look it up once. To be completely correct we should cache the method based on classloader
-    if (getHeaderMethod == null) {
-      getHeaderMethod = KafkaUtils.lookupGetHeaderMethod(
-          source.getClass().getClassLoader(),
-          OpenCensusProtobufInputFormat.VERSION_HEADER_KEY
-      );
-    }
-
-    try {
-      byte[] versionHeader = (byte[]) getHeaderMethod.invoke(source);
-      if (versionHeader != null) {
-        int version =
-            ByteBuffer.wrap(versionHeader).order(ByteOrder.LITTLE_ENDIAN).getInt();
-        if (version == OPENTELEMETRY_FORMAT_VERSION) {
-          return new OpenTelemetryMetricsProtobufReader(
-              inputRowSchema.getDimensionsSpec(),
-              (ByteEntity) source,
-              metricDimension,
-              valueDimension,
-              metricLabelPrefix,
-              resourceLabelPrefix
-          );
-        }
-      }
-    }
-    catch (Throwable t) {
-      // assume input is opencensus if something went wrong
-    }
+    // Sampler passes a KafkaRecordEntity directly, while the normal code path wraps the same entity in a
+    // SettableByteEntity
+    SettableByteEntity<? extends ByteEntity> settableEntity;
+    if (source instanceof SettableByteEntity) {
+      settableEntity = (SettableByteEntity<? extends ByteEntity>) source;
+    } else {
+      SettableByteEntity<ByteEntity> wrapper = new SettableByteEntity<>();
+      wrapper.setEntity((ByteEntity) source);
+      settableEntity = wrapper;
+    }

-    return new OpenCensusProtobufReader(
+    return new HybridProtobufReader(
        inputRowSchema.getDimensionsSpec(),
-        (ByteEntity) source,
+        settableEntity,
        metricDimension,
        valueDimension,
        metricLabelPrefix,
        resourceLabelPrefix
    );
extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java
@@ -26,6 +26,7 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;

@@ -103,9 +104,11 @@ public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
  @Override
  public List<InputRow> parseBatch(ByteBuffer input)
  {
+    SettableByteEntity<ByteEntity> settableByteEntity = new SettableByteEntity<>();
+    settableByteEntity.setEntity(new ByteEntity(input));
    return new OpenCensusProtobufReader(
        parseSpec.getDimensionsSpec(),
-        new ByteEntity(input),
+        settableByteEntity,
        metricDimension,
        metricLabelPrefix,
        resourceLabelPrefix
extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
@@ -36,11 +36,13 @@
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;

+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
@@ -55,14 +57,14 @@ public class OpenCensusProtobufReader implements InputEntityReader
  private static final String VALUE_COLUMN = "value";

  private final DimensionsSpec dimensionsSpec;
-  private final ByteEntity source;
+  private final SettableByteEntity<? extends ByteEntity> source;
  private final String metricDimension;
  private final String metricLabelPrefix;
  private final String resourceLabelPrefix;

  public OpenCensusProtobufReader(
      DimensionsSpec dimensionsSpec,
-      ByteEntity source,
+      SettableByteEntity<? extends ByteEntity> source,
      String metricDimension,
      String metricLabelPrefix,
      String resourceLabelPrefix
@@ -101,7 +103,12 @@ public InputRow next()
  List<InputRow> readAsList()
  {
    try {
-      return parseMetric(Metric.parseFrom(source.getBuffer()));
+      ByteBuffer buffer = source.getEntity().getBuffer();
+      List<InputRow> rows = parseMetric(Metric.parseFrom(buffer));
+      // Explicitly move the position assuming that all the remaining bytes have been consumed because the protobuf
+      // parser does not update the position itself
+      buffer.position(buffer.limit());
+      return rows;
    }
    catch (InvalidProtocolBufferException e) {
      throw new ParseException(null, e, "Protobuf message could not be parsed");
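
A hypothetical snippet showing the InvalidProtocolBufferException handling restored above: malformed protobuf bytes surface as Druid's ParseException when the rows are iterated. The constructor arguments, prefixes, and DimensionsSpec.EMPTY are illustrative assumptions, not taken from this commit.

import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.opencensus.protobuf.OpenCensusProtobufReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.nio.ByteBuffer;

class ParseFailureSketch
{
  public static void main(String[] args) throws Exception
  {
    SettableByteEntity<ByteEntity> entity = new SettableByteEntity<>();
    // 0xFF does not start a valid protobuf field for the Metric message.
    entity.setEntity(new ByteEntity(ByteBuffer.wrap(new byte[]{(byte) 0xFF, 0x00})));

    OpenCensusProtobufReader reader = new OpenCensusProtobufReader(
        DimensionsSpec.EMPTY, // assumed empty spec; adjust to your schema
        entity,
        "name",
        "label.",
        "resource."
    );

    try (CloseableIterator<InputRow> rows = reader.read()) {
      while (rows.hasNext()) {
        rows.next();
      }
    }
    catch (ParseException e) {
      // Malformed payloads are reported as ParseException, per the catch block above.
      System.out.println("malformed payload rejected: " + e.getMessage());
    }
  }
}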
