From c4420b410b8b7f23853ade996caba3087555372a Mon Sep 17 00:00:00 2001
From: Paul Grey
Date: Wed, 27 Nov 2024 11:13:26 -0500
Subject: [PATCH] NIFI-14053 PublishKafka should allow specification of
 KafkaKey when using FlowFile publish strategy

Signed-off-by: Matt Burgess

This closes #9557
---
 .../nifi/kafka/processors/PublishKafkaIT.java |  8 +++++---
 .../nifi/kafka/processors/PublishKafka.java   | 20 +++++++++-----------
 .../FlowFileStreamKafkaRecordConverter.java   |  7 +++++--
 3 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
index 40bf041a0a51..230881543220 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java
@@ -35,10 +35,11 @@
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 
 @TestMethodOrder(MethodOrderer.MethodName.class)
 public class PublishKafkaIT extends AbstractPublishKafkaIT {
+    private static final String TEST_KEY_ATTRIBUTE = "my-key";
+    private static final String TEST_KEY_VALUE = "some-key-value";
     private static final String TEST_RECORD_VALUE = "value-" + System.currentTimeMillis();
 
     @Test
@@ -48,11 +49,12 @@ public void test_1_KafkaTestContainerProduceOne() throws InitializationException
         runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
         runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
-        //runner.setProperty(PublishKafka.USE_TRANSACTIONS, Boolean.FALSE.toString());
+        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE);
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("a1", "valueA1");
         attributes.put("b1", "valueB1");
+        attributes.put(TEST_KEY_ATTRIBUTE, TEST_KEY_VALUE);
         runner.enqueue(TEST_RECORD_VALUE, attributes);
         runner.run();
 
@@ -66,7 +68,7 @@ public void test_2_KafkaTestContainerConsumeOne() {
         final ConsumerRecords<String, String> records = consumer.poll(DURATION_POLL);
         assertEquals(1, records.count());
         final ConsumerRecord<String, String> record = records.iterator().next();
-        assertNull(record.key());
+        assertEquals(TEST_KEY_VALUE, record.key());
         assertEquals(TEST_RECORD_VALUE, record.value());
         final List<Header> headers = Arrays.asList(record.headers().toArray());
         assertEquals(1, headers.size());

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index c2844be622a2..ec61876aa130 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -259,7 +259,6 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER)
             .build();
 
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
@@ -526,18 +525,17 @@ private KafkaRecordConverter getKafkaRecordConverter(final ProcessContext contex
 
         final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
 
-        if (readerFactory != null && writerFactory != null) {
-            final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
-            final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
-            final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
-
-            final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
-            final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
-            final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
-            final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null))
+        final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
+        final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
+        final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+        final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null))
                 ? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger())
                 : new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding);
 
+        if (readerFactory != null && writerFactory != null) {
+            final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
             if (publishStrategy == PublishStrategy.USE_WRAPPER) {
                 return new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, maxMessageSize, getLogger());
             } else {
@@ -551,7 +549,7 @@ private KafkaRecordConverter getKafkaRecordConverter(final ProcessContext contex
 
             return new DelimitedStreamKafkaRecordConverter(demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory);
         }
 
-        return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory);
+        return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory, keyFactory);
     }
 
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java
index f6e2d3528762..411f338cdb81 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java
@@ -18,6 +18,7 @@
 
 import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
 import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
 import org.apache.nifi.kafka.service.api.record.KafkaRecord;
 
 import java.io.ByteArrayOutputStream;
@@ -34,10 +35,12 @@ public class FlowFileStreamKafkaRecordConverter implements KafkaRecordConverter {
 
     final int maxMessageSize;
     final HeadersFactory headersFactory;
+    final KeyFactory keyFactory;
 
-    public FlowFileStreamKafkaRecordConverter(final int maxMessageSize, final HeadersFactory headersFactory) {
+    public FlowFileStreamKafkaRecordConverter(final int maxMessageSize, final HeadersFactory headersFactory, final KeyFactory keyFactory) {
         this.maxMessageSize = maxMessageSize;
         this.headersFactory = headersFactory;
+        this.keyFactory = keyFactory;
     }
 
     @Override
@@ -50,7 +53,7 @@ public Iterator<KafkaRecord> convert(final Map<String, String> attributes, final
             recordBytes = baos.toByteArray();
         }
 
-        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, recordBytes, headersFactory.getHeaders(attributes));
+        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, keyFactory.getKey(attributes, null), recordBytes, headersFactory.getHeaders(attributes));
         return List.of(kafkaRecord).iterator();
     }
 }
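For context on the behavior this patch enables: when no Record Reader/Writer is configured, the FlowFile publish strategy now obtains the record key from the AttributeKeyFactory built above, so the attribute named by the Kafka Key property supplies the key bytes. The class below is a minimal illustrative sketch of that attribute-based keying, not the NiFi source: the class name is hypothetical, it takes a single-argument getKey rather than the two-argument KeyFactory.getKey(attributes, null) used in the diff, and the "utf-8"/"hex" encoding names are assumed from the Key Attribute Encoding property.

// AttributeKeySketch.java -- hypothetical sketch, not part of this patch
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;
import java.util.Map;

class AttributeKeySketch {
    private final String keyAttribute;  // value of the processor's Kafka Key property
    private final String keyEncoding;   // value of the Key Attribute Encoding property (assumed "utf-8"/"hex")

    AttributeKeySketch(final String keyAttribute, final String keyEncoding) {
        this.keyAttribute = keyAttribute;
        this.keyEncoding = keyEncoding;
    }

    /** Returns the record key bytes, or null when no key attribute is configured or present. */
    byte[] getKey(final Map<String, String> attributes) {
        final String value = (keyAttribute == null) ? null : attributes.get(keyAttribute);
        if (value == null) {
            // Matches pre-patch behavior for the FlowFile strategy: records published without a key,
            // which the removed assertNull(record.key()) assertion in PublishKafkaIT relied on.
            return null;
        }
        return "hex".equalsIgnoreCase(keyEncoding)
                ? HexFormat.of().parseHex(value)           // attribute holds hex-encoded key bytes
                : value.getBytes(StandardCharsets.UTF_8);  // default: UTF-8 bytes of the attribute value
    }
}

Returning null when the Kafka Key property or the named attribute is absent keeps the previous key-less publishing behavior, so existing flows that never set the property are unaffected; only flows that opt in (as the updated integration test does with the "my-key" attribute) get keyed records.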