-
Notifications
You must be signed in to change notification settings - Fork 136
Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709 #1981
Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709 #1981
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1981 +/- ##
============================================
- Coverage 17.26% 17.07% -0.19%
- Complexity 726 728 +2
============================================
Files 190 191 +1
Lines 14036 14218 +182
Branches 1318 1329 +11
============================================
+ Hits 2423 2428 +5
- Misses 11437 11614 +177
Partials 176 176
|
3a4e749
to
930e502
Compare
fce6f45
to
325f515
Compare
325f515
to
97b819f
Compare
.setPort(result.node.port()) | ||
.setErrorCode(result.error.code()) | ||
.setErrorMessage(result.error.message()) | ||
.setKey(coordinatorKey)); | ||
}); | ||
} else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) { | ||
authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, request.data().key())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, request.data().key())) | |
authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, coordinatorKey)) |
This change should be able to fix the group authorize exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -311,6 +312,7 @@ protected final void internalSetup(boolean startBroker) throws Exception { | |||
createClient(); | |||
|
|||
MetadataUtils.createOffsetMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
pom.xml
Outdated
@@ -48,7 +48,7 @@ | |||
<!-- dependencies --> | |||
<jackson.version>2.14.2</jackson.version> | |||
<jackson-databind.version>2.14.2</jackson-databind.version> | |||
<kafka.version>2.8.0</kafka.version> | |||
<kafka.version>3.4.0</kafka.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we upgrade to Kafka 3.4.1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -1187,7 +1198,8 @@ public CompletableFuture<Long> forcePurgeAbortTx() { | |||
public CompletableFuture<Long> recoverTxEntries( | |||
long offset, | |||
Executor executor) { | |||
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { | |||
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need the isInterceptorConfigured
check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about this change, there are two implementations MLTransactionSequenceIdGenerator
and ManagedLedgerInterceptorImpl
, maybe need more context from Enrico.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you don't have the interceptor then you don't have the "index" (Kafka offset) in the records, and it is not possible to perform recovery.
IIRC this was because there are some tests about having legacy/broken configuration.
Now that KOP/S4K are mature it is not possible that people don't configure the interceptor,
maybe you can drop the check (but I bet that some tests will fail)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I'll drop it and fix the tests.
@@ -184,22 +188,52 @@ public static DescribeGroupsResponse newDescribeGroups( | |||
return new DescribeGroupsResponse(data); | |||
} | |||
|
|||
public static FindCoordinatorResponse newFindCoordinator(Node node) { | |||
public static FindCoordinatorResponse newFindCoordinator(List<String> coordinatorKeys, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse this method? It looks like this method is not used anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this method is used for Pulsar proxy, I remove this method and create a new one for generating Coordinator
results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the method is not used then we can drop it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
awesome
return CompletableFuture.completedFuture(Long.valueOf(0)); | ||
return CompletableFuture.completedFuture(0L); | ||
} | ||
if (!isBrokerIndexMetadataInterceptorConfigured(persistentTopic.getBrokerService())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't have this check, the UpgradeTest
will encounter an NPE problem while getting the current offset in method fetchOldestAvailableIndexFromTopic
, but we can't disable the Kafka transaction in that test right now, because newly Kafka client requires the producer id.
Actually, I think this check is necessary because the Kafka transaction can't work without continuously offset, maybe we can add this check while starting the broker. @Demogorgon314
…streamnative#1981) (cherry picked from commit 71b77b2) ### Motivation Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709. --------- Co-authored-by: Enrico Olivelli <enrico.olivelli@datastax.com> (cherry picked from commit 19801c1)
(cherry picked from commit 71b77b2)
Motivation
Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709.
Modifications
Describe the modifications you've done.
Documentation
Check the box below.
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)