From fca8b8361081b8ff88a0c47c53baa97e11b3f802 Mon Sep 17 00:00:00 2001 From: Adrian Preston Date: Mon, 22 Jul 2024 14:31:10 +0100 Subject: [PATCH] Ensure ordering of commit requests Hold broker lock to prevent concurrent calls to commit from potentially being re-ordered between determining which offsets to include in the offset commit request, and sending the request to Kafka. Move finding the coordinator before creating the request to avoid holding the lock in the case the coordinator is unknown. This requires a change to the "new offset manager" test, which previously didn't expect the mock broker to receive a find coordinator request. Fixes: #2940 Co-authored-by: Michael Burgess Signed-off-by: Adrian Preston --- offset_manager.go | 28 ++++++++++++++++++++++++++-- offset_manager_test.go | 3 +++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/offset_manager.go b/offset_manager.go index 1bf545908..2659d4caf 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -251,18 +251,31 @@ func (om *offsetManager) Commit() { } func (om *offsetManager) flushToBroker() { + broker, err := om.coordinator() + if err != nil { + om.handleError(err) + return + } + + // Care needs to be taken to unlock this. Don't want to defer the unlock as this would + // cause the lock to be held while waiting for the broker to reply. + broker.lock.Lock() req := om.constructRequest() if req == nil { + broker.lock.Unlock() return } + resp, rp, err := sendOffsetCommit(broker, req) + broker.lock.Unlock() - broker, err := om.coordinator() if err != nil { om.handleError(err) + om.releaseCoordinator(broker) + _ = broker.Close() return } - resp, err := broker.CommitOffset(req) + err = handleResponsePromise(req, resp, rp, nil) if err != nil { om.handleError(err) om.releaseCoordinator(broker) @@ -270,9 +283,20 @@ func (om *offsetManager) flushToBroker() { return } + broker.handleThrottledResponse(resp) om.handleResponse(broker, req, resp) } +func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) { + resp := new(OffsetCommitResponse) + responseHeaderVersion := resp.headerVersion() + promise, err := coordinator.send(req, true, responseHeaderVersion) + if err != nil { + return nil, nil, err + } + return resp, promise, err +} + func (om *offsetManager) constructRequest() *OffsetCommitRequest { r := &OffsetCommitRequest{ Version: 1, diff --git a/offset_manager_test.go b/offset_manager_test.go index 04322fce7..34c409c1f 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -82,6 +82,9 @@ func TestNewOffsetManager(t *testing.T) { metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse) + findCoordResponse := new(FindCoordinatorResponse) + findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()} + seedBroker.Returns(findCoordResponse) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())