Skip to content

Commit

Permalink
Add integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <todormihai@gmail.com>
  • Loading branch information
mihaitodor committed Feb 17, 2025
1 parent 62c0a1b commit 407f84a
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 17 deletions.
204 changes: 189 additions & 15 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"log/slog"
"net/http"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -654,20 +656,30 @@ func startRedpanda(t *testing.T, pool *dockertest.Pool, exposeBroker bool, autoc
}, nil
}

func produceMessage(t *testing.T, rpe redpandaEndpoints, topic, message string) {
// produceMessages produces `count` messages to the given `topic` with the given `message` content. The
// `timestampOffset` indicates an offset which gets added to the `counter()` Bloblang function which is used to generate
// the message timestamps sequentially, the first one being `1 + timestampOffset`.
func produceMessages(t *testing.T, rpe redpandaEndpoints, topic, message string, timestampOffset, count int, encode bool) {
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
config := ""
if encode {
config = fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
`, rpe.schemaRegistryURL, topic)
}
config += fmt.Sprintf(`
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, rpe.schemaRegistryURL, topic, rpe.brokerAddr, topic)))
timestamp_ms: ${! counter() + %d}
`, rpe.brokerAddr, topic, timestampOffset)
require.NoError(t, streamBuilder.SetYAML(config))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
Expand All @@ -682,7 +694,9 @@ output:
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(message))))
for range count {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(message))))
}

require.NoError(t, stream.StopWithin(3*time.Second))
}()
Expand All @@ -691,34 +705,46 @@ output:
require.NoError(t, err)
}

func readMessageWithCG(t *testing.T, rpe redpandaEndpoints, topic, consumerGroup, message string) {
// readMessagesWithCG reads `count` messages from the given `topic` with the given `consumerGroup`.
// TODO: Since `read_until` can't guarantee that we will read `count` messages, `topic` needs to have exactly `count`
// messages in it. We should add some mechanism to the Kafka inputs to allow us to read a range of offsets if possible
// (or up to a certain offset).
func readMessagesWithCG(t *testing.T, rpe redpandaEndpoints, topic, consumerGroup, message string, count int, decode bool) {
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
config := fmt.Sprintf(`
input:
kafka_franz:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: %s
start_from_oldest: true
`, rpe.brokerAddr, topic, consumerGroup)
if decode {
config += fmt.Sprintf(`
processors:
- schema_registry_decode:
url: %s
avro_raw_json: true
`, rpe.schemaRegistryURL)
}
config += `
output:
# Need to use drop explicitly with SetYAML(). Otherwise, the output will be inproc
# (or stdout if we import github.com/redpanda-data/benthos/v4/public/components/io)
drop: {}
`, rpe.brokerAddr, topic, consumerGroup, rpe.schemaRegistryURL)))
`
require.NoError(t, streamBuilder.SetYAML(config))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

recvChan := make(chan struct{})
recvMsgWG := sync.WaitGroup{}
recvMsgWG.Add(count)
err := streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error {
defer recvMsgWG.Done()
b, err := m.AsBytes()
require.NoError(t, err)

assert.Equal(t, message, string(b))

close(recvChan)
return nil
})
require.NoError(t, err)
Expand All @@ -735,7 +761,8 @@ output:
require.NoError(t, stream.Run(ctx))
}()

<-recvChan
recvMsgWG.Wait()

require.NoError(t, stream.StopWithin(3*time.Second))
}

Expand Down Expand Up @@ -812,7 +839,7 @@ output:
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("Migrator shut down")
t.Log("Migrator pipeline shut down")

close(closeChan)
}()
Expand Down Expand Up @@ -846,7 +873,7 @@ func TestRedpandaMigratorIntegration(t *testing.T) {

// Produce one message
dummyMessage := `{"test":"foo"}`
produceMessage(t, source, dummyTopic, dummyMessage)
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)
t.Log("Finished producing first message in source")

// Run the Redpanda Migrator bundle
Expand Down Expand Up @@ -891,24 +918,171 @@ func TestRedpandaMigratorIntegration(t *testing.T) {

dummyCG := "foobar_cg"
// Read the message from source using a consumer group
readMessageWithCG(t, source, dummyTopic, dummyCG, dummyMessage)
readMessagesWithCG(t, source, dummyTopic, dummyCG, dummyMessage, 1, true)
checkMigrated("redpanda_migrator_offsets_input", func(_ string, meta map[string]string) {
assert.Equal(t, dummyTopic, meta["kafka_offset_topic"])
})
t.Logf("Finished reading first message from source with consumer group %q", dummyCG)

// Produce one more message in the source
secondDummyMessage := `{"test":"bar"}`
produceMessage(t, source, dummyTopic, secondDummyMessage)
produceMessages(t, source, dummyTopic, secondDummyMessage, 0, 1, true)
checkMigrated("redpanda_migrator_input", func(msg string, _ map[string]string) {
assert.Equal(t, "\x00\x00\x00\x00\x01\x06bar", msg)
})
t.Log("Finished producing second message in source")

// Read the new message from the destination using a consumer group
readMessageWithCG(t, destination, dummyTopic, dummyCG, secondDummyMessage)
readMessagesWithCG(t, destination, dummyTopic, dummyCG, secondDummyMessage, 1, true)
checkMigrated("redpanda_migrator_offsets_input", func(_ string, meta map[string]string) {
assert.Equal(t, dummyTopic, meta["kafka_offset_topic"])
})
t.Logf("Finished reading second message from destination with consumer group %q", dummyCG)
}

// TestRedpandaMigratorOffsetsIntegration checks that a pipeline made of the `redpanda_migrator_offsets` input and
// output carries a consumer group offset from a source cluster over to a destination cluster.
//
// Every case starts its own source and destination via startRedpanda, writes the same messages (with the same
// timestamps) to both, commits an offset for the consumer group on the source by reading from it, runs the migrator
// pipeline and finally asserts on the offset committed for that group on partition 0 of the destination topic.
func TestRedpandaMigratorOffsetsIntegration(t *testing.T) {
	integration.CheckSkip(t)
	t.Parallel()

	tests := []struct {
		name string
		// cgAtEndOffset is true when the source consumer group is expected to sit at the end of the topic.
		cgAtEndOffset bool
		// extraCGUpdate is true when a second consumer group commit is triggered while the migrator is running.
		extraCGUpdate bool
	}{
		{
			name:          "source consumer group points to the topic end offset",
			cgAtEndOffset: true,
		},
		{
			name:          "source consumer group points to an older offset inside the topic",
			cgAtEndOffset: false,
		},
		{
			name:          "subsequent consumer group updates are processed correctly",
			cgAtEndOffset: true,
			extraCGUpdate: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			t.Parallel()

			pool, err := dockertest.NewPool("")
			require.NoError(t, err)
			pool.MaxWait = time.Minute

			source, err := startRedpanda(t, pool, true, true)
			require.NoError(t, err)
			destination, err := startRedpanda(t, pool, true, true)
			require.NoError(t, err)

			t.Logf("Source broker: %s", source.brokerAddr)
			t.Logf("Destination broker: %s", destination.brokerAddr)

			dummyTopic := "test"
			dummyMessage := `{"test":"foo"}`
			dummyConsumerGroup := "test_cg"
			messageCount := 5

			// Produce messages in the source cluster
			// The message timestamps are produced in ascending order, starting from 1 all the way to messageCount
			produceMessages(t, source, dummyTopic, dummyMessage, 0, messageCount, false)

			// Produce the exact same messages in the destination cluster
			produceMessages(t, destination, dummyTopic, dummyMessage, 0, messageCount, false)

			// Read the messages from the source cluster using a consumer group
			readMessagesWithCG(t, source, dummyTopic, dummyConsumerGroup, dummyMessage, messageCount, false)

			if !test.cgAtEndOffset {
				// The next messages need to have more recent timestamps than the existing messages, so we use
				// `messageCount` as an offset for their timestamps
				produceMessages(t, source, dummyTopic, dummyMessage, messageCount, messageCount, false)
			}

			if test.extraCGUpdate {
				// Make sure both source and destination have extra messages after the current consumer group offset
				produceMessages(t, source, dummyTopic, dummyMessage, messageCount, messageCount, false)
				produceMessages(t, destination, dummyTopic, dummyMessage, messageCount, messageCount, false)
			}

			t.Log("Finished setting up messages in the source and destination clusters")

			// Migrate the consumer group offset
			streamBuilder := service.NewStreamBuilder()
			require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
  redpanda_migrator_offsets:
    seed_brokers: [ %s ]
    topics: [ %s ]
output:
  redpanda_migrator_offsets:
    seed_brokers: [ %s ]
`, source.brokerAddr, dummyTopic, destination.brokerAddr)))
			require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

			// Counts the offset updates the test is still waiting for; the consumer func marks one as done per
			// message it receives.
			var migratorUpdateWG sync.WaitGroup
			migratorUpdateWG.Add(1)
			require.NoError(t, streamBuilder.AddConsumerFunc(func(context.Context, *service.Message) error {
				defer migratorUpdateWG.Done()
				return nil
			}))

			// Ensure the callback function is called after the output wrote the message
			streamBuilder.SetOutputBrokerPattern(service.OutputBrokerPatternFanOutSequential)

			stream, err := streamBuilder.Build()
			require.NoError(t, err)

			license.InjectTestService(stream.Resources())

			// Run stream in the background
			migratorCloseChan := make(chan struct{})
			go func() {
				// Always unblock the cleanup below, even when Run returns an error.
				defer close(migratorCloseChan)

				// The error is kept local so this goroutine never writes to the `err` variable used by the test
				// goroutine. assert is used instead of require because FailNow must only be called from the
				// goroutine running the test.
				if runErr := stream.Run(context.Background()); !assert.NoError(t, runErr) {
					return
				}

				t.Log("redpanda_migrator_offsets pipeline shut down")
			}()

			t.Cleanup(func() {
				require.NoError(t, stream.StopWithin(3*time.Second))

				<-migratorCloseChan
			})

			migratorUpdateWG.Wait()

			if test.extraCGUpdate {
				// Trigger another consumer group update to get it to point to the end of the topic
				migratorUpdateWG.Add(1)
				readMessagesWithCG(t, source, dummyTopic, dummyConsumerGroup, dummyMessage, messageCount, false)
				migratorUpdateWG.Wait()
			}

			client, err := kgo.NewClient(kgo.SeedBrokers(destination.brokerAddr))
			require.NoError(t, err)
			defer client.Close()

			adm := kadm.NewClient(client)
			committed, err := adm.FetchOffsets(context.Background(), dummyConsumerGroup)
			require.NoError(t, err)
			currentCGOffset, ok := committed.Lookup(dummyTopic, 0)
			require.True(t, ok)

			// When the source group is not at the end of its topic, it was left at `messageCount`, and that is
			// the offset expected on the destination. Otherwise the destination end offset is expected.
			endOffset := int64(messageCount)
			if test.cgAtEndOffset {
				endOffsets, err := adm.ListEndOffsets(context.Background(), dummyTopic)
				require.NoError(t, err)
				o, ok := endOffsets.Lookup(dummyTopic, 0)
				require.True(t, ok)
				endOffset = o.Offset
			}
			assert.Equal(t, endOffset, currentCGOffset.At)
			assert.Equal(t, dummyTopic, currentCGOffset.Topic)
		})
	}
}
4 changes: 2 additions & 2 deletions internal/impl/kafka/enterprise/redpanda_migrator.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Source->>Source: O = Receive(__consumer_offsets)
Source->>Source: X = ListEndOffsets(T, P)
Source->>Source: X > O
Source->>Source: TS = ReadTimestamp(T, P, O)
Source->>Destination: TS
Source->>Destination: (T, P, TS)
Destination->>Destination: O' = ListOffsetsAfterMilli(T, P, TS)
Destination->>Destination: CommitOffsets(T, P, O')
```
Expand All @@ -37,7 +37,7 @@ Source->>Source: O = Receive(__consumer_offsets)
Source->>Source: X = ListEndOffsets(T, P)
Source->>Source: X == O
Source->>Source: TS = ReadTimestamp(T, P, -1)
Source->>Destination: TS
Source->>Destination: (T, P, TS)
Destination->>Destination: O' = ListOffsetsAfterMilli(T, P, TS)
Destination->>Destination: O'' = ReadOffset(T, P, -1)
Destination->>Destination: If O'' == O' then O' = ListEndOffsets(T, P)
Expand Down

0 comments on commit 407f84a

Please sign in to comment.