diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java index b280bdde48..fcfcfa0c17 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java @@ -24,7 +24,7 @@ */ public class EntryFormatterFactory { - enum EntryFormat { + public enum EntryFormat { PULSAR, KAFKA, MIXED_KAFKA diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index f3b5ffd33c..619866e472 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -112,6 +112,7 @@ class AnalyzeResult { public class PartitionLog { public static final String KAFKA_TOPIC_UUID_PROPERTY_NAME = "kafkaTopicUUID"; + public static final String KAFKA_ENTRY_FORMATTER_PROPERTY_NAME = "kafkaEntryFormat"; private static final String PID_PREFIX = "KOP-PID-PREFIX"; private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION = @@ -252,7 +253,8 @@ private CompletableFuture> fetchTopicProperties(Optional topicProperties) { final String entryFormat; if (topicProperties != null) { - entryFormat = topicProperties.getOrDefault("kafkaEntryFormat", kafkaConfig.getEntryFormat()); + entryFormat = topicProperties + .getOrDefault(KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, kafkaConfig.getEntryFormat()); } else { entryFormat = kafkaConfig.getEntryFormat(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java index 017039a1e0..4c7d972ccc 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java @@ -15,8 +15,11 @@ import com.google.common.collect.Sets; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory; +import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.internals.Topic; @@ -332,10 +335,12 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf, final String topic, final int numPartitions, final boolean partitioned) throws PulsarAdminException { + Map properties = Map.of( + PartitionLog.KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, EntryFormatterFactory.EntryFormat.PULSAR.name()); if (partitioned) { log.info("Creating partitioned topic {} (with {} partitions) if it does not exist", topic, numPartitions); try { - admin.topics().createPartitionedTopic(topic, numPartitions); + admin.topics().createPartitionedTopic(topic, numPartitions, properties); } catch (PulsarAdminException.ConflictException e) { log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage()); } @@ -347,7 +352,7 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf, } else { log.info("Creating non-partitioned topic {}-{} if it does not exist", topic, numPartitions); try { - admin.topics().createNonPartitionedTopic(topic); + admin.topics().createNonPartitionedTopic(topic, properties); } catch (PulsarAdminException.ConflictException e) { log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage()); } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java index 1ed3f7607c..726266ca6f 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java @@ -126,11 +126,11 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { verify(mockNamespaces, times(1)).setNamespaceMessageTTL(eq(conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace()), any(Integer.class)); verify(mockTopics, times(1)).createPartitionedTopic( - eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions())); + eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()), any()); verify(mockTopics, times(1)).createPartitionedTopic( - eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions())); + eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()), any()); verify(mockTopics, times(1)).createPartitionedTopic( - eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions())); + eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()), any()); // check user topics namespace doesn't set the policy verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace()), any(Set.class));