From e0dbf3196258ebe8763ec929803612fdd9dbcb20 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 4 Jan 2021 20:56:32 +0800 Subject: [PATCH] Fix PulsarEntryFormatter not handling null values correctly --- .../kop/format/PulsarEntryFormatter.java | 8 +- .../handlers/kop/BasicEndToEndKafkaTest.java | 50 +++++ .../handlers/kop/BasicEndToEndPulsarTest.java | 65 ++++++ .../handlers/kop/BasicEndToEndTestBase.java | 196 ++++++++++++++++++ 4 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index 7a5117841b..7e120e9e5a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -22,6 +22,7 @@ import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.stream.IntStream; import java.util.stream.StreamSupport; @@ -178,12 +179,15 @@ public MemoryRecords decode(final List entries, final byte magic) { SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); + final ByteBuffer value = (singleMessageMetadata.getNullValue()) + ? 
null + : ByteBufUtils.getNioBuffer(singleMessagePayload); builder.appendWithOffset( MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId(), i), msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), - ByteBufUtils.getNioBuffer(singleMessagePayload), + value, headers); singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); @@ -239,7 +243,7 @@ private static MessageImpl recordToEntry(Record record) { record.value().get(value); builder.value(value); } else { - builder.value(new byte[0]); + builder.value(null); } // sequence diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java new file mode 100644 index 0000000000..743468b455 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -0,0 +1,50 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static org.testng.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import lombok.Cleanup; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.testng.annotations.Test; + +/** + * Basic end-to-end test with `entryFormat=kafka`. 
+ */ +public class BasicEndToEndKafkaTest extends BasicEndToEndTestBase { + + public BasicEndToEndKafkaTest() { + super("kafka"); + } + + @Test(timeOut = 20000) + public void testNullValueMessages() throws Exception { + final String topic = "test-produce-null-value"; + + @Cleanup + final KafkaProducer kafkaProducer = newKafkaProducer(); + sendSingleMessages(kafkaProducer, topic, Arrays.asList(null, "")); + sendBatchedMessages(kafkaProducer, topic, Arrays.asList("test", "", null)); + + final List expectedMessages = Arrays.asList(null, "", "test", "", null); + + @Cleanup + final KafkaConsumer kafkaConsumer = newKafkaConsumer(topic); + List kafkaReceives = receiveMessages(kafkaConsumer, expectedMessages.size()); + assertEquals(kafkaReceives, expectedMessages); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java new file mode 100644 index 0000000000..f36f45b473 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java @@ -0,0 +1,65 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.pulsar.handlers.kop; + +import static org.testng.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import lombok.Cleanup; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.testng.annotations.Test; + +/** + * Basic end-to-end test with `entryFormat=pulsar`. + */ +public class BasicEndToEndPulsarTest extends BasicEndToEndTestBase { + + public BasicEndToEndPulsarTest() { + super("pulsar"); + } + + @Test(timeOut = 20000) + public void testNullValueMessages() throws Exception { + final String topic = "test-produce-null-value"; + + @Cleanup + final KafkaProducer kafkaProducer = newKafkaProducer(); + sendSingleMessages(kafkaProducer, topic, Arrays.asList(null, "")); + sendBatchedMessages(kafkaProducer, topic, Arrays.asList("test", null, "")); + + @Cleanup + final Producer pulsarProducer = newPulsarProducer(topic); + sendSingleMessages(pulsarProducer, Arrays.asList(null, "")); + sendBatchedMessages(kafkaProducer, topic, Arrays.asList("test", null, "")); + + final List expectValues = Arrays.asList(null, "", "test", null, "", null, "", "test", null, ""); + + // TODO: MultiTopicsConsumerImpl currently has a bug that prevents it from receiving messages with a null + // value, so here we subscribe to a single partition with ConsumerImpl instead. + // See https://github.com/apache/pulsar/pull/9113 for details. 
+ @Cleanup + final Consumer pulsarConsumer = newPulsarConsumer(topic + "-partition-0"); + List pulsarReceives = receiveMessages(pulsarConsumer, expectValues.size()); + assertEquals(pulsarReceives, expectValues); + + @Cleanup + final KafkaConsumer kafkaConsumer = newKafkaConsumer(topic); + List kafkaReceives = receiveMessages(kafkaConsumer, expectValues.size()); + assertEquals(kafkaReceives, expectValues); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java new file mode 100644 index 0000000000..cb29c0e2c3 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java @@ -0,0 +1,196 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.pulsar.handlers.kop; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; + +/** + * Basic end-to-end test. 
+ */ +@Slf4j +public class BasicEndToEndTestBase extends KopProtocolHandlerTestBase { + + public BasicEndToEndTestBase(final String entryFormat) { + super(entryFormat); + } + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + private String bootstrapServers() { + return "localhost:" + getKafkaBrokerPort(); + } + + protected KafkaProducer newKafkaProducer() { + final Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new KafkaProducer<>(props); + } + + protected KafkaConsumer newKafkaConsumer(final String topic) { + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + final KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singleton(topic)); + return consumer; + } + + protected Producer newPulsarProducer(final String topic) throws PulsarClientException { + return pulsarClient.newProducer().topic(topic).create(); + } + + protected Consumer newPulsarConsumer(final String topic) throws PulsarClientException { + return pulsarClient.newConsumer().topic(topic) + .subscriptionName("pulsar-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + } + + private static String format(final String s) { + return (s == null) ? 
"null" : ("'" + s + "'"); + } + + protected void sendSingleMessages(final KafkaProducer producer, + final String topic, + final List values) throws ExecutionException, InterruptedException { + for (String value : values) { + producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> { + Assert.assertNull(exception); + if (log.isDebugEnabled()) { + log.debug("KafkaProducer send {} to {}-partition-{}@{}", + format(value), metadata.topic(), metadata.partition(), metadata.offset()); + } + }).get(); + } + } + + protected void sendSingleMessages(final Producer producer, final List values) + throws PulsarClientException { + for (String value : values) { + final MessageId messageId = producer.newMessage().value((value == null) ? null : value.getBytes()).send(); + if (log.isDebugEnabled()) { + final MessageIdImpl impl = (MessageIdImpl) messageId; + log.debug("Pulsar Producer send {} to ({}, {})", format(value), impl.getLedgerId(), impl.getEntryId()); + } + } + } + + protected void sendBatchedMessages(final KafkaProducer producer, + final String topic, + final List values) throws ExecutionException, InterruptedException { + Future future = null; + for (String value : values) { + future = producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> { + Assert.assertNull(exception); + if (log.isDebugEnabled()) { + log.debug("KafkaProducer send {} to {}-partition-{}@{}", + format(value), metadata.topic(), metadata.partition(), metadata.offset()); + } + }); + } + Assert.assertNotNull(future); + future.get(); + } + + protected void sendBatchedMessages(final Producer producer, final List values) + throws ExecutionException, InterruptedException { + CompletableFuture future = null; + for (String value : values) { + future = producer.newMessage().value((value == null) ? 
null : value.getBytes()).sendAsync() + .thenApply(messageId -> { + if (log.isDebugEnabled()) { + final MessageIdImpl impl = (MessageIdImpl) messageId; + log.debug("Pulsar Producer send {} to ({}, {})", + format(value), impl.getLedgerId(), impl.getEntryId()); + } + return null; + }); + } + Assert.assertNotNull(future); + future.get(); + } + + protected List receiveMessages(final KafkaConsumer consumer, int numMessages) { + List values = new ArrayList<>(); + while (numMessages > 0) { + for (ConsumerRecord record : consumer.poll(Duration.ofMillis(100))) { + if (log.isDebugEnabled()) { + log.debug("KafkaConsumer receive: {}", record.value()); + } + values.add(record.value()); + numMessages--; + } + } + return values; + } + + protected List receiveMessages(final Consumer consumer, int numMessages) + throws PulsarClientException { + List values = new ArrayList<>(); + while (numMessages > 0) { + Message message = consumer.receive(100, TimeUnit.MILLISECONDS); + if (message != null) { + final byte[] value = message.getValue(); + values.add((value == null) ? null : new String(value)); + if (log.isDebugEnabled()) { + log.debug("Pulsar Consumer receive: {}", values.get(values.size() - 1)); + } + } + numMessages--; + } + return values; + } +}