diff --git a/offset_manager.go b/offset_manager.go index 1bf545908..2659d4caf 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -251,18 +251,31 @@ func (om *offsetManager) Commit() { } func (om *offsetManager) flushToBroker() { + broker, err := om.coordinator() + if err != nil { + om.handleError(err) + return + } + + // Care needs to be taken to unlock this. Don't want to defer the unlock as this would + // cause the lock to be held while waiting for the broker to reply. + broker.lock.Lock() req := om.constructRequest() if req == nil { + broker.lock.Unlock() return } + resp, rp, err := sendOffsetCommit(broker, req) + broker.lock.Unlock() - broker, err := om.coordinator() if err != nil { om.handleError(err) + om.releaseCoordinator(broker) + _ = broker.Close() return } - resp, err := broker.CommitOffset(req) + err = handleResponsePromise(req, resp, rp, nil) if err != nil { om.handleError(err) om.releaseCoordinator(broker) @@ -270,9 +283,20 @@ func (om *offsetManager) flushToBroker() { return } + broker.handleThrottledResponse(resp) om.handleResponse(broker, req, resp) } +func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) { + resp := new(OffsetCommitResponse) + responseHeaderVersion := resp.headerVersion() + promise, err := coordinator.send(req, true, responseHeaderVersion) + if err != nil { + return nil, nil, err + } + return resp, promise, err +} + func (om *offsetManager) constructRequest() *OffsetCommitRequest { r := &OffsetCommitRequest{ Version: 1, diff --git a/offset_manager_test.go b/offset_manager_test.go index 04322fce7..34c409c1f 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -82,6 +82,9 @@ func TestNewOffsetManager(t *testing.T) { metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse) + findCoordResponse := new(FindCoordinatorResponse) + findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()} + seedBroker.Returns(findCoordResponse) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())