This repository has been archived by the owner on Jan 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 136
[FEATURE] Add fetch down converted when entryFormat=kafka #660
Merged
BewareMyPower
merged 20 commits into
streamnative:master
from
wenbingshen:addFetchDownConverted
Aug 19, 2021
Merged
[FEATURE] Add fetch down converted when entryFormat=kafka #660
BewareMyPower
merged 20 commits into
streamnative:master
from
wenbingshen:addFetchDownConverted
Aug 19, 2021
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…records written by the low-version producer with magic=1
…et to MemoryRecordsBuilder
…s read low version magic, and support the down conversion when low version consumer client reads high version magic data
wenbingshen
changed the title
[Don't merge only test for shade ] Add fetch down converted when entryFormat=kafka
[FEATURE] Add fetch down converted when entryFormat=kafka
Aug 18, 2021
@BewareMyPower For the unit test, I reused BasicEndToEndTestBase, because after adding the kafka-0-10 module, the lower version producer will write a message of magic=1. If the error mentioned in the pr is not fixed, the higher version consumer will report an error and the unit test will fail. For down-conversion, due to some known reasons, the FETCH version is still limited to 4, so after I removed 4, the test lower version consumers can successfully read the higher version data, but I did not release the restriction on the FETCH version=4 to this pr. PTAL. |
@BewareMyPower Flaky test seems to be related to the new code citations I introduced after merging the master branch. I will investigate, and please review this pr first. |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
Outdated
Show resolved
Hide resolved
BewareMyPower
suggested changes
Aug 19, 2021
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
Outdated
Show resolved
Hide resolved
wenbingshen
commented
Aug 19, 2021
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
Show resolved
Hide resolved
BewareMyPower
approved these changes
Aug 19, 2021
wangjialing218
pushed a commit
to wangjialing218/kop
that referenced
this pull request
Aug 24, 2021
…ve#660) Fixes streamnative#656 ### Motivation When entryFormat=kafka If SASL/PLAIN authentication is not turned on, so kop does not limit the version of the producer client. When a lower version producer writes a message of magic=0 or magic=1, it will cause the higher version consumer to check the record error and the consumer client will be down. ### Modifications When entryFormat=kafka, increase KafkaEntryFormatter.decode to check batch.magic and client magic of kafka records. When batch.magic is higher than client magic, perform down conversion. This pr is part of the support for the lower version of Kafka less than 0.11.x. Since the FETCH version is still 4, it is also a bugfix for the higher version consumer to read the lower version of the magic record verification error.
BewareMyPower
pushed a commit
that referenced
this pull request
Aug 25, 2021
Fixes #656 ### Motivation When entryFormat=kafka If SASL/PLAIN authentication is not turned on, so kop does not limit the version of the producer client. When a lower version producer writes a message of magic=0 or magic=1, it will cause the higher version consumer to check the record error and the consumer client will be down. ### Modifications When entryFormat=kafka, increase KafkaEntryFormatter.decode to check batch.magic and client magic of kafka records. When batch.magic is higher than client magic, perform down conversion. This pr is part of the support for the lower version of Kafka less than 0.11.x. Since the FETCH version is still 4, it is also a bugfix for the higher version consumer to read the lower version of the magic record verification error.
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #656
Motivation
When entryFormat=kafka
If SASL/PLAIN authentication is not turned on, so kop does not limit the version of the producer client. When a lower version producer writes a message of magic=0 or magic=1, it will cause the higher version consumer to check the record error and the consumer client will be down.
Modifications
When entryFormat=kafka, increase KafkaEntryFormatter.decode to check batch.magic and client magic of kafka records. When batch.magic is higher than client magic, perform down conversion.
This pr is part of the support for the lower version of Kafka less than 0.11.x. Since the FETCH version is still 4, it is also a bugfix for the higher version consumer to read the lower version of the magic record verification error.