From 8a7180b1aa6811a86a0f19f01b04b1e75b8a98a9 Mon Sep 17 00:00:00 2001 From: Adrian Preston Date: Mon, 22 Jul 2024 13:29:26 +0100 Subject: [PATCH] Add test case for commit ordering This test simulates multiple concurrent commits for a single group running in a single process. The expected behavior is that commits are delivered in order to the mock broker, with no offset in the OffsetCommitRequest being for a lower value than previously committed for the partition. Signed-off-by: Adrian Preston --- offset_manager_test.go | 107 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/offset_manager_test.go b/offset_manager_test.go index 34c409c1f..9afa1c090 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -3,6 +3,8 @@ package sarama import ( "errors" "fmt" + "runtime" + "sync" "sync/atomic" "testing" "time" @@ -105,6 +107,111 @@ func TestNewOffsetManager(t *testing.T) { } } +// Test that the correct sequence of offset commit messages is sent to a broker when +// multiple goroutines for a group are committing offsets at the same time +func TestOffsetManagerCommitSequence(t *testing.T) { + mu := &sync.Mutex{} + lastOffset := map[int32]int64{} + outOfOrder := "" + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ + "MetadataRequest": func(req *request) (rest encoderWithHeader) { + resp := new(MetadataResponse) + resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + return resp + }, + "FindCoordinatorRequest": func(req *request) (rest encoderWithHeader) { + resp := new(FindCoordinatorResponse) + resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()} + return resp + }, + "OffsetFetchRequest": func(r *request) (rest encoderWithHeader) { + req := r.body.(*OffsetFetchRequest) + resp := new(OffsetFetchResponse) + resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{} + for topic, partitions := range req.partitions { + for _, partition := range partitions { + if _, ok := resp.Blocks[topic]; !ok { + resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{} + } + resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{ + Offset: 0, + Err: ErrNoError, + } + } + } + return resp + }, + "OffsetCommitRequest": func(r *request) (rest encoderWithHeader) { + req := r.body.(*OffsetCommitRequest) + func() { + mu.Lock() + defer mu.Unlock() + if outOfOrder == "" { + for partition, offset := range req.blocks["topic"] { + last := lastOffset[partition] + if last > offset.offset { + outOfOrder = + fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d", + partition, last, offset.offset) + } + lastOffset[partition] = offset.offset + } + } + }() + + // Potentially yield, to try and avoid each Go routine running sequentially to completion + runtime.Gosched() + + resp := new(OffsetCommitResponse) + resp.Errors = map[string]map[int32]KError{} + resp.Errors["topic"] = map[int32]KError{} + for partition := range req.blocks["topic"] { + resp.Errors["topic"][partition] = ErrNoError + } + return resp + }, + }) + testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, testClient) + om, err := NewOffsetManagerFromClient("group", testClient) + if err != nil { + t.Error(err) + } + defer safeClose(t, om) + + const numPartitions = 10 + const commitsPerPartition = 1000 + + wg := &sync.WaitGroup{} + for p := 0; p < numPartitions; p++ { + pom, err := om.ManagePartition("topic", int32(p)) + if err != nil { + t.Error(err) + } + + wg.Add(1) + go func() { + for c := 0; c < commitsPerPartition; c++ { + pom.MarkOffset(int64(c+1), "") + om.Commit() + } + wg.Done() + }() + } + + wg.Wait() + mu.Lock() + defer mu.Unlock() + if outOfOrder != "" { + t.Error(outOfOrder) + } +} + var offsetsautocommitTestTable = []struct { name string set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable