From 03343104e3abbad5228ca8d13cec615fd03832e9 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Thu, 31 Oct 2024 12:35:44 +0000 Subject: [PATCH] Port @rockwotj's fix from #2973. Thanks Tyler, nice catch! <3 Signed-off-by: Mihai Todor --- CHANGELOG.md | 4 +++ .../enterprise/redpanda_migrator_output.go | 28 ++++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e9ba86b04..33e69a09c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ All notable changes to this project will be documented in this file. - New CLI flag `--secrets` added. (@Jeffail) - New CLI flag `--disable-telemetry` added. (@Jeffail) +### Fixed + +- The `kafka`, `kafka_franz` and `redpanda_migrator` outputs no longer waste CPU for large batches. (@rockwotj) + ### Changed - The `aws_sqs` output field `url` now supports interpolation functions. (@rockwotj) diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_output.go index 9e1be9e3c7..d39894cc80 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_output.go @@ -433,10 +433,24 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa return service.ErrNotConnected } + topicExecutor := b.InterpolationExecutor(w.topic) + var keyExecutor *service.MessageBatchInterpolationExecutor + if w.key != nil { + keyExecutor = b.InterpolationExecutor(w.key) + } + var partitionExecutor *service.MessageBatchInterpolationExecutor + if w.partition != nil { + partitionExecutor = b.InterpolationExecutor(w.partition) + } + var timestampExecutor *service.MessageBatchInterpolationExecutor + if w.timestamp != nil { + timestampExecutor = b.InterpolationExecutor(w.timestamp) + } + records := make([]*kgo.Record, 0, len(b)) for i, msg := range b { var topic string - if topic, err = b.TryInterpolatedString(i, w.topic); err != nil { + if topic, err = topicExecutor.TryString(i); err != nil { return fmt.Errorf("topic interpolation error: %w", err) } @@ -470,13 +484,13 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa if record.Value, err = msg.AsBytes(); err != nil { return } - if w.key != nil { - if record.Key, err = b.TryInterpolatedBytes(i, w.key); err != nil { + if keyExecutor != nil { + if record.Key, err = keyExecutor.TryBytes(i); err != nil { return fmt.Errorf("key interpolation error: %w", err) } } - if w.partition != nil { - partStr, err := b.TryInterpolatedString(i, w.partition) + if partitionExecutor != nil { + partStr, err := partitionExecutor.TryString(i) if err != nil { return fmt.Errorf("partition interpolation error: %w", err) } @@ -493,8 +507,8 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa }) return nil }) - if w.timestamp != nil { - if tsStr, err := b.TryInterpolatedString(i, w.timestamp); err != nil { + if timestampExecutor != nil { + if tsStr, err := timestampExecutor.TryString(i); err != nil { return fmt.Errorf("timestamp interpolation error: %w", err) } else { if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil {