Skip to content

Commit

Permalink
Ensure ordering of commit requests
Browse files Browse the repository at this point in the history
Hold broker lock to prevent concurrent calls to commit from potentially
being re-ordered between determining which offsets to include in the
offset commit request, and sending the request to Kafka.

Move finding the coordinator before creating the request to avoid
holding the lock in the case the coordinator is unknown. This requires a
change to the "new offset manager" test, which previously didn't expect
the mock broker to receive a find coordinator request.

Fixes: #2940
Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
  • Loading branch information
prestona and mpburg committed Jul 23, 2024
1 parent d2246cc commit fca8b83
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
28 changes: 26 additions & 2 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,28 +251,52 @@ func (om *offsetManager) Commit() {
}

func (om *offsetManager) flushToBroker() {
broker, err := om.coordinator()
if err != nil {
om.handleError(err)
return
}

// Care needs to be taken to unlock this. Don't want to defer the unlock as this would
// cause the lock to be held while waiting for the broker to reply.
broker.lock.Lock()
req := om.constructRequest()
if req == nil {
broker.lock.Unlock()
return
}
resp, rp, err := sendOffsetCommit(broker, req)
broker.lock.Unlock()

broker, err := om.coordinator()
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}

resp, err := broker.CommitOffset(req)
err = handleResponsePromise(req, resp, rp, nil)
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}

broker.handleThrottledResponse(resp)
om.handleResponse(broker, req, resp)
}

func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) {
resp := new(OffsetCommitResponse)
responseHeaderVersion := resp.headerVersion()
promise, err := coordinator.send(req, true, responseHeaderVersion)
if err != nil {
return nil, nil, err
}
return resp, promise, err
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
Expand Down
3 changes: 3 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func TestNewOffsetManager(t *testing.T) {
metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse)
findCoordResponse := new(FindCoordinatorResponse)
findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
seedBroker.Returns(findCoordResponse)
defer seedBroker.Close()

testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
Expand Down

0 comments on commit fca8b83

Please sign in to comment.