-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for alter/list partition reassignements APIs #1617
Add support for alter/list partition reassignements APIs #1617
Conversation
@sladkoff 💯 Do you think you could rebase your branch to make sure it's ✅on CI 🙏 |
601e78f
to
093729b
Compare
093729b
to
c4390e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again, the protocol changes look good 👍
A small number of enquiries below:
admin.go
Outdated
} | ||
|
||
request := &AlterPartitionReassignmentsRequest{ | ||
TimeoutMs: int32(10000), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did this 10_000 millisecond default timeout come from? The protocol description suggests 60_000 should be the default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably just put something in there :) Using the suggested default makes sense. I added this.
admin.go
Outdated
} | ||
|
||
request := &ListPartitionReassignmentsRequest{ | ||
TimeoutMs: int32(10000), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, shouldn't this be 60_000 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
admin_test.go
Outdated
seedBroker.SetHandlerByMap(map[string]MockResponse{ | ||
"MetadataRequest": NewMockMetadataResponse(t). | ||
SetController(seedBroker.BrokerID()). | ||
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please could you add a second mockBroker to the test and set that as the controller in the mock metadata response and not the seed broker? That should exercise that the request must be sent to the Controller and not just any broker (or the broker currently connected to)
admin_test.go
Outdated
seedBroker.SetHandlerByMap(map[string]MockResponse{ | ||
"MetadataRequest": NewMockMetadataResponse(t). | ||
SetController(seedBroker.BrokerID()). | ||
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, please use different MockBroker for the Controller
admin.go
Outdated
|
||
request.AddBlock(topic, partitions) | ||
|
||
b, err := ca.findAnyBroker() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that ListPartitionReassignments also needed to be sent to the Controller (not just any broker) like the Alter request? The Java Admin client seems to do so here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. I fixed that
@dnwe I added commits which address your remarks. Can you have a look? One of the CI jobs failed, but it does not look related to our change, right? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated changes look good to me.
One small remaining fix below and then I think the PR is ready to be approved and merged
@@ -512,7 +512,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in | |||
|
|||
request.AddBlock(topic, partitions) | |||
|
|||
b, err := ca.findAnyBroker() | |||
b, err := ca.Controller() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now this is using the controller you probably want to wrapper the body in a return ca.retryOnError(isErrNoController, ...)
call like you did for AlterPartitionReassignments
so it refreshes the cached controller if it is stale
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the retryOnError
function cannot be used when a value needs to be returned. other functions in admin.go
where a controller is used and a return type is needed also don't use retrying. How should this be handled? should we implement a retry function that returns something like interface{}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dwi-di ah of course — let's just leave this for now and we can always follow-up in another PR
I'm guessing it was just a slight flake — but it was in the new TestListPartitionReassignmentRequest func:
|
I was able to re-create with:
|
Ah I can see the problem, the ordering of the two blocks in your check of |
ah I see, but what do you propose to do here? the only way to fix it is to change the encoder to sort the blocks based on topic name, right? but that does not feel right, because it is not really needed, only for the test... |
Yes I think it just isn't possible to use the existing |
I modified the flaky test, that should be all then. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, LGTM 👍
…ignment * feature/update-replication-assignment: feat: Add alter and list partition reassignments
@dnwe Nice! Can we expect to get these changes into a release soon? |
we should probably wait for #1640 before releasing this, otherwise the list partition replica assignement status is not really usable. |
That's fair. @dnwe any objections on cutting release today? |
@bai yes I was merging a few PRs in yesterday as I think we do need to cut a release with this and also to include @KJTsanaktsidis's fix for the session ID cache exhaustion that was introduced in 1.26 and is a nasty bug. |
@bai we might need to check on the github actions builds though, as they've been a bit flakey recently. Not sure if we can do anything to improve them |
I think we can address GitHub Actions issue short-term by reducing build matrix — it's gotten so much flakier with 9 builds instead of 6. Hopefully when we have that docker-compose setup in place it'll get better 🙏 Want to do this release yourself or would you prefer me cutting it? |
Sure I can give a go to cutting the release. Just a tag in Git and then create a release with a small changelog linking to the PRs? |
@dnwe I usually use So basically I'm not doing manual git tag but use GitHub UI to release 🤷♀️ |
@sladkoff released and announced! https://github.com/Shopify/sarama/releases/tag/v1.26.2 https://twitter.com/oldmanuk/status/1258035278771621888 (apologies for missing shoutouts to those I couldn't immediately find on Twitter) |
Props to Dominic for cutting this one and all of you folks for contributing 🙏 |
We added support for API Keys 45 (AlterPartitionReassignments) and 46 (ListPartitionReassignments).
This enables sarama to update the replication factor of topics for Kafka 2.4.0.0+ (See also #1238).
We had to do change quite a lot since these APIs require the use of newer data structures in the body as well as in the header of the requests/responses (mainly compact structures and tagged fields).
Please let us know what you think.