diff --git a/internal/impl/kafka/output_kafka_franz.go b/internal/impl/kafka/output_kafka_franz.go index 1c93389e73..89d614ecfa 100644 --- a/internal/impl/kafka/output_kafka_franz.go +++ b/internal/impl/kafka/output_kafka_franz.go @@ -366,10 +366,24 @@ func (f *FranzKafkaWriter) WriteBatch(ctx context.Context, b service.MessageBatc return service.ErrNotConnected } + topicExecutor := b.InterpolationExecutor(f.topic) + var keyExecutor *service.MessageBatchInterpolationExecutor + if f.key != nil { + keyExecutor = b.InterpolationExecutor(f.key) + } + var partitionExecutor *service.MessageBatchInterpolationExecutor + if f.partition != nil { + partitionExecutor = b.InterpolationExecutor(f.partition) + } + var timestampExecutor *service.MessageBatchInterpolationExecutor + if f.timestamp != nil { + timestampExecutor = b.InterpolationExecutor(f.timestamp) + } + records := make([]*kgo.Record, 0, len(b)) for i, msg := range b { var topic string - if topic, err = b.TryInterpolatedString(i, f.topic); err != nil { + if topic, err = topicExecutor.TryString(i); err != nil { return fmt.Errorf("topic interpolation error: %w", err) } @@ -377,13 +391,13 @@ func (f *FranzKafkaWriter) WriteBatch(ctx context.Context, b service.MessageBatc if record.Value, err = msg.AsBytes(); err != nil { return } - if f.key != nil { - if record.Key, err = b.TryInterpolatedBytes(i, f.key); err != nil { + if keyExecutor != nil { + if record.Key, err = keyExecutor.TryBytes(i); err != nil { return fmt.Errorf("key interpolation error: %w", err) } } - if f.partition != nil { - partStr, err := b.TryInterpolatedString(i, f.partition) + if partitionExecutor != nil { + partStr, err := partitionExecutor.TryString(i) if err != nil { return fmt.Errorf("partition interpolation error: %w", err) } @@ -400,8 +414,8 @@ func (f *FranzKafkaWriter) WriteBatch(ctx context.Context, b service.MessageBatc }) return nil }) - if f.timestamp != nil { - if tsStr, err := b.TryInterpolatedString(i, f.timestamp); err != nil { + if timestampExecutor != nil { + if tsStr, err := timestampExecutor.TryString(i); err != nil { return fmt.Errorf("timestamp interpolation error: %w", err) } else { if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil { diff --git a/internal/impl/kafka/output_sarama_kafka.go b/internal/impl/kafka/output_sarama_kafka.go index e04f3878bb..89d5606af7 100644 --- a/internal/impl/kafka/output_sarama_kafka.go +++ b/internal/impl/kafka/output_sarama_kafka.go @@ -505,17 +505,28 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) return service.ErrNotConnected } + topicExecutor := msg.InterpolationExecutor(k.topic) + keyExecutor := msg.InterpolationExecutor(k.key) + var partitionExecutor *service.MessageBatchInterpolationExecutor + if k.partition != nil { + partitionExecutor = msg.InterpolationExecutor(k.partition) + } + var timestampExecutor *service.MessageBatchInterpolationExecutor + if k.timestamp != nil { + timestampExecutor = msg.InterpolationExecutor(k.timestamp) + } + boff := k.backoffCtor() userDefinedHeaders := k.buildUserDefinedHeaders(k.staticHeaders) msgs := []*sarama.ProducerMessage{} for i := 0; i < len(msg); i++ { - key, err := msg.TryInterpolatedBytes(i, k.key) + key, err := keyExecutor.TryBytes(i) if err != nil { return fmt.Errorf("key interpolation error: %w", err) } - topic, err := msg.TryInterpolatedString(i, k.topic) + topic, err := topicExecutor.TryString(i) if err != nil { return fmt.Errorf("topic interpolation error: %w", err) } @@ -543,8 +554,8 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) // partitioner. Although samara will (currently) ignore the partition // field when not using a manual partitioner, we should only set it when // we explicitly want that. - if k.partition != nil { - partitionString, err := msg.TryInterpolatedString(i, k.partition) + if partitionExecutor != nil { + partitionString, err := partitionExecutor.TryString(i) if err != nil { return fmt.Errorf("partition interpolation error: %w", err) } @@ -563,8 +574,8 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) nextMsg.Partition = int32(partitionInt) } - if k.timestamp != nil { - if tsStr, err := msg.TryInterpolatedString(i, k.timestamp); err != nil { + if timestampExecutor != nil { + if tsStr, err := timestampExecutor.TryString(i); err != nil { return fmt.Errorf("timestamp interpolation error: %w", err) } else { if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil {