diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index 72fc6147..8d92ac5e 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ChunkMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; @@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) { if (include) { return messageIdImpl; } else { + // if the message is batched, should return next single message in current batch. + if (messageIdImpl.getBatchIndex() >= 0 + && messageIdImpl.getBatchSize() > 0 + && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) { + return new BatchMessageIdImpl( + messageIdImpl.getLedgerId(), + messageIdImpl.getEntryId(), + messageIdImpl.getPartitionIndex(), + messageIdImpl.getBatchIndex() + 1, + messageIdImpl.getBatchSize(), + messageIdImpl.getAckSet()); + } + // if the (ledgerId, entryId + 1) is not valid // pulsar broker will automatically set the cursor to the next valid message return new MessageIdImpl( diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 38a7adb3..2b196e30 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.shade.com.google.common.base.Strings; import org.slf4j.Logger; @@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange splitsChanges MessageId latestConsumedId = registeredSplit.getLatestConsumedId(); if (latestConsumedId != null) { - LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); + if (latestConsumedId instanceof BatchMessageIdImpl) { + LOG.info( + "Reset subscription position by the checkpoint {}, batchSize {}", + latestConsumedId, + ((BatchMessageIdImpl) latestConsumedId).getBatchSize()); + } else { + LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); + } try { CursorPosition cursorPosition; if (latestConsumedId == MessageId.latest diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java index fa061907..b2ef7d99 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java @@ -32,11 +32,13 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -244,7 +246,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou PulsarPartitionSplitReader splitReader = splitReader(); String topicName = randomAlphabetic(10); - operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false); MessageIdImpl lastMessageId = (MessageIdImpl) operator() @@ -263,6 +265,37 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou fetchedMessages(splitReader, 1, true); } + @Test + void consumeBatchMessageFromRecover() throws Exception { + PulsarPartitionSplitReader splitReader = splitReader(); + String topicName = randomAlphabetic(10); + + int numRecords = 20; + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), numRecords, true); + MessageIdImpl lastMessageId = + (MessageIdImpl) + operator() + .admin() + .topics() + .getLastMessageId(topicNameWithPartition(topicName, 0)); + // Pretend that we consumed the 2th message of the last batch Entry + int lastConsumedBatchIndex = 2; + BitSet ackSet = new BitSet(numRecords); + ackSet.set(0, numRecords); + BatchMessageIdImpl batchMessageId = + new BatchMessageIdImpl( + lastMessageId.getLedgerId(), + lastMessageId.getEntryId(), + lastMessageId.getPartitionIndex(), + lastConsumedBatchIndex, + numRecords, + ackSet); + int expectedCount = numRecords - lastConsumedBatchIndex - 1; + // when recover, use exclusive startCursor + handleSplit(splitReader, topicName, 0, batchMessageId); + fetchedMessages(splitReader, expectedCount, true); + } + /** Create a split reader with max message 1, fetch timeout 1s. */ private PulsarPartitionSplitReader splitReader() { return new PulsarPartitionSplitReader( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java index da22ee8c..c448a087 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java @@ -447,6 +447,6 @@ public void testPulsarMaxwellChangelogSource() throws Exception { } private void writeRecordsToPulsar(String topic, List lines) throws Exception { - pulsar.operator().sendMessages(topic, Schema.STRING, lines); + pulsar.operator().sendMessages(topic, Schema.STRING, lines, false); } } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java index 6538676a..eb839679 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java @@ -292,7 +292,7 @@ void sendMessageWithPropertiesAndReadPropertiesMetadata() throws Exception { properties.put("key1", "value1"); properties.put("key2", "value2"); try (Producer producer = - pulsar.operator().createProducer(sourceTopic, Schema.STRING)) { + pulsar.operator().createProducer(sourceTopic, Schema.STRING, false)) { producer.newMessage().value(value).properties(properties).send(); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 060eeeab..21080680 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -50,6 +50,8 @@ import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Stream; @@ -146,7 +148,7 @@ public void setupTopic(String topic) throws Exception { */ public void setupTopic(String topic, Schema schema, Supplier supplier) throws Exception { - setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION); + setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION, false); } /** @@ -159,7 +161,11 @@ public void setupTopic(String topic, Schema schema, Supplier supplier) * @param numRecordsPerSplit The number of records for a partition. */ public void setupTopic( - String topic, Schema schema, Supplier supplier, int numRecordsPerSplit) + String topic, + Schema schema, + Supplier supplier, + int numRecordsPerSplit, + boolean enableBatch) throws Exception { String topicName = topicName(topic); createTopic(topicName, DEFAULT_PARTITIONS); @@ -170,7 +176,7 @@ public void setupTopic( List messages = Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList()); - sendMessages(partitionName, schema, messages); + sendMessages(partitionName, schema, messages, enableBatch); } } @@ -250,7 +256,7 @@ public List topicInfo(String topic) throws Exception { * @return message id. */ public MessageId sendMessage(String topic, Schema schema, T message) throws Exception { - List messageIds = sendMessages(topic, schema, singletonList(message)); + List messageIds = sendMessages(topic, schema, singletonList(message), false); checkArgument(messageIds.size() == 1); return messageIds.get(0); @@ -268,7 +274,8 @@ public MessageId sendMessage(String topic, Schema schema, T message) thro */ public MessageId sendMessage(String topic, Schema schema, String key, T message) throws Exception { - List messageIds = sendMessages(topic, schema, key, singletonList(message)); + List messageIds = + sendMessages(topic, schema, key, singletonList(message), false); checkArgument(messageIds.size() == 1); return messageIds.get(0); @@ -283,9 +290,10 @@ public MessageId sendMessage(String topic, Schema schema, String key, T m * @param The type of the record. * @return message id. */ - public List sendMessages(String topic, Schema schema, Collection messages) + public List sendMessages( + String topic, Schema schema, Collection messages, boolean enableBatch) throws Exception { - return sendMessages(topic, schema, null, messages); + return sendMessages(topic, schema, null, messages, enableBatch); } /** @@ -299,16 +307,20 @@ public List sendMessages(String topic, Schema schema, Collecti * @return message id. */ public List sendMessages( - String topic, Schema schema, String key, Collection messages) throws Exception { - try (Producer producer = createProducer(topic, schema)) { + String topic, Schema schema, String key, Collection messages, boolean enableBatch) + throws Exception { + try (Producer producer = createProducer(topic, schema, enableBatch)) { List messageIds = new ArrayList<>(messages.size()); for (T message : messages) { TypedMessageBuilder builder = producer.newMessage().value(message); if (!Strings.isNullOrEmpty(key)) { builder.key(key); } - MessageId messageId = builder.send(); - messageIds.add(messageId); + final CompletableFuture messageIdCompletableFuture = builder.sendAsync(); + messageIdCompletableFuture.whenComplete( + (messageId, ignore) -> { + messageIds.add(messageId); + }); } producer.flush(); return messageIds; @@ -479,12 +491,15 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) throws } } - public Producer createProducer(String topic, Schema schema) throws Exception { + public Producer createProducer(String topic, Schema schema, boolean enableBatch) + throws Exception { return client().newProducer(schema) .topic(topic) - .enableBatching(false) + .enableBatching(enableBatch) .enableMultiSchema(true) .accessMode(Shared) + .batchingMaxPublishDelay( + 10, TimeUnit.SECONDS) // Give enough time to assemble the batch .create(); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java index 6d3c9afc..1dfea800 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java @@ -55,10 +55,10 @@ public void writeRecords(List records) { try { // Send messages with the key we don't need. List newRecords = records.stream().map(a -> a + keyToRead).collect(toList()); - operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords); + operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords, false); // Send messages with the given key. - operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records); + operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records, false); } catch (Exception e) { throw new FlinkRuntimeException(e); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java index 2d530193..af2da7d5 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java @@ -46,7 +46,7 @@ public PulsarPartitionDataWriter( @Override public void writeRecords(List records) { try { - operator.sendMessages(fullTopicName, schema, records); + operator.sendMessages(fullTopicName, schema, records, false); } catch (Exception e) { throw new FlinkRuntimeException(e); }