Zstd version is incompatible with the version of Zstd used by the Kafka Indexing service #11816

Closed
rpless opened this issue Oct 19, 2021 · 8 comments

Comments

@rpless
Contributor

rpless commented Oct 19, 2021

Affected Version

This issue affects 0.22.0 but also probably affects 0.21.x and 0.20.x.

Description

Druid currently relies on version 1.3.3-1 of zstd-jni. However, the Kafka client used by the Kafka Indexing Service relies on version 1.5.0-4.

When you attempt to ingest data from Kafka that has been compressed with zstd, the indexing task fails with the following exception:

java.lang.NoClassDefFoundError: com/github/luben/zstd/ZstdInputStreamNoFinalizer
	at org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:127)
	at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:262)
	at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:266)
	at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:350)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1575)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1612)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:128)
	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:95)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:599)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146)
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471)
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.github.luben.zstd.ZstdInputStreamNoFinalizer
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 23 more

I have manually linked the 1.5.0-4 zstd-jni jar into a Druid cluster, and this fixes the issue with the Kafka indexing service. However, I also had to replace the jar in both the parquet and avro extensions, and I have not yet verified that those still work.

@bananaaggle
Contributor

bananaaggle commented Dec 23, 2021

Hi! I know what happened. I have tested zstd with Kafka before. It was caused by commit 10123. In ZstdFactory.java you can see

return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(buffer, RecyclingBufferPool.INSTANCE), 16 * 1024);

and

return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), RecyclingBufferPool.INSTANCE), 16 * 1024);

These wrappers use a recycled buffer to reduce GC overhead. That buffer pool is a new feature in zstd-jni 1.4.5-8, and the commit was released in Kafka 2.8.0. If Druid uses any Kafka client from 2.8.0 onward, it should update zstd-jni to at least 1.4.5-8.
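For anyone reproducing this, here is a condensed, self-contained sketch of the two calls quoted above (not Druid or Kafka source; ByteArrayInputStream stands in for Kafka's ByteBufferInputStream). On a classpath that still carries zstd-jni 1.3.3-1, constructing either stream is exactly where the NoClassDefFoundError surfaces, because the NoFinalizer stream classes and RecyclingBufferPool do not exist in that version:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;

public class ZstdWrapSketch
{
    // Mirrors the wrapForOutput line quoted above: a pooled, finalizer-free compressing stream.
    static OutputStream wrapForOutput(OutputStream sink) throws IOException
    {
        return new BufferedOutputStream(
            new ZstdOutputStreamNoFinalizer(sink, RecyclingBufferPool.INSTANCE),
            16 * 1024
        );
    }

    // Mirrors the wrapForInput line quoted above; fails with NoClassDefFoundError on zstd-jni 1.3.3-1.
    static InputStream wrapForInput(byte[] compressed) throws IOException
    {
        return new BufferedInputStream(
            new ZstdInputStreamNoFinalizer(new ByteArrayInputStream(compressed), RecyclingBufferPool.INSTANCE),
            16 * 1024
        );
    }
}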

@FrankChen021
Member

@rpless Have you verified that the newer zstd version works with the parquet and avro extensions?

I think we also need to add some integration test cases that produce zstd-compressed data to cover this edge case.

@bananaaggle
Contributor

If it compiles, I think the risk is low, because that means the API has not changed.

@rpless
Contributor Author

rpless commented Jan 5, 2022

@FrankChen021 I have not had a chance to test those extensions. I did look quickly and I think I may have been wrong about avro depending on it, so it may only be the parquet extension that needs a test.

@toughrogrammer

toughrogrammer commented Mar 29, 2022

Hi, I got the same error.
I was using Druid 0.20.0, and the error appeared when I migrated to 0.22.1.
I'm using zstd compression for the Kafka topic.

Here is a sample error log:

java.lang.NoClassDefFoundError: com/github/luben/zstd/ZstdOutputStreamNoFinalizer
	at org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:127) ~[?:?]
	at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:262) ~[?:?]
	at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:266) ~[?:?]
	at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:350) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1575) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1612) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:128) ~[?:?]
	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:95) ~[?:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:599) ~[druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263) ~[druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) ~[druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471) [druid-indexing-service-0.22.1.jar:0.22.1]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443) [druid-indexing-service-0.22.1.jar:0.22.1]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_312]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.ClassNotFoundException: com.github.luben.zstd.ZstdOutputStreamNoFinalizer
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_312]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_312]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_312]
	... 23 more
Error!
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/github/luben/zstd/ZstdOutputStreamNoFinalizer
	at org.apache.druid.indexing.worker.executor.ExecutorLifecycle.join(ExecutorLifecycle.java:215)
	at org.apache.druid.cli.CliPeon.run(CliPeon.java:312)
	at org.apache.druid.cli.Main.main(Main.java:113)
Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/github/luben/zstd/ZstdOutputStreamNoFinalizer
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
	at org.apache.druid.indexing.worker.executor.ExecutorLifecycle.join(ExecutorLifecycle.java:212)
	... 2 more
Caused by: java.lang.NoClassDefFoundError: com/github/luben/zstd/ZstdOutputStreamNoFinalizer
	at org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:127)
	at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:262)
	at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:266)
	at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:350)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1575)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1612)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:128)
	at org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:95)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:599)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146)
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471)
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.github.luben.zstd.ZstdOutputStreamNoFinalizer
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 23 more

@Green-Angry-Bird

This is not an issue in 0.20.1. We upgraded from 0.20.1 to 0.22.1 and started getting this issue.

@Green-Angry-Bird

Green-Angry-Bird commented Apr 12, 2022

Current workaround:

For each node in the cluster, replace zstd-jni-1.3.3-1.jar found in lib/ with zstd-jni-1.5.0-4.jar (which can be found in Maven Central). Be sure to restart the nodes to pull in the new jar file.
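If it helps others, a quick way to confirm the swap took effect on a node is a small hypothetical check like the one below (not part of Druid; the class names are taken from the stack traces above). Run it with the node's lib/ directory on the classpath; both classes should resolve once zstd-jni 1.5.0-4 is in place:

// Hypothetical verification helper, not part of Druid.
public class ZstdClasspathCheck
{
    public static void main(String[] args)
    {
        String[] required = {
            "com.github.luben.zstd.ZstdInputStreamNoFinalizer",
            "com.github.luben.zstd.ZstdOutputStreamNoFinalizer"
        };
        for (String name : required) {
            try {
                Class.forName(name);
                System.out.println("OK:      " + name);
            }
            catch (ClassNotFoundException e) {
                // A missing class means the zstd-jni jar on the classpath is older than 1.4.5-8.
                System.out.println("MISSING: " + name);
            }
        }
    }
}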

@abhishekagarwal87
Contributor

It was fixed by #12408.
