-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OBSDATA-483: Adapt OpenCensus and OpenTelemetry extensions to the introduction of SettableByteEntity #113
Conversation
Tested by adapting existing tests. Will check test coverage to see if we are still missing any paths in the relevant classes. |
ac49c60
to
4265dc3
Compare
10e598f
to
25ee854
Compare
778d601
to
dcb596b
Compare
A few more tests might be granted but taking this to a full review mode given that the changes are already being tested in staging. |
...main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
Show resolved
Hide resolved
} | ||
|
||
List<InputRow> readAsList() | ||
{ | ||
try { | ||
return parseMetric(Metric.parseFrom(source.getBuffer())); | ||
return parseMetric(Metric.parseFrom(source.open())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we have a ByteEntity, we can avoid the overhead of parsing the underlying ByteBuffer via InputStream and read it directly like we did before using
return parseMetric(Metric.parseFrom(source.open())); | |
return parseMetric(Metric.parseFrom(source.getEntity().getBuffer())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been a few weeks but this was the big discovery that fixed the weird exception about the buffer being reused before it gets drained (fully read). That's how I had it before but this doesn't work currently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, I am a bit worried about the performance implication. We go through all the trouble to pass this as a ByteEntity for that reason. I don't see how opening the buffer as an inputstream would prevent the problem you are describing. What exactly do you mean by "getting reused"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking a bit at the protobuf code, it looks like it has some pretty sophisticated optimizations for ByteBuffers and will read the underlying array directly or bypass the JVM bounds check using unsafe if possible to get the best performance.
It is likely that this causes the ByteBuffer position to not advance and trigger the check here https://github.com/apache/druid/blob/6346b9561df7a92557acb51a56585084a0eb8633/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java#L97-L99
A simple workaround might be to call buffer.position(buffer.position() + buffer.remaining())
when done parsing from the buffer.
While the additional check is nice, I think it breaks the existing "contract" in that previously there was no requirement to read out the buffer entirely,. I believe it was mainly added to avoid some kind of unexpected bug or concurrency issue with SettableByteEntity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we have performance issues by calling open
here. If we look at the code of SettableByteEntity#open all it does is to check if the input stream is already open and if it doesn't it returns it. (I want to say I checked that before I write the call here but it's been a few days).
Looking there at SettableByteEntity#setEntity which I assume is called every time before a call to open
in our case, it seems that it does something similar to what you describe. It sets the buffer for that inputstream and resets the opened
flag. So, in a sense it re-uses the bytebuffer that is given via the entity
that is passed.
So not a traditional open
with its costs it seems (but I still might be missing something given my low familiarity with this code). Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's still more efficient for the protobuf decoder to have access to the buffer, since it can skip ahead rather than having to consume all the bytes in a linear fashion from the inputstream. The InputStream will also incur the cost of bounds checks on the ByteBuffer, which the protobuf decoder currently goes out of its way to avoid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the fact that SettableByteEntity seems geared towards returning an InputStream is something we'll probably want to improve upstream. Part of the reason we have ByteEntity is so that we can optimize access to the underlying buffer for performance. I don't think anything stands in our way here to get the underlying ByteBuffer now, and we can improve the API to make that easier in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you mean now. I didn't go into that depth with the protobuf calls (still a bit challenging to get to the non-sequential parsing you are referring to).
The main other example with protobuf beyond the classes we discuss here that I can find is in
ProtobufReader#intermediateRowIterator
and while it passes a ByteBuffer it first reads and converts the stream into a bytearray.
Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the code to set the buffer position explicitly to the limit. Let me know if the code now matches what you had in mind.
} | ||
catch (InvalidProtocolBufferException e) { | ||
catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using the suggestion above we can also avoid having to catch additional IOExceptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment above re: why we need to use open
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like we revert this change now and only catch InvalidProtocolBufferException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reinstated InvalidProtocolBufferException
but that doesn't remove the IOException in sample
and read
methods.
@@ -217,7 +279,7 @@ private void addPointRows(Point point, Metric metric, LabelContext labelContext) | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRowListPlusRawValues> sample() | |||
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change is unnecessary if we read the ByteBuffer directly as suggested above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same answer here as above. I still need to use open
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like we can avoid throwing checked exceptions here now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is thrown by the read
method in the base interface. Doesn't have to do anything with the use of open
after all. If we want to use try-with-resources which is good IMO, we need to add this exception to the method declaration.
This is declared in:
Line 143 in 9400125
public CloseableIterator<InputRow> read() throws IOException |
while elsewhere the overriding method omits the exception in the signature.
The interface is: https://github.com/confluentinc/druid/blob/0.22.1-confluent/core/src/main/java/org/apache/druid/data/input/InputEntityReader.java
and hasn't changed in recent versions.
@@ -81,29 +97,75 @@ public OpenCensusProtobufReader( | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRow> read() | |||
public CloseableIterator<InputRow> read() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change is unnecessary if we read the ByteBuffer directly as suggested below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment about still having to use open
. That's where this is coming from right?
...main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
Show resolved
Hide resolved
metricDimension, | ||
null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's bit odd to pass null here, since this would likely break if the downstream code ever tried to use this to write a column with a null name.
It looks like there is actually a bug in the existing OpenCensusProtobufInputFormat here since it does not actually ever use the valueDimension field when decoding OpenCensus and instead hard-codes it as value
I think we can fix that and have OpenCensusProtobufReader honor the valueDimension
setting so we don't have to pass null
here. This should not break any backwards compatibility, since we do not set the valueDimension
anywhere in our configs today, and the default for valueDimension is value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll create a ticket since this seems a separate issue that given that it's not trivial I'd rather I don't fix as part of this blocker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader) | ||
// so we only have to look it up once. To be completely correct we should cache the method based on classloader | ||
MethodHandle getHeaderMethod = KafkaUtils.lookupGetHeaderMethod( | ||
source.getEntity().getClass().getClassLoader(), | ||
VERSION_HEADER_KEY | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as a follow-up improvement, we might want to cache the getHeaderMethod
so we don't have to do the lookup on every record. Once we have successfully looked up the method handle once we shouldn't have to do it again, since that's probably not a cheap call to make.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that this is called from the hybrid class I believe I can re-instate the check for null
getHeaderMethod
.
); | ||
case OPEN_CENSUS: | ||
default: | ||
return new OpenCensusProtobufReaderInternal(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than creating an internal reader for this particular case, what would you think of keeping this code path more symmetrical by moving the code that handles the format switching to a new class (e.g. HybridProtobufReader
), and then have that call either new OpenTelemetryMetricsProtobufReader(...)
or new OpenCensusProtobufReader(...)
so we don't pollute the OpenCensus reader code with otel-specific handling, and keep the Otel and OC readers clean counterparts of each-other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Wasn't sure initially if we want to promote one or the other reader more.
Done.
@@ -80,27 +78,15 @@ public OpenTelemetryMetricsProtobufReader( | |||
@Override | |||
public CloseableIterator<InputRow> read() | |||
{ | |||
Supplier<Iterator<InputRow>> supplier = Suppliers.memoize(() -> readAsList().iterator()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like we might want to keep the memoizing supplier here, since it was added to avoid the scenario where exceptions are thrown before we actually call next(), see #99
It's a bit odd that the test we added in that PR didn't catch this regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It catches it locally now for me. Is it because we weren't running the PR build?
.../apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java
Show resolved
Hide resolved
} | ||
|
||
List<InputRow> readAsList() | ||
{ | ||
try { | ||
return parseMetricsData(MetricsData.parseFrom(source.getBuffer())); | ||
return parseMetricsData(MetricsData.parseFrom(source.open())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment about using the buffer instead of the input stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @xvrl !
I believe I didn't miss anything with a change or an answer. Will have to deploy on staging too to confirm some of the changes that were rolled back. Didn't revert the call to open
because I'm pretty sure this will break things.
...main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
Show resolved
Hide resolved
...main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
Show resolved
Hide resolved
.../apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java
Show resolved
Hide resolved
} | ||
|
||
List<InputRow> readAsList() | ||
{ | ||
try { | ||
return parseMetric(Metric.parseFrom(source.getBuffer())); | ||
return parseMetric(Metric.parseFrom(source.open())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been a few weeks but this was the big discovery that fixed the weird exception about the buffer being reused before it gets drained (fully read). That's how I had it before but this doesn't work currently.
} | ||
catch (InvalidProtocolBufferException e) { | ||
catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment above re: why we need to use open
metricDimension, | ||
null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll create a ticket since this seems a separate issue that given that it's not trivial I'd rather I don't fix as part of this blocker.
@@ -81,29 +97,75 @@ public OpenCensusProtobufReader( | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRow> read() | |||
public CloseableIterator<InputRow> read() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment about still having to use open
. That's where this is coming from right?
// assume InputEntity is always defined in a single classloader (the kafka-indexing-service classloader) | ||
// so we only have to look it up once. To be completely correct we should cache the method based on classloader | ||
MethodHandle getHeaderMethod = KafkaUtils.lookupGetHeaderMethod( | ||
source.getEntity().getClass().getClassLoader(), | ||
VERSION_HEADER_KEY | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that this is called from the hybrid class I believe I can re-instate the check for null
getHeaderMethod
.
@@ -217,7 +279,7 @@ private void addPointRows(Point point, Metric metric, LabelContext labelContext) | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRowListPlusRawValues> sample() | |||
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same answer here as above. I still need to use open
@@ -80,27 +78,15 @@ public OpenTelemetryMetricsProtobufReader( | |||
@Override | |||
public CloseableIterator<InputRow> read() | |||
{ | |||
Supplier<Iterator<InputRow>> supplier = Suppliers.memoize(() -> readAsList().iterator()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It catches it locally now for me. Is it because we weren't running the PR build?
d95473b
to
e0c3831
Compare
a8a3723
to
f40ab69
Compare
Just rebased on top of 24.0.1 |
0251a5d
to
262f759
Compare
Rebased on top of Ran the tests for |
@@ -221,8 +228,10 @@ InputRow createRow(long timeUnixMilli, Map<String, Object> event) | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRowListPlusRawValues> sample() | |||
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here we can avoid throwing IOException here now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replied above. It's required by try-with-resources
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the follow up @xvrl
I added back InvalidProtocolBufferException
but left a comment on why IOException
can't be removed without an error if we are to keep try-with-resources
.
} | ||
catch (InvalidProtocolBufferException e) { | ||
catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reinstated InvalidProtocolBufferException
but that doesn't remove the IOException in sample
and read
methods.
@@ -217,7 +279,7 @@ private void addPointRows(Point point, Metric metric, LabelContext labelContext) | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRowListPlusRawValues> sample() | |||
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is thrown by the read
method in the base interface. Doesn't have to do anything with the use of open
after all. If we want to use try-with-resources which is good IMO, we need to add this exception to the method declaration.
This is declared in:
Line 143 in 9400125
public CloseableIterator<InputRow> read() throws IOException |
while elsewhere the overriding method omits the exception in the signature.
The interface is: https://github.com/confluentinc/druid/blob/0.22.1-confluent/core/src/main/java/org/apache/druid/data/input/InputEntityReader.java
and hasn't changed in recent versions.
@@ -221,8 +228,10 @@ InputRow createRow(long timeUnixMilli, Map<String, Object> event) | |||
} | |||
|
|||
@Override | |||
public CloseableIterator<InputRowListPlusRawValues> sample() | |||
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replied above. It's required by try-with-resources
Happy to follow up on outstanding comments. |
.semaphore/semaphore.yml
Outdated
# Increase time limit to allow tests to run to completion | ||
execution_time_limit: | ||
hours: 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised it takes that long now, we used to be able to run tests much faster than that in the past, but we can leave that until we figure out what it taking so long
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A similar setting on another repo has a comment: # Increase time limit to occasionally allow for cache population.
Which could have been the case here as well, given that the successful run took 40min. Reverting this patch and we can reconsider if this starts happening more frequently.
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent())); | ||
try (CloseableIterator<InputRow> iterator = read()) { | ||
return iterator.map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we sure we should use try-with-resources here? If we close the underlying iterator before returning the mapped one, wouldn't this cause issues? It seems it should be the responsibility of the called to close the returned iterator, not this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. Only that InputEntityIteratingReader which is one of the few users of this call doesn't close it as it should when it handles the IOException
.
I guess it doesn't blow up because of low coverage of sample methods (I found out manually on the UI). Reverting to unblock this for now and we can reconsider in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, in the Hybrid class, IOException has to stay because we are calling the interface method (that has it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch about InputEntityIteratingReader maybe you can file an issue upstream to raise that potential problem
…2 hours" This reverts commit 29a63d0.
0b5922c
to
ebc5909
Compare
…roduction of SettableByteEntity (#113) * OBSDATA-483: Adapt opencensus extension to the introduction of SettableByteEntity * OBSDATA-483: Adapt opentelemetry extension to the introduction of SettableByteEntity * OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry * OBSDATA-483: Add logger config in opencensus tests * OBSDATA-483: Fix issue with opening the byte entity * OBSDATA-483: Instantiate the right iterator in every read request * OBSDATA-483: Add comments * OBSDATA-483: Address Xavier's comments * OBSDATA-483: Remove unused member fields * OBSDATA-483: Rename enum * OBSDATA-483: Fix trace log to actually print the argument * OBSDATA-483: Keep passing the underlying byte buffer and move its position explicitly * OBSDATA-483: Fix checkstyle issues * OBSDATA-483: Add back handling of InvalidProtocolBufferException * OBSDATA-483: Extend the semaphore workflow execution time to 2 hours * Revert "OBSDATA-483: Extend the semaphore workflow execution time to 2 hours" * OBSDATA-483: Don't close iterator in sample
…roduction of SettableByteEntity (#113) * OBSDATA-483: Adapt opencensus extension to the introduction of SettableByteEntity * OBSDATA-483: Adapt opentelemetry extension to the introduction of SettableByteEntity * OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry * OBSDATA-483: Add logger config in opencensus tests * OBSDATA-483: Fix issue with opening the byte entity * OBSDATA-483: Instantiate the right iterator in every read request * OBSDATA-483: Add comments * OBSDATA-483: Address Xavier's comments * OBSDATA-483: Remove unused member fields * OBSDATA-483: Rename enum * OBSDATA-483: Fix trace log to actually print the argument * OBSDATA-483: Keep passing the underlying byte buffer and move its position explicitly * OBSDATA-483: Fix checkstyle issues * OBSDATA-483: Add back handling of InvalidProtocolBufferException * OBSDATA-483: Extend the semaphore workflow execution time to 2 hours * Revert "OBSDATA-483: Extend the semaphore workflow execution time to 2 hours" * OBSDATA-483: Don't close iterator in sample
* Bring dockerfile up to date * add opencensus extension * make checkstyle happy * bump pom version for opencensus extension * fix issues related to shading opencensus extension The extension packaging included both shaded and unshaded dependencies in the classpath. Shading should not be necessary in this case. Also excludes guava dependencies, which are already provided by Druid and don't need to be added to the extensions jars. * METRICS-516: Adding Resource labels in OpenCensus Extension * bump extension version to match release * confluent-extensions with custom transform specs (#9) * fix extraction transform serde (#10) * fix check-style build errors * setup semaphore build * add checkstyle * fix edge cases for internal topics * METRICS-1302: Added prefix support for resource labels. (#14) * METRICS-1302: Added prefix support for resource labels. * Addressed review comments. * Added and moved configs to ingestion spec, optimized code. * Addressed review comments * Updated metric dimesnion and other review comments * Flipped ternary operator * Moved from NullHandling to StringUtils. * Removed unnecessary HashMap. * Removed verbosity for instance variables. * Added getters for configs, labels for distribution metric. (#15) * Added getters for configs, labels for distribution metric. * Addressed review comments * Removed extra brackets in JsonProperty. * Default resource label prefix to blank - Backward Compatibility (#16) * update opencensus parent pom version * update opencensus extensions for 0.19.x * update parent pom version for confluent-extensions * Add the capability to speed up S3 uploads using AWS transfer manager * fix conflicting protobuf dependencies Align protobuf dependencies to use the main pom one * fix timestamp milliseconds in OpenCensusProtobufInputRowParser - fix millisecond resolution being dropped when converting timestamps - remove unnecessary conversion of ByteBuffer to ByteString - make test code a little more concise * improve OpenCensusProtobufInputRowParser performance (#25) - remove the need to parse timestamps into their own column - reduce the number of times we copy maps of labels - pre-size hashmaps and arrays when possible - use loops instead of streams in critical sections Combined these changes improve parsing performance by about 15% - added benchmark for reference * deprecate OpenCensusInputRowParser in favor of OpenCensusProtobufInputFormat (#26) InputRowParsers have been deprecated in favor or InputFormat. This implements the InputFormat version of the OpenCensus Protobuf parser, and deprecates the existing InputRowParser implementation. - the existing InputRowParser behavior is unchanged. - the InputFormat behaves like the InputRowParser, except for the default resource prefix which now defaults to "resource." instead of empty. - both implementations internally delegate to OpenCensusProtobufReader, which is covered by the existing InputRowParser tests. * add default query context and update timeout to 30 sec * Setting default query lane from druid console. * Giving more heap space for test jvm in semaphore config. * update parent pom version for Confluent extensions * Add Java 11 image build and remove unused MySQL images * fix docker image build failure caused by apache#10506 * switch build to use Java 11 by default * Fixed forbiddenapi error * Added phases before checks * Fixed * OpenTelemetry Emitter Extension (#47) Add OpenTelemetry Emitter Extension * Add dependency check (#59) * Add dependency check * Fix maven-dependency-plugin errors * Add --fail-at-end flag * Fix comment * METRICS-3663 OpenTelemetry Metrics InputFormat (#63) * An OpenTelemetry metrics extension * An InputFormat that is able to ingest metrics that are in the OpenTelemetry format * Unit tests for the InputFormat * Benchmarking Tests for the new OpenTelemetryMetricsProtobufInputFormat * update parent pom version for Confluent extensions * Adding getRequiredColumns() in our custom transforms. * Updating shade-plugin version in opentelemetry-emitter. * Removing the unwanted maven-shade-plugin change. * Adding JDK version to DockerFile and removing unwanted executions from main pom.xml file. (#75) * Passing JDK_VERSION as build args to docker build. (#76) * Make the OpenTelemetry InputFormat More Flexible to Metric, Value and Attribute Types (#67) * Hybrid OpenCensusProtobufInputFormat in opencensus-extensions (#69) * Support OpenTelemetry payloads in OpenCensusProtobufInputFormat Support reading mixed OpenTelemetry and OpenCensus topics based on Kafka version header * workaround classloader isolation Workaround classloader isolation by using method handles to get access to KafkaRecordEntity related methods and check record headers Co-authored-by: Xavier Léauté <xl+github@xvrl.net> * Modify the OpenTelemetry ProtobufReader's Handling of Attribute Types (#77) * Only handle INT_VALUE, BOOL_VALUE, DOUBLE_VALUE and STRING_VALUE and return null otherwise * fix wrong class in the DruidModule service provider definition * Fixing Opencensus extension build failures. * fix dependency check (#79) * fix OpenTelemetry extension module service definition (#73) (#81) * Setting default refresh value for task view as none. (#88) As part of this we added a default parameter that can be passed for refresh widget to avoid every refresh widget getting affected. * go/codeowners: Generate CODEOWNERS [ci skip] (#87) * fixes in pom.xml files * adapt to new input argument in ParseException * adapt to the new constructor for DimensionsSpec * update obs-data team as codeownders (#98) * [OBSDATA-334] Patch opencensus/opentelemetry parse exception (#99) * [METRICS-4487] add obs-oncall as codeowners (#101) * DP-8085 - Migrate to Sempahore self-hosted agent (#100) * [OBSDATA-334] Patch opentelemetry IllegalStateException for unsupported metric types (#103) * Fixing checkstyle issues in openncensus and opentelemetry extensions. (#109) * Remove SNAPSHOT from versions in confluent pom files * Fixing CI/CD in 24.0.0 upgrade branch (#116) * OBSDATA-440 Adding SegmentMetadataEvent and publishing them via KafkaSegmentMetadataEmitter (#117) * Change unsupported type message from WARN to TRACE (#119) * Use place holder for logging invalid format (#120) Use place holder for logging invalid format for better performance * DP-9370 - use cc-service-bot to manage Semaphore project (#118) * chore: update repo semaphore project * DP-9632: remediate duplicate Semaphore workflows (#121) Only build the master branch and the `x.x.x-confluent` Druid release branches by default * chore: update repo semaphore project * Bump version to 24.0.1 in confluent extensions after rebasing on top of druid-24.0.1 * Bump version to 24.0.2 in confluent extensions after rebasing on top of druid-24.0.2 * OBSDATA-483: Adapt OpenCensus and OpenTelemetry extensions to the introduction of SettableByteEntity (#113) * OBSDATA-483: Adapt opencensus extension to the introduction of SettableByteEntity * OBSDATA-483: Adapt opentelemetry extension to the introduction of SettableByteEntity * OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry * OBSDATA-483: Add logger config in opencensus tests * OBSDATA-483: Fix issue with opening the byte entity * OBSDATA-483: Instantiate the right iterator in every read request * OBSDATA-483: Add comments * OBSDATA-483: Address Xavier's comments * OBSDATA-483: Remove unused member fields * OBSDATA-483: Rename enum * OBSDATA-483: Fix trace log to actually print the argument * OBSDATA-483: Keep passing the underlying byte buffer and move its position explicitly * OBSDATA-483: Fix checkstyle issues * OBSDATA-483: Add back handling of InvalidProtocolBufferException * OBSDATA-483: Extend the semaphore workflow execution time to 2 hours * Revert "OBSDATA-483: Extend the semaphore workflow execution time to 2 hours" * OBSDATA-483: Don't close iterator in sample * chore: update repo semaphore project (#124) Co-authored-by: Confluent Jenkins Bot <jenkins@confluent.io> * [Metrics-4776] OpenTelemetry Extensions - Upgrade otel-proto version (#125) * Upgrade proto version * Fix names and tests - Upgrade version * Fix open census tests * Fix test name * Move to Java 17 (#128) * bumping version of java to 17 for semaphore test run * bumping java version to 17 as per https://github.com/confluentinc/druid/pull/127/files * After speaking with Xavier, made these changes * Trying to add required flags to run druid using java 17 (#130) * Use apache-jar-resource-bundle:1.5 instead of 1.5-SNAPSHOT (apache#14054) (#131) Co-authored-by: Tejaswini Bandlamudi <96047043+tejaswini-imply@users.noreply.github.com> * update parent pom version for Confluent extensions * Fix CI/CD while upgrading to Druid 25.0.0 * Fix jest and prettify checks * Adding SegmentMetadataEvent and publishing them via KafkaEmitter (apache#14281) (#139) (cherry picked from commit 4ff6026) * Downgrade busybox version to fix k8s IT (apache#14518) (#143) Co-authored-by: Rishabh Singh <6513075+findingrish@users.noreply.github.com> * Passing TARGETARCH in build_args to Docker build (#144) * Downgrade busybox version to fix k8s IT (apache#14518) * Add TargetArch needed in distribution/Dockerfile * Fix linting --------- Co-authored-by: Rishabh Singh <6513075+findingrish@users.noreply.github.com> * remove docker-maven-plugin and Dockerfile customizations - remove our custom profile to build using dockerfile-maven-plugin, since that plugin is no longer maintained. - remove our custom Dockerfile patches since we can now use the BUILD_FROM_SOURCE argument to decide if we want to build the tarball outside of docker. * Revert "Trying to add required flags to run druid using java 17 (#130)" (#147) This reverts our custom patch from commit 7cf2de4. The necessary Java 17 exports are now included as part of 25.0.0 in https://github.com/confluentinc/druid/blob/25.0.0-confluent/examples/bin/run-java#L27-L56 which is now called by the druid.sh docker startup script as well. The exports for java.base/jdk.internal.perf=ALL-UNNAMED are no longer needed since apache#12481 (comment) * removing use of semaphore cache as the public semaphore will not have cache (#145) (#148) * utilize workflow level caching to publish the built artifacts to the tests. otherwise turn off all caching of .m2 etc * remove .m2/settings.xml to ensure build passes without internal artifact store --------- Co-authored-by: Jeremy Kuhnash <111304461+jkuhnashconfluent@users.noreply.github.com> * OBSDATA-1365: add support for debian based base images (#149) * Debeian based base image upgrade * updated suggestions * Update Dockerfile * minor correction --------- * Revert "fix KafkaInputFormat with nested columns by delegating to underlying inputRow map instead of eagerly copying (apache#13406) (apache#13447)" (#155) This reverts commit 23500a4. * Filter Out Metrics with NoRecordedValue Flag Set (#157) Metrics that contain the NoRecordedValue Flag are being written to Druid with a 0 value. We should properly handle them in the backend * memcached cache: switch to AWS elasticache-java-cluster-client and add TLS support (apache#14827) (#159) This PR updates the library used for Memcached client to AWS Elasticache Client : https://github.com/awslabs/aws-elasticache-cluster-client-memcached-for-java This enables us to use the option of encrypting data in transit: Amazon ElastiCache for Memcached now supports encryption of data in transit For clusters running the Memcached engine, ElastiCache supports Auto Discovery—the ability for client programs to automatically identify all of the nodes in a cache cluster, and to initiate and maintain connections to all of these nodes. Benefits of Auto Discovery - Amazon ElastiCache AWS has forked spymemcached 2.12.1, and has since added all the patches included in 2.12.2 and 2.12.3 as part of the 1.2.0 release. So, this can now be considered as an equivalent drop-in replacement. GitHub - awslabs/aws-elasticache-cluster-client-memcached-for-java: Amazon ElastiCache Cluster Client for Java - enhanced library to connect to ElastiCache clusters. https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticache/AmazonElastiCacheClient.html#AmazonElastiCacheClient-- How to enable TLS with Elasticache On server side: https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/in-transit-encryption-mc.html#in-transit-encryption-enable-existing-mc On client side: GitHub - awslabs/aws-elasticache-cluster-client-memcached-for-java: Amazon ElastiCache Cluster Client for Java - enhanced library to connect to ElastiCache clusters. * PRSP-3603 Bump org.xerial.snappy:snappy-java to latest version to address CVEs (#164) * Bump org.xerial.snappy:snappy-java from 1.1.8.4 to 1.1.10.5 * Add licenses * [backport] Upgrade Avro to latest version (apache#14440) (#162) Upgraded Avro to 1.11.1 (cherry picked from commit 72cf91f) Co-authored-by: Tejaswini Bandlamudi <96047043+tejaswini-imply@users.noreply.github.com> * Revert "PRSP-3603 Bump org.xerial.snappy:snappy-java to latest version to address CVEs (#164)" (#166) This reverts commit 185d655. * Upgrade Avro to latest version to address CVEs (#167) * OBSDATA-1697: Do not build extensions not loaded by cc-druid (#152) Create new profiles to enable only the used extensions during the build. This helps address CVEs that were being flagged due to the unused extensions. --------- Co-authored-by: Keerthana Srikanth <ksrikanth@confluent.io> * update parent pom version for Confluent extensions * Add value to child POMs * Upgrade dependencies to match upstream v28 & checkstyle fix * KafkaEmitter changes * Modifying RowFunction interface * Fix test cases * Fix test cases * Fix test cases * Fix test cases * upgrade dependency as per druid 28 * Removing unnecessary change * Change Maven repository URL * Add Druid.xml * Update tag name to match version * Fix dist-used profile to use Hadoop compile version (#173) * Changes based on PR comments * Fix refreshButton * Use onRefresh only once * Fix snapshot so that the test passes --------- Co-authored-by: Travis Thompson <trthomps@confluent.io> Co-authored-by: Sumit Arrawatia <sumit.arrawatia@gmail.com> Co-authored-by: Xavier Léauté <xvrl@apache.org> Co-authored-by: Apoorv Mittal <amittal@confluent.io> Co-authored-by: Xavier Léauté <xavier@confluent.io> Co-authored-by: Huajun Qin <hqin@yahoo.com> Co-authored-by: Huajun Qin <huajun@confluent.io> Co-authored-by: CodingParsley <nayachen98@gmail.com> Co-authored-by: Harini Rajendran <hrajendran@confluent.io> Co-authored-by: Ivan Vankovich <ivankovich@c02yt5a0lvdr.attlocal.net> Co-authored-by: Ivan Vankovich <ivankovich@confluent.io> Co-authored-by: Marcus Greer <marcusgreer96@gmail.com> Co-authored-by: Harini Rajendran <harini.rajendran@yahoo.com> Co-authored-by: Yun Fu <fuyun12345@gmail.com> Co-authored-by: Xavier Léauté <xl+github@xvrl.net> Co-authored-by: lokesh-lingarajan <llingarajan@confluent.io> Co-authored-by: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Co-authored-by: Konstantine Karantasis <konstantine@confluent.io> Co-authored-by: Naya Chen <nchen@confluent.io> Co-authored-by: nlou9 <39046184+nlou9@users.noreply.github.com> Co-authored-by: Corey Christous <cchristous@gmail.com> Co-authored-by: Confluent Jenkins Bot <jenkins@confluent.io> Co-authored-by: ConfluentTools <96149134+ConfluentTools@users.noreply.github.com> Co-authored-by: Kamal Narayan <119908061+kamal-narayan@users.noreply.github.com> Co-authored-by: David Steere <hampycapper@msn.com> Co-authored-by: Tejaswini Bandlamudi <96047043+tejaswini-imply@users.noreply.github.com> Co-authored-by: Ghazanfar-CFLT <mghazanfar@confluent.io> Co-authored-by: Rishabh Singh <6513075+findingrish@users.noreply.github.com> Co-authored-by: Jeremy Kuhnash <111304461+jkuhnashconfluent@users.noreply.github.com> Co-authored-by: Hardik Bajaj <58038410+hardikbajaj@users.noreply.github.com> Co-authored-by: Michael Li <mli@confluent.io> Co-authored-by: Keerthana Srikanth <ksrikanth@confluent.io> Co-authored-by: Jan Werner <105367074+janjwerner-confluent@users.noreply.github.com> Co-authored-by: mustajibmk <120099779+mustajibmk@users.noreply.github.com> Co-authored-by: Pankaj kumar <pkumar@confluent.io>
…roduction of SettableByteEntity (#113) * OBSDATA-483: Adapt opencensus extension to the introduction of SettableByteEntity * OBSDATA-483: Adapt opentelemetry extension to the introduction of SettableByteEntity * OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry * OBSDATA-483: Add logger config in opencensus tests * OBSDATA-483: Fix issue with opening the byte entity * OBSDATA-483: Instantiate the right iterator in every read request * OBSDATA-483: Add comments * OBSDATA-483: Address Xavier's comments * OBSDATA-483: Remove unused member fields * OBSDATA-483: Rename enum * OBSDATA-483: Fix trace log to actually print the argument * OBSDATA-483: Keep passing the underlying byte buffer and move its position explicitly * OBSDATA-483: Fix checkstyle issues * OBSDATA-483: Add back handling of InvalidProtocolBufferException * OBSDATA-483: Extend the semaphore workflow execution time to 2 hours * Revert "OBSDATA-483: Extend the semaphore workflow execution time to 2 hours" * OBSDATA-483: Don't close iterator in sample
…roduction of SettableByteEntity (#113) * OBSDATA-483: Adapt opencensus extension to the introduction of SettableByteEntity * OBSDATA-483: Adapt opentelemetry extension to the introduction of SettableByteEntity * OBSDATA-483: Decide which reader to instantiate on read between opencensus and opentelemetry * OBSDATA-483: Add logger config in opencensus tests * OBSDATA-483: Fix issue with opening the byte entity * OBSDATA-483: Instantiate the right iterator in every read request * OBSDATA-483: Add comments * OBSDATA-483: Address Xavier's comments * OBSDATA-483: Remove unused member fields * OBSDATA-483: Rename enum * OBSDATA-483: Fix trace log to actually print the argument * OBSDATA-483: Keep passing the underlying byte buffer and move its position explicitly * OBSDATA-483: Fix checkstyle issues * OBSDATA-483: Add back handling of InvalidProtocolBufferException * OBSDATA-483: Extend the semaphore workflow execution time to 2 hours * Revert "OBSDATA-483: Extend the semaphore workflow execution time to 2 hours" * OBSDATA-483: Don't close iterator in sample
Fixes OBSDATA-483
Description
In our deployments, metrics emitted from a Kafka topic can be a mix of formats between OpenTelemetry and OpenCensus formats. Up until version 24.0.0, the entity passed as input to the format readers could be cast as
ByteEntity
and have the protobuf message parsed directly from theByteBuffer
of this entity. However with druid-24.0.0 a wrapper classSettableByteEntity
is introduced and now the classes in the opencensus and opentelemtry extensions need to be adapted to handleSettableByteEntity
objects that use composition instead of inheritance in relation toByteEntity
objects.Key changed/added classes in this PR
HybridProtobufReader
is introduced that can delegate reading and parsing of the input entity to appropriate reader (OpenCensus or OpenTelemetry)KafkaRecordEntity
directly and not aSettableByteEntity
the code is adapted for now to distinguish between the two inputs by checking the instance of the input (a potential near future fix is to adapt samplers to also useSettableByteEntity
)ByteBuffer
that is passed to it as an arguemnt.This PR has: