[FLINK-36180] Fix batch message data loss #95

Open · wants to merge 2 commits into main
@@ -28,6 +28,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
@@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
if (include) {
return messageIdImpl;
} else {
// If the message id is batched, return the next single message in the current batch.
if (messageIdImpl.getBatchIndex() >= 0
&& messageIdImpl.getBatchSize() > 0
&& messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) {
return new BatchMessageIdImpl(
messageIdImpl.getLedgerId(),
messageIdImpl.getEntryId(),
messageIdImpl.getPartitionIndex(),
messageIdImpl.getBatchIndex() + 1,
messageIdImpl.getBatchSize(),
messageIdImpl.getAckSet());
Contributor commented:

Batch messages are a quite complex feature. How can we verify that the acknowledgment is cumulative for a batch message? It could be acked individually.

Member Author replied:

> Should we change the AckSet here to make sure the current batch index has been acknowledged?
> https://github.com/apache/pulsar/blob/dccc06bf50bb5ca510b39167908c02d2b4602ca5/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java#L50

There is no need to modify this AckSet, because we ultimately call Consumer seek. In the seek method, the messages before batchIndex are cumulatively acked, so the AckSet here will not take effect regardless of its state.

Member Author added:

> Batch messages are a quite complex feature. How can we verify that the acknowledgment is cumulative for a batch message? It could be acked individually.

1. In flink-pulsar-connector, the receive queue of the consumer is set to 1; will the messages before the current batchIndex then not be confirmed?

2. Even if a single message is confirmed, the current connector does not support BatchIndexAck.

3. The seek operation during task-failure recovery cannot guarantee that the AckSet will work, as I said above.

4. The current connector's ack behavior is cumulative confirmation.

Finally, if we don't change the seeking behavior of CursorPosition, we won't be able to recover from the AckSet, regardless of the AckSet stored in the state. The changes in this PR are valid under the current cumulative acknowledgment behavior.
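
For reference, here are the two acknowledgment styles under discussion, sketched against the public Pulsar consumer API (an illustrative sketch, not connector code):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;

final class AckStyles {
    // Individual ack: only this one message is marked as consumed.
    static <T> void ackIndividually(Consumer<T> consumer, Message<T> message)
            throws PulsarClientException {
        consumer.acknowledge(message);
    }

    // Cumulative ack: everything up to and including this message id is marked
    // as consumed. This is the behavior the author relies on for recovery via seek.
    static <T> void ackCumulatively(Consumer<T> consumer, Message<T> message)
            throws PulsarClientException {
        consumer.acknowledgeCumulative(message.getMessageId());
    }
}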

Contributor replied:

1. The receive queue setting has been exposed to the user, with a default value of 1000.
2. IIUC, support for the batch AckSet is handled locally by the pulsar-client after all the messages in the batch have been acked. (BTW, this shouldn't be touched by the connector user or developer; it should be guaranteed by the Pulsar client developers.)
3. The recovery is queried from the checkpoint-saved MessageId, and the AckSet is controlled internally by the client, I think.

}

// if the (ledgerId, entryId + 1) is not valid
// pulsar broker will automatically set the cursor to the next valid message
return new MessageIdImpl(
        messageIdImpl.getLedgerId(),
        messageIdImpl.getEntryId() + 1,
        messageIdImpl.getPartitionIndex());
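
Concretely, this is the data loss being fixed: suppose a batch entry at (ledgerId=1, entryId=7) holds five messages and a checkpoint records batch index 2 as the latest consumed position. With an exclusive start cursor, the old code always seeked to (1, 8) and silently skipped batch indexes 3 and 4; the new branch instead seeks to the BatchMessageIdImpl (1, 7, partition, batchIndex=3, batchSize=5), so the tail of the batch is replayed.
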
@@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
@@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges) {
MessageId latestConsumedId = registeredSplit.getLatestConsumedId();

if (latestConsumedId != null) {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
if (latestConsumedId instanceof BatchMessageIdImpl) {
Contributor commented:

I prefer to use MessageIdAdv globally.

Member Author (@wenbingshen) replied on Aug 30, 2024:

MessageIdAdv is inaccurate: it covers implementations such as MessageIdImpl, not just BatchMessageIdImpl, and here we want to print the correct batchSize.

Contributor replied:

I can see that all the MessageId implementations implement the MessageIdAdv interface, which contains all the required information for the client. I think it would be better to use MessageIdAdv instead of MessageId throughout the connector:

/**
 * The {@link MessageId} interface provided for advanced users.
 * <p>
 * All built-in MessageId implementations should be able to be cast to MessageIdAdv.
 * </p>
 */
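
For illustration, the MessageIdAdv-based logging the contributor suggests might look like the sketch below (CheckpointLogging and its helper are hypothetical names, not code from this PR):

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.slf4j.Logger;

final class CheckpointLogging {
    // Hypothetical helper: every built-in MessageId can be cast to MessageIdAdv,
    // and getBatchSize() returns 0 for non-batched ids per the MessageIdAdv contract,
    // so no BatchMessageIdImpl-specific branch is needed.
    static void logCheckpoint(Logger log, MessageId latestConsumedId) {
        if (latestConsumedId instanceof MessageIdAdv) {
            MessageIdAdv adv = (MessageIdAdv) latestConsumedId;
            log.info(
                    "Reset subscription position by the checkpoint {}, batchSize {}",
                    adv,
                    adv.getBatchSize());
        } else {
            log.info("Reset subscription position by the checkpoint {}", latestConsumedId);
        }
    }
}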

LOG.info(
"Reset subscription position by the checkpoint {}, batchSize {}",
latestConsumedId,
((BatchMessageIdImpl) latestConsumedId).getBatchSize());
} else {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
}
try {
CursorPosition cursorPosition;
if (latestConsumedId == MessageId.latest
@@ -32,11 +32,13 @@

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -244,7 +246,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
PulsarPartitionSplitReader splitReader = splitReader();
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
Expand All @@ -263,6 +265,37 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
fetchedMessages(splitReader, 1, true);
}

@Test
void consumeBatchMessageFromRecover() throws Exception {
PulsarPartitionSplitReader splitReader = splitReader();
String topicName = randomAlphabetic(10);

int numRecords = 20;
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), numRecords, true);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
.admin()
.topics()
.getLastMessageId(topicNameWithPartition(topicName, 0));
// Pretend that we consumed the message at batch index 2 of the last batch entry
int lastConsumedBatchIndex = 2;
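// In the Pulsar client's batch ack set, set bits mark batch positions that have not
// been acked individually. Its contents do not matter here, since recovery seeks and
// acks cumulatively (see the discussion above).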
BitSet ackSet = new BitSet(numRecords);
ackSet.set(0, numRecords);
BatchMessageIdImpl batchMessageId =
new BatchMessageIdImpl(
lastMessageId.getLedgerId(),
lastMessageId.getEntryId(),
lastMessageId.getPartitionIndex(),
lastConsumedBatchIndex,
numRecords,
ackSet);
int expectedCount = numRecords - lastConsumedBatchIndex - 1;
// On recovery, the start cursor is exclusive
handleSplit(splitReader, topicName, 0, batchMessageId);
fetchedMessages(splitReader, expectedCount, true);
}
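
With numRecords = 20 and lastConsumedBatchIndex = 2, recovery resumes from batch index 3 of the same entry, so the reader is expected to fetch the remaining 20 - 2 - 1 = 17 messages.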

/** Create a split reader with max message 1, fetch timeout 1s. */
private PulsarPartitionSplitReader splitReader() {
return new PulsarPartitionSplitReader(
@@ -447,6 +447,6 @@ public void testPulsarMaxwellChangelogSource() throws Exception {
}

private void writeRecordsToPulsar(String topic, List<String> lines) throws Exception {
pulsar.operator().sendMessages(topic, Schema.STRING, lines);
pulsar.operator().sendMessages(topic, Schema.STRING, lines, false);
}
}
@@ -292,7 +292,7 @@ void sendMessageWithPropertiesAndReadPropertiesMetadata() throws Exception {
properties.put("key1", "value1");
properties.put("key2", "value2");
try (Producer<String> producer =
pulsar.operator().createProducer(sourceTopic, Schema.STRING)) {
pulsar.operator().createProducer(sourceTopic, Schema.STRING, false)) {
producer.newMessage().value(value).properties(properties).send();
}

@@ -50,6 +50,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

@@ -146,7 +148,7 @@ public void setupTopic(String topic) throws Exception {
*/
public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier)
throws Exception {
setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION);
setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION, false);
}

/**
@@ -159,7 +161,11 @@ public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier)
* @param numRecordsPerSplit The number of records for a partition.
*/
public <T> void setupTopic(
String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit)
String topic,
Schema<T> schema,
Supplier<T> supplier,
int numRecordsPerSplit,
boolean enableBatch)
throws Exception {
String topicName = topicName(topic);
createTopic(topicName, DEFAULT_PARTITIONS);
@@ -170,7 +176,7 @@ public <T> void setupTopic(
List<T> messages =
Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList());

sendMessages(partitionName, schema, messages);
sendMessages(partitionName, schema, messages, enableBatch);
}
}

@@ -250,7 +256,7 @@ public List<TopicPartition> topicInfo(String topic) throws Exception {
* @return message id.
*/
public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) throws Exception {
List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message));
List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message), false);
checkArgument(messageIds.size() == 1);

return messageIds.get(0);
@@ -268,7 +274,8 @@ public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) throws Exception {
*/
public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message)
throws Exception {
List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message));
List<MessageId> messageIds =
sendMessages(topic, schema, key, singletonList(message), false);
checkArgument(messageIds.size() == 1);

return messageIds.get(0);
@@ -283,9 +290,10 @@ public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message)
* @param <T> The type of the record.
* @return message id.
*/
public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages)
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, Collection<T> messages, boolean enableBatch)
throws Exception {
return sendMessages(topic, schema, null, messages);
return sendMessages(topic, schema, null, messages, enableBatch);
}

/**
@@ -299,16 +307,20 @@ public <T> List<MessageId> sendMessages(
* @return message id.
*/
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, String key, Collection<T> messages) throws Exception {
try (Producer<T> producer = createProducer(topic, schema)) {
String topic, Schema<T> schema, String key, Collection<T> messages, boolean enableBatch)
throws Exception {
try (Producer<T> producer = createProducer(topic, schema, enableBatch)) {
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (T message : messages) {
TypedMessageBuilder<T> builder = producer.newMessage().value(message);
if (!Strings.isNullOrEmpty(key)) {
builder.key(key);
}
MessageId messageId = builder.send();
messageIds.add(messageId);
final CompletableFuture<MessageId> messageIdCompletableFuture = builder.sendAsync();
messageIdCompletableFuture.whenComplete(
(messageId, ignore) -> {
messageIds.add(messageId);
});
}
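// flush() waits until all buffered messages are persisted, so every sendAsync()
// future above has completed (and added its id to messageIds) before returning.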
producer.flush();
return messageIds;
@@ -479,12 +491,15 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) throws
}
}

public <T> Producer<T> createProducer(String topic, Schema<T> schema) throws Exception {
public <T> Producer<T> createProducer(String topic, Schema<T> schema, boolean enableBatch)
throws Exception {
return client().newProducer(schema)
.topic(topic)
.enableBatching(false)
.enableBatching(enableBatch)
.enableMultiSchema(true)
.accessMode(Shared)
.batchingMaxPublishDelay(
10, TimeUnit.SECONDS) // Give enough time to assemble the batch
.create();
}

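The long batchingMaxPublishDelay above only matters together with asynchronous sends and an explicit flush. Here is a minimal standalone sketch of that pattern (the service URL and topic name are assumptions, not values from this repository):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class BatchSendSketch {
    public static void main(String[] args) throws Exception {
        // Assumed broker URL and topic; adjust for your environment.
        PulsarClient client =
                PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        try (Producer<String> producer =
                client.newProducer(Schema.STRING)
                        .topic("persistent://public/default/batch-demo")
                        .enableBatching(true)
                        // A delay far longer than the send loop, so all sends below are
                        // grouped into one batch entry (default batch limits permitting).
                        .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
                        .create()) {
            List<CompletableFuture<MessageId>> ids = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                ids.add(producer.sendAsync("record-" + i));
            }
            producer.flush(); // Forces the pending batch out; all futures complete here.
            // Each id then typically points at the same (ledgerId, entryId)
            // with batch indexes 0..19.
            ids.forEach(f -> System.out.println(f.join()));
        } finally {
            client.close();
        }
    }
}
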
@@ -55,10 +55,10 @@ public void writeRecords(List<String> records) {
try {
// Send messages with the key we don't need.
List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords, false);

// Send messages with the given key.
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records, false);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
@@ -46,7 +46,7 @@ public PulsarPartitionDataWriter(
@Override
public void writeRecords(List<T> records) {
try {
operator.sendMessages(fullTopicName, schema, records);
operator.sendMessages(fullTopicName, schema, records, false);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}