Skip to content

Commit

Permalink
chore(test): parallelise version matrix
Browse files Browse the repository at this point in the history
Run the producer and consumer loops in parallel to speedup the FVT
  • Loading branch information
dnwe committed Jul 21, 2022
1 parent b12642e commit 7014e4b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 39 deletions.
109 changes: 70 additions & 39 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"testing"
"time"

"golang.org/x/sync/errgroup"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -439,33 +441,40 @@ func versionRange(lower KafkaVersion) []KafkaVersion {

func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
var (
wg sync.WaitGroup
producers []SyncProducer
producedMessagesMu sync.Mutex
producedMessages []*ProducerMessage
)
g := errgroup.Group{}
for _, prodVer := range clientVersions {
for _, codec := range codecs {
t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) {
t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
prodCfg := NewTestConfig()
prodCfg.Version = prodVer
prodCfg.Producer.Return.Successes = true
prodCfg.Producer.Return.Errors = true
prodCfg.Producer.Flush.MaxMessages = flush
prodCfg.Producer.Compression = codec
prodCfg.Producer.Idempotent = idempotent
if idempotent {
prodCfg.Producer.RequiredAcks = WaitForAll
prodCfg.Net.MaxOpenRequests = 1
}
prodCfg := NewTestConfig()
prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String()
if codec > 0 {
prodCfg.ClientID += "-" + codec.String()
}
prodCfg.Version = prodVer
prodCfg.Producer.Return.Successes = true
prodCfg.Producer.Return.Errors = true
prodCfg.Producer.Flush.MaxMessages = flush
prodCfg.Producer.Compression = codec
prodCfg.Producer.Idempotent = idempotent
if idempotent {
prodCfg.Producer.RequiredAcks = WaitForAll
prodCfg.Net.MaxOpenRequests = 1
}

p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
if err != nil {
t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
return
}
producers = append(producers, p)
p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
if err != nil {
t.Fatalf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
}
producers = append(producers, p)

prodVer := prodVer
codec := codec
g.Go(func() error {
t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
var wg sync.WaitGroup
for i := 0; i < countPerVerCodec; i++ {
msg := &ProducerMessage{
Topic: "test.1",
Expand All @@ -483,10 +492,14 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
producedMessagesMu.Unlock()
}()
}
wg.Wait()
return nil
})
}
}
wg.Wait()
if err := g.Wait(); err != nil {
t.Fatal(err)
}

for _, p := range producers {
safeClose(t, p)
Expand All @@ -496,6 +509,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
sort.Slice(producedMessages, func(i, j int) bool {
return producedMessages[i].Offset < producedMessages[j].Offset
})
require.NotEmpty(t, producedMessages, "should have produced >0 messages")
t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
return producedMessages
Expand All @@ -504,26 +518,32 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
// Consume all produced messages with all client versions supported by the
// cluster.
g := errgroup.Group{}
for _, consVer := range clientVersions {
t.Run("consumer-"+consVer.String(), func(t *testing.T) {
t.Logf("*** Consuming with client version %s\n", consVer)
// Create a partition consumer that should start from the first produced
// message.
consCfg := NewTestConfig()
consCfg.Version = consVer
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)
pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, pc)
// Create a partition consumer that should start from the first produced
// message.
consCfg := NewTestConfig()
consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String()
consCfg.Consumer.MaxProcessingTime = time.Second
consCfg.Version = consVer
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)
pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, pc)

var wg sync.WaitGroup
consVer := consVer
g.Go(func() error {
// Consume as many messages as there have been produced and make sure that
// order is preserved.
t.Logf("*** Consuming with client version %s\n", consVer)
wg.Add(1)
for i, prodMsg := range producedMessages {
select {
case consMsg := <-pc.Messages():
Expand All @@ -535,10 +555,21 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [
t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
}
case <-time.After(3 * time.Second):
t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
if i == 0 {
t.Logf("Consumed first msg: version=%s, index=%d, got=%s",
consVer, i, consMsg2Str(consMsg))
wg.Done()
}
case <-time.After(15 * time.Second):
t.Fatalf("Timeout %s waiting for: index=%d, offset=%d, msg=%s",
consCfg.ClientID, i, prodMsg.Offset, prodMsg.Value)
}
}
return nil
})
wg.Wait() // wait for first message to be consumed before starting next consumer
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/xdg-go/scram v1.1.1
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220708220712-1185a9018129
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down

0 comments on commit 7014e4b

Please sign in to comment.