From 6b5991873b2dd0ccadb54bd0ab334c52095f5659 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Thu, 29 Aug 2024 21:32:01 +0800 Subject: [PATCH 1/2] fix batch message data loss --- .../enumerator/cursor/CursorPosition.java | 12 +++++++ .../reader/PulsarPartitionSplitReader.java | 8 ++++- .../PulsarPartitionSplitReaderTest.java | 34 ++++++++++++++++++- .../table/PulsarChangelogTableITCase.java | 2 +- .../pulsar/table/PulsarTableITCase.java | 2 +- .../runtime/PulsarRuntimeOperator.java | 31 ++++++++++------- .../KeyedPulsarPartitionDataWriter.java | 4 +-- .../writer/PulsarPartitionDataWriter.java | 2 +- 8 files changed, 75 insertions(+), 20 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index 72fc6147..c2765345 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ChunkMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; @@ -104,6 +105,17 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) { if (include) { return messageIdImpl; } else { + // if the message is batched, should return next single message in current batch. 
+ if (messageIdImpl.getBatchIndex() >= 0 && messageIdImpl.getBatchSize() > 0 + && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) { + return new BatchMessageIdImpl(messageIdImpl.getLedgerId(), + messageIdImpl.getEntryId(), + messageIdImpl.getPartitionIndex(), + messageIdImpl.getBatchIndex() + 1, + messageIdImpl.getBatchSize(), + messageIdImpl.getAckSet()); + } + // if the (ledgerId, entryId + 1) is not valid // pulsar broker will automatically set the cursor to the next valid message return new MessageIdImpl( diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 38a7adb3..c7ccf45b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.shade.com.google.common.base.Strings; import org.slf4j.Logger; @@ -196,7 +197,12 @@ public void handleSplitsChanges(SplitsChange splitsChanges MessageId latestConsumedId = registeredSplit.getLatestConsumedId(); if (latestConsumedId != null) { - LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); + if (latestConsumedId instanceof BatchMessageIdImpl) { + LOG.info("Reset subscription position by the checkpoint {}, batchSize {}", + latestConsumedId, ((BatchMessageIdImpl) latestConsumedId).getBatchSize()); + } else { + LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); + } try { CursorPosition cursorPosition; if (latestConsumedId == MessageId.latest diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java index fa061907..8083e335 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java @@ -32,11 +32,13 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -244,7 +246,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou PulsarPartitionSplitReader splitReader = splitReader(); String topicName = randomAlphabetic(10); - operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false); MessageIdImpl lastMessageId = (MessageIdImpl) operator() @@ -263,6 +265,36 @@ void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou fetchedMessages(splitReader, 1, true); } + @Test + void consumeBatchMessageFromRecover() throws Exception { + PulsarPartitionSplitReader splitReader = splitReader(); + String topicName = randomAlphabetic(10); + + int numRecords = 20; + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), numRecords, true); + MessageIdImpl lastMessageId = + (MessageIdImpl) + operator() + .admin() + .topics() + .getLastMessageId(topicNameWithPartition(topicName, 0)); + // Pretend that we already consumed the message at batch index 2 of the last batch entry + int lastConsumedBatchIndex = 2; + BitSet ackSet = new BitSet(numRecords); + ackSet.set(0, numRecords); + BatchMessageIdImpl batchMessageId = + new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), + lastMessageId.getPartitionIndex(), lastConsumedBatchIndex, numRecords, ackSet); + int expectedCount = numRecords - lastConsumedBatchIndex - 1; + // On recovery, the start cursor is exclusive + handleSplit( + splitReader, + topicName, + 0, + batchMessageId); + fetchedMessages(splitReader, expectedCount, true); + } + /** Create a split reader with max message 1, fetch timeout 1s. */ private PulsarPartitionSplitReader splitReader() { return new PulsarPartitionSplitReader( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java index da22ee8c..c448a087 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java @@ -447,6 +447,6 @@ public void testPulsarMaxwellChangelogSource() throws Exception { } private void writeRecordsToPulsar(String topic, List<String> lines) throws Exception { - pulsar.operator().sendMessages(topic, Schema.STRING, lines); + pulsar.operator().sendMessages(topic, Schema.STRING, lines, false); } } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java index 6538676a..eb839679 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java @@ -292,7 +292,7 @@ void sendMessageWithPropertiesAndReadPropertiesMetadata() throws Exception { properties.put("key1", "value1"); properties.put("key2", "value2"); try (Producer<String> producer = - pulsar.operator().createProducer(sourceTopic, Schema.STRING)) { + pulsar.operator().createProducer(sourceTopic, Schema.STRING, false)) { producer.newMessage().value(value).properties(properties).send(); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 060eeeab..3eef7b3c 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -50,6 +50,8 @@ import java.util.HashSet; import
java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Stream; @@ -146,7 +148,7 @@ public void setupTopic(String topic) throws Exception { */ public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier) throws Exception { - setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION); + setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION, false); } /** @@ -159,7 +161,7 @@ public void setupTopic(String topic, Schema schema, Supplier supplier) * @param numRecordsPerSplit The number of records for a partition. */ public <T> void setupTopic( - String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit) + String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit, boolean enableBatch) throws Exception { String topicName = topicName(topic); createTopic(topicName, DEFAULT_PARTITIONS); @@ -170,7 +172,7 @@ public void setupTopic( List<T> messages = Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList()); - sendMessages(partitionName, schema, messages); + sendMessages(partitionName, schema, messages, enableBatch); } } @@ -250,7 +252,7 @@ public List topicInfo(String topic) throws Exception { * @return message id. */ public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) throws Exception { - List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message)); + List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message), false); checkArgument(messageIds.size() == 1); return messageIds.get(0); @@ -268,7 +270,7 @@ public MessageId sendMessage(String topic, Schema schema, T message) thro */ public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message) throws Exception { - List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message)); + List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message), false); checkArgument(messageIds.size() == 1); return messageIds.get(0); @@ -283,9 +285,9 @@ public MessageId sendMessage(String topic, Schema schema, String key, T m * @param <T> The type of the record. * @return message id. */ - public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages) + public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages, boolean enableBatch) throws Exception { - return sendMessages(topic, schema, null, messages); + return sendMessages(topic, schema, null, messages, enableBatch); } /** @@ -299,16 +301,18 @@ public List sendMessages(String topic, Schema schema, Collecti * @return message id.
*/ public <T> List<MessageId> sendMessages( - String topic, Schema<T> schema, String key, Collection<T> messages) throws Exception { - try (Producer<T> producer = createProducer(topic, schema)) { + String topic, Schema<T> schema, String key, Collection<T> messages, boolean enableBatch) throws Exception { + try (Producer<T> producer = createProducer(topic, schema, enableBatch)) { List<MessageId> messageIds = new ArrayList<>(messages.size()); for (T message : messages) { TypedMessageBuilder<T> builder = producer.newMessage().value(message); if (!Strings.isNullOrEmpty(key)) { builder.key(key); } - MessageId messageId = builder.send(); - messageIds.add(messageId); + final CompletableFuture<MessageId> messageIdCompletableFuture = builder.sendAsync(); + messageIdCompletableFuture.whenComplete((messageId, ignore) -> { + messageIds.add(messageId); + }); } producer.flush(); return messageIds; @@ -479,12 +483,13 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) throws } } - public <T> Producer<T> createProducer(String topic, Schema<T> schema) throws Exception { + public <T> Producer<T> createProducer(String topic, Schema<T> schema, boolean enableBatch) throws Exception { return client().newProducer(schema) .topic(topic) - .enableBatching(false) + .enableBatching(enableBatch) .enableMultiSchema(true) .accessMode(Shared) + .batchingMaxPublishDelay(10, TimeUnit.SECONDS) // Give enough time to assemble the batch .create(); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java index 6d3c9afc..1dfea800 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java @@ -55,10 +55,10 @@ public void writeRecords(List records) { try { // Send messages with the key we don't need. List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList()); - operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords); + operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords, false); // Send messages with the given key.
- operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records); + operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records, false); } catch (Exception e) { throw new FlinkRuntimeException(e); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java index 2d530193..af2da7d5 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java @@ -46,7 +46,7 @@ public PulsarPartitionDataWriter( @Override public void writeRecords(List<T> records) { try { - operator.sendMessages(fullTopicName, schema, records); + operator.sendMessages(fullTopicName, schema, records, false); } catch (Exception e) { throw new FlinkRuntimeException(e); } From 91883e0e7bde01dc2cd48c25921267adde000e16 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Fri, 30 Aug 2024 10:36:12 +0800 Subject: [PATCH 2/2] fix spotless --- .../enumerator/cursor/CursorPosition.java | 6 ++-- .../reader/PulsarPartitionSplitReader.java | 6 ++-- .../PulsarPartitionSplitReaderTest.java | 15 +++++----- .../runtime/PulsarRuntimeOperator.java | 28 +++++++++++++------ 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index c2765345..8d92ac5e 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -106,9 +106,11 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) { return messageIdImpl; } else { // if the message is batched, should return next single message in current batch.
- if (messageIdImpl.getBatchIndex() >= 0 && messageIdImpl.getBatchSize() > 0 + if (messageIdImpl.getBatchIndex() >= 0 + && messageIdImpl.getBatchSize() > 0 && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) { - return new BatchMessageIdImpl(messageIdImpl.getLedgerId(), + return new BatchMessageIdImpl( + messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), messageIdImpl.getPartitionIndex(), messageIdImpl.getBatchIndex() + 1, diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index c7ccf45b..2b196e30 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -198,8 +198,10 @@ public void handleSplitsChanges(SplitsChange splitsChanges if (latestConsumedId != null) { if (latestConsumedId instanceof BatchMessageIdImpl) { - LOG.info("Reset subscription position by the checkpoint {}, batchSize {}", - latestConsumedId, ((BatchMessageIdImpl) latestConsumedId).getBatchSize()); + LOG.info( + "Reset subscription position by the checkpoint {}, batchSize {}", + latestConsumedId, + ((BatchMessageIdImpl) latestConsumedId).getBatchSize()); } else { LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java index 8083e335..b2ef7d99 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java @@ -283,15 +283,16 @@ void consumeBatchMessageFromRecover() throws Exception { BitSet ackSet = new BitSet(numRecords); ackSet.set(0, numRecords); BatchMessageIdImpl batchMessageId = - new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartitionIndex(), lastConsumedBatchIndex, numRecords, ackSet); + new BatchMessageIdImpl( + lastMessageId.getLedgerId(), + lastMessageId.getEntryId(), + lastMessageId.getPartitionIndex(), + lastConsumedBatchIndex, + numRecords, + ackSet); int expectedCount = numRecords - lastConsumedBatchIndex - 1; // On recovery, the start cursor is exclusive - handleSplit( - splitReader, - topicName, - 0, - batchMessageId); + handleSplit(splitReader, topicName, 0, batchMessageId); fetchedMessages(splitReader, expectedCount, true); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 3eef7b3c..21080680 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -161,7 +161,11 @@ public void setupTopic(String topic, Schema schema, Supplier supplier) * @param numRecordsPerSplit The number of
records for a partition. */ public <T> void setupTopic( - String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit, boolean enableBatch) + String topic, + Schema<T> schema, + Supplier<T> supplier, + int numRecordsPerSplit, + boolean enableBatch) throws Exception { String topicName = topicName(topic); createTopic(topicName, DEFAULT_PARTITIONS); @@ -270,7 +274,8 @@ public MessageId sendMessage(String topic, Schema schema, T message) thro */ public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message) throws Exception { - List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message), false); + List<MessageId> messageIds = + sendMessages(topic, schema, key, singletonList(message), false); checkArgument(messageIds.size() == 1); return messageIds.get(0); @@ -285,7 +290,8 @@ public MessageId sendMessage(String topic, Schema schema, String key, T m * @param <T> The type of the record. * @return message id. */ - public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages, boolean enableBatch) + public <T> List<MessageId> sendMessages( + String topic, Schema<T> schema, Collection<T> messages, boolean enableBatch) throws Exception { return sendMessages(topic, schema, null, messages, enableBatch); } @@ -301,7 +307,8 @@ public List sendMessages(String topic, Schema schema, Collecti * @return message id. */ public <T> List<MessageId> sendMessages( - String topic, Schema<T> schema, String key, Collection<T> messages, boolean enableBatch) throws Exception { + String topic, Schema<T> schema, String key, Collection<T> messages, boolean enableBatch) + throws Exception { try (Producer<T> producer = createProducer(topic, schema, enableBatch)) { List<MessageId> messageIds = new ArrayList<>(messages.size()); for (T message : messages) { @@ -310,9 +317,10 @@ public List sendMessages( builder.key(key); } final CompletableFuture<MessageId> messageIdCompletableFuture = builder.sendAsync(); - messageIdCompletableFuture.whenComplete((messageId, ignore) -> { - messageIds.add(messageId); - }); + messageIdCompletableFuture.whenComplete( + (messageId, ignore) -> { + messageIds.add(messageId); + }); } producer.flush(); return messageIds; @@ -483,13 +491,15 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) throws } } - public <T> Producer<T> createProducer(String topic, Schema<T> schema, boolean enableBatch) throws Exception { + public <T> Producer<T> createProducer(String topic, Schema<T> schema, boolean enableBatch) + throws Exception { return client().newProducer(schema) .topic(topic) .enableBatching(enableBatch) .enableMultiSchema(true) .accessMode(Shared) - .batchingMaxPublishDelay(10, TimeUnit.SECONDS) // Give enough time to assemble the batch + .batchingMaxPublishDelay( + 10, TimeUnit.SECONDS) // Give enough time to assemble the batch .create(); }
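
Note on the fix (illustrative, not part of either patch): the resume rule that PATCH 1/2 adds to CursorPosition#getActualMessageId can be read in isolation as the sketch below. It mirrors the hunk above; the standalone helper name nextExclusivePosition is invented here for illustration, while MessageIdAdv, BatchMessageIdImpl, and MessageIdImpl are the Pulsar client types the patch itself uses.

    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.MessageIdAdv;
    import org.apache.pulsar.client.impl.BatchMessageIdImpl;
    import org.apache.pulsar.client.impl.MessageIdImpl;

    /** Computes where to resume reading when the checkpointed id is excluded. */
    static MessageId nextExclusivePosition(MessageIdAdv id) {
        // A batched message that is not the last one of its entry: resume at the
        // next batch index within the same (ledgerId, entryId). Jumping straight
        // to entryId + 1 here is what previously dropped the rest of the batch.
        if (id.getBatchIndex() >= 0
                && id.getBatchSize() > 0
                && id.getBatchIndex() != id.getBatchSize() - 1) {
            return new BatchMessageIdImpl(
                    id.getLedgerId(),
                    id.getEntryId(),
                    id.getPartitionIndex(),
                    id.getBatchIndex() + 1,
                    id.getBatchSize(),
                    id.getAckSet());
        }
        // A non-batched message, or the last message of a batch: move to the next
        // entry; the broker resolves (ledgerId, entryId + 1) to the next valid message.
        return new MessageIdImpl(id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
    }

With numRecords = 20 and lastConsumedBatchIndex = 2, as in consumeBatchMessageFromRecover, this resumes at batch index 3 of the same entry, so the reader fetches the remaining 17 messages instead of skipping the whole batch.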