Skip to content

Commit

Permalink
Merge pull request #2975 from redpanda-data/mihaitodor-fix-redpanda-m…
Browse files Browse the repository at this point in the history
…igrator-output-perf

Port @rockwotj's fix from #2973.
  • Loading branch information
mihaitodor authored Oct 31, 2024
2 parents 7bf984a + 0334310 commit 754fd8a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ All notable changes to this project will be documented in this file.
- New CLI flag `--secrets` added. (@Jeffail)
- New CLI flag `--disable-telemetry` added. (@Jeffail)

### Fixed

- The `kafka`, `kafka_franz` and `redpanda_migrator` outputs no longer waste CPU for large batches. (@rockwotj)

### Changed

- The `aws_sqs` output field `url` now supports interpolation functions. (@rockwotj)
Expand Down
28 changes: 21 additions & 7 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,24 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa
return service.ErrNotConnected
}

topicExecutor := b.InterpolationExecutor(w.topic)
var keyExecutor *service.MessageBatchInterpolationExecutor
if w.key != nil {
keyExecutor = b.InterpolationExecutor(w.key)
}
var partitionExecutor *service.MessageBatchInterpolationExecutor
if w.partition != nil {
partitionExecutor = b.InterpolationExecutor(w.partition)
}
var timestampExecutor *service.MessageBatchInterpolationExecutor
if w.timestamp != nil {
timestampExecutor = b.InterpolationExecutor(w.timestamp)
}

records := make([]*kgo.Record, 0, len(b))
for i, msg := range b {
var topic string
if topic, err = b.TryInterpolatedString(i, w.topic); err != nil {
if topic, err = topicExecutor.TryString(i); err != nil {
return fmt.Errorf("topic interpolation error: %w", err)
}

Expand Down Expand Up @@ -470,13 +484,13 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa
if record.Value, err = msg.AsBytes(); err != nil {
return
}
if w.key != nil {
if record.Key, err = b.TryInterpolatedBytes(i, w.key); err != nil {
if keyExecutor != nil {
if record.Key, err = keyExecutor.TryBytes(i); err != nil {
return fmt.Errorf("key interpolation error: %w", err)
}
}
if w.partition != nil {
partStr, err := b.TryInterpolatedString(i, w.partition)
if partitionExecutor != nil {
partStr, err := partitionExecutor.TryString(i)
if err != nil {
return fmt.Errorf("partition interpolation error: %w", err)
}
Expand All @@ -493,8 +507,8 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa
})
return nil
})
if w.timestamp != nil {
if tsStr, err := b.TryInterpolatedString(i, w.timestamp); err != nil {
if timestampExecutor != nil {
if tsStr, err := timestampExecutor.TryString(i); err != nil {
return fmt.Errorf("timestamp interpolation error: %w", err)
} else {
if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil {
Expand Down

0 comments on commit 754fd8a

Please sign in to comment.