Consumer with API version set to V0_10_0_0 fails to consume compressed topics #720

Closed
dynamix opened this issue Aug 8, 2016 · 4 comments · Fixed by #735
Comments

dynamix commented Aug 8, 2016

Versions
  • Sarama Version: master/e8020bffa1cae3ddf8068cb03416fc53d1627f3d
  • Kafka Version: 0.10
  • Go Version: 1.6
Configuration
Logs
2016/08/08 15:21:57 Initializing new client
2016/08/08 15:21:57 client/metadata fetching metadata for all topics from broker 192.168.0.132:9092
2016/08/08 15:21:57 Connected to broker at 192.168.0.132:9092 (unregistered)
2016/08/08 15:21:57 client/brokers registered new broker #0 at 192.168.0.132:9092
2016/08/08 15:21:57 Successfully initialized new client
2016/08/08 15:21:57 producer/broker/0 starting up
2016/08/08 15:21:57 producer/broker/0 state change to [open] on test/0
2016/08/08 15:21:57 Connected to broker at 192.168.0.132:9092 (registered as #0)
2016/08/08 15:21:57 Initializing new client
2016/08/08 15:21:57 client/metadata fetching metadata for all topics from broker 192.168.0.132:9092
2016/08/08 15:21:57 Connected to broker at 192.168.0.132:9092 (unregistered)
2016/08/08 15:21:57 client/brokers registered new broker #0 at 192.168.0.132:9092
2016/08/08 15:21:57 Successfully initialized new client
2016/08/08 15:21:57 Connected to broker at 192.168.0.132:9092 (registered as #0)
2016/08/08 15:21:57 consumer/broker/0 added subscription to test/0
2016/08/08 15:21:57 kafka: error while consuming test/0: kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:21:57 consumer/broker/0 abandoned subscription to test/0 because kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:21:59 consumer/test/0 finding new broker
2016/08/08 15:21:59 client/metadata fetching metadata for [test] from broker 192.168.0.132:9092
2016/08/08 15:21:59 consumer/broker/0 added subscription to test/0
2016/08/08 15:21:59 kafka: error while consuming test/0: kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:21:59 consumer/broker/0 abandoned subscription to test/0 because kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:22:01 consumer/test/0 finding new broker
2016/08/08 15:22:01 kafka: error while consuming test/0: kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:22:01 consumer/broker/0 abandoned subscription to test/0 because kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:22:03 consumer/test/0 finding new broker
2016/08/08 15:22:03 kafka: error while consuming test/0: kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:22:03 consumer/broker/0 abandoned subscription to test/0 because kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:22:05 consumer/test/0 finding new broker
2016/08/08 15:22:05 kafka: error while consuming test/0: kafka: response did not contain all the expected topic/partition blocks
2016/08/08 15:22:05 consumer/broker/0 abandoned subscription to test/0 because kafka: response did not contain all the expected topic/partition blocks

(removed the "ClientID set to sarama" entries)

Problem Description

Code that reproduces the issue (the broker IP should be set to a proper value): https://gist.github.com/dynamix/e7682ff7a76d28bd500ebee52c036b28

A consumer whose Config.Version is set to V0_10_0_0 does not properly consume compressed topics. If Config.Version is set to any lower value, everything works as expected.
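For context, a minimal consumer along these lines is enough to trigger the error (a sketch based on the setup in the gist above; the broker address, topic, and partition are taken from the logs):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Advertising 0.10 support makes the broker return the v1 message
	// format, whose compressed message sets use relative inner offsets (KIP-31).
	config.Version = sarama.V0_10_0_0

	consumer, err := sarama.NewConsumer([]string{"192.168.0.132:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}
```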

Looking at what Kafka returns: for 0.10+, the offsets of messages inside a compressed message are relative rather than absolute. The Java consumer has special handling for relative offsets: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L242-L318
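For illustration, the adjustment the Java client performs can be modeled as follows (a sketch of the KIP-31 arithmetic, not Sarama's actual types: with the v1 format, the wrapper message carries the absolute offset of the *last* inner message, and the inner messages carry offsets 0..n-1 relative to the set):

```go
// innerMessage is a simplified stand-in for a message decoded from the
// value of a compressed (wrapper) message.
type innerMessage struct {
	Offset int64 // relative offset within the compressed set (0..n-1)
	Value  []byte
}

// absoluteOffsets rewrites the relative inner offsets of a v1 (0.10)
// compressed message set into absolute partition offsets.
func absoluteOffsets(wrapperOffset int64, inner []innerMessage) []innerMessage {
	if len(inner) == 0 {
		return inner
	}
	// The wrapper offset is the absolute offset of the last inner message,
	// so the base offset of the set is the wrapper offset minus the last
	// relative offset.
	base := wrapperOffset - inner[len(inner)-1].Offset
	out := make([]innerMessage, len(inner))
	for i, m := range inner {
		m.Offset = base + m.Offset // relative -> absolute
		out[i] = m
	}
	return out
}
```

For example, a wrapper at offset 41 containing three inner messages with relative offsets 0, 1, 2 decodes to absolute offsets 39, 40, 41.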

Attempted fix (not sure if this handles all cases): #721

eapache commented Aug 8, 2016

Is there non-code documentation of these relative offsets? I don't remember seeing anything about this in the release notes or on https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

dynamix commented Aug 8, 2016

None that I could find. The documentation regarding compression does not mention this; the Java client implements it, however (I only checked the Java client). (Seems I did a pretty bad job of finding this, considering KIP-31 :-()

eapache commented Aug 9, 2016

@edenhill does rdkafka handle this properly? Is there documentation on this change anywhere?

eapache added a commit that referenced this issue Aug 23, 2016
New message format does something weird with these. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets

Fixes #720. Supersedes #721. Thanks to @dynamix for the first draft of the fix.