Skip to content

Commit

Permalink
Add test case for commit ordering
Browse files Browse the repository at this point in the history
This test simulates multiple concurrent commits for a single group
running in a single process. The expected behavior is that commits are
delivered in order to the mock broker, with no offset in the
OffsetCommitRequest being for a lower value than previously committed
for the partition.

Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
  • Loading branch information
prestona committed Jul 23, 2024
1 parent fca8b83 commit 8a7180b
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sarama
import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -105,6 +107,111 @@ func TestNewOffsetManager(t *testing.T) {
}
}

// Test that the correct sequence of offset commit messages is sent to a broker when
// multiple goroutines for a group are committing offsets at the same time
func TestOffsetManagerCommitSequence(t *testing.T) {
mu := &sync.Mutex{}
lastOffset := map[int32]int64{}
outOfOrder := ""
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"MetadataRequest": func(req *request) (rest encoderWithHeader) {
resp := new(MetadataResponse)
resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
return resp
},
"FindCoordinatorRequest": func(req *request) (rest encoderWithHeader) {
resp := new(FindCoordinatorResponse)
resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
return resp
},
"OffsetFetchRequest": func(r *request) (rest encoderWithHeader) {
req := r.body.(*OffsetFetchRequest)
resp := new(OffsetFetchResponse)
resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
for topic, partitions := range req.partitions {
for _, partition := range partitions {
if _, ok := resp.Blocks[topic]; !ok {
resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
}
resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
Offset: 0,
Err: ErrNoError,
}
}
}
return resp
},
"OffsetCommitRequest": func(r *request) (rest encoderWithHeader) {
req := r.body.(*OffsetCommitRequest)
func() {
mu.Lock()
defer mu.Unlock()
if outOfOrder == "" {
for partition, offset := range req.blocks["topic"] {
last := lastOffset[partition]
if last > offset.offset {
outOfOrder =
fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
partition, last, offset.offset)
}
lastOffset[partition] = offset.offset
}
}
}()

// Potentially yield, to try and avoid each Go routine running sequentially to completion
runtime.Gosched()

resp := new(OffsetCommitResponse)
resp.Errors = map[string]map[int32]KError{}
resp.Errors["topic"] = map[int32]KError{}
for partition := range req.blocks["topic"] {
resp.Errors["topic"][partition] = ErrNoError
}
return resp
},
})
testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}
defer safeClose(t, testClient)
om, err := NewOffsetManagerFromClient("group", testClient)
if err != nil {
t.Error(err)
}
defer safeClose(t, om)

const numPartitions = 10
const commitsPerPartition = 1000

wg := &sync.WaitGroup{}
for p := 0; p < numPartitions; p++ {
pom, err := om.ManagePartition("topic", int32(p))
if err != nil {
t.Error(err)
}

wg.Add(1)
go func() {
for c := 0; c < commitsPerPartition; c++ {
pom.MarkOffset(int64(c+1), "")
om.Commit()
}
wg.Done()
}()
}

wg.Wait()
mu.Lock()
defer mu.Unlock()
if outOfOrder != "" {
t.Error(outOfOrder)
}
}

var offsetsautocommitTestTable = []struct {
name string
set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
Expand Down

0 comments on commit 8a7180b

Please sign in to comment.