Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36180] Fix batch message data loss #95

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
Expand Down Expand Up @@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
if (include) {
return messageIdImpl;
} else {
// if the message is batched, should return next single message in current batch.
if (messageIdImpl.getBatchIndex() >= 0
&& messageIdImpl.getBatchSize() > 0
&& messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) {
return new BatchMessageIdImpl(
messageIdImpl.getLedgerId(),
messageIdImpl.getEntryId(),
messageIdImpl.getPartitionIndex(),
messageIdImpl.getBatchIndex() + 1,
messageIdImpl.getBatchSize(),
messageIdImpl.getAckSet());
wenbingshen marked this conversation as resolved.
Show resolved Hide resolved
}

// if the (ledgerId, entryId + 1) is not valid
// pulsar broker will automatically set the cursor to the next valid message
return new MessageIdImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
Expand Down Expand Up @@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
MessageId latestConsumedId = registeredSplit.getLatestConsumedId();

if (latestConsumedId != null) {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
if (latestConsumedId instanceof BatchMessageIdImpl) {
wenbingshen marked this conversation as resolved.
Show resolved Hide resolved
LOG.info(
"Reset subscription position by the checkpoint {}, batchSize {}",
latestConsumedId,
((BatchMessageIdImpl) latestConsumedId).getBatchSize());
} else {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
}
try {
CursorPosition cursorPosition;
if (latestConsumedId == MessageId.latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -244,7 +246,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
PulsarPartitionSplitReader splitReader = splitReader();
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
Expand All @@ -263,6 +265,37 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
fetchedMessages(splitReader, 1, true);
}

@Test
void consumeBatchMessageFromRecover() throws Exception {
    final PulsarPartitionSplitReader reader = splitReader();
    final String topic = randomAlphabetic(10);

    // Publish the records through a batching-enabled producer (last argument = true).
    // NOTE(review): the assertions below presume all records end up in one batch entry.
    final int batchSize = 20;
    operator().setupTopic(topic, STRING, () -> randomAlphabetic(10), batchSize, true);

    final MessageIdImpl lastId =
            (MessageIdImpl)
                    operator().admin().topics().getLastMessageId(topicNameWithPartition(topic, 0));

    // Simulate a checkpoint taken after consuming batch index 2 (zero-based) of the last entry.
    final int consumedIndex = 2;
    final BitSet acks = new BitSet(batchSize);
    acks.set(0, batchSize);
    final BatchMessageIdImpl checkpointId =
            new BatchMessageIdImpl(
                    lastId.getLedgerId(),
                    lastId.getEntryId(),
                    lastId.getPartitionIndex(),
                    consumedIndex,
                    batchSize,
                    acks);

    // Recovery uses an exclusive start cursor, so only the messages after the consumed
    // index are expected: batchSize - consumedIndex - 1.
    handleSplit(reader, topic, 0, checkpointId);
    fetchedMessages(reader, batchSize - consumedIndex - 1, true);
}

/** Create a split reader with max message 1, fetch timeout 1s. */
private PulsarPartitionSplitReader splitReader() {
return new PulsarPartitionSplitReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,6 @@ public void testPulsarMaxwellChangelogSource() throws Exception {
}

// Sends every entry of `lines` to `topic` as a Schema.STRING message through the test operator.
// NOTE(review): this listing is a rendered diff, so two calls appear below. The first has no
// batching argument; the second passes `false` for the `enableBatch` parameter of sendMessages.
private void writeRecordsToPulsar(String topic, List<String> lines) throws Exception {
pulsar.operator().sendMessages(topic, Schema.STRING, lines);
pulsar.operator().sendMessages(topic, Schema.STRING, lines, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ void sendMessageWithPropertiesAndReadPropertiesMetadata() throws Exception {
properties.put("key1", "value1");
properties.put("key2", "value2");
try (Producer<String> producer =
pulsar.operator().createProducer(sourceTopic, Schema.STRING)) {
pulsar.operator().createProducer(sourceTopic, Schema.STRING, false)) {
producer.newMessage().value(value).properties(properties).send();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -146,7 +148,7 @@ public void setupTopic(String topic) throws Exception {
*/
public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier)
throws Exception {
// Delegates to the wider overload using NUM_RECORDS_PER_PARTITION as the record count.
// NOTE(review): rendered diff — the first call is the four-argument form, the second adds
// `false` for the `enableBatch` parameter, so this overload keeps batching disabled.
setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION);
setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION, false);
}

/**
Expand All @@ -159,7 +161,11 @@ public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier)
* @param numRecordsPerSplit The number of records for a partition.
*/
public <T> void setupTopic(
String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit)
String topic,
Schema<T> schema,
Supplier<T> supplier,
int numRecordsPerSplit,
boolean enableBatch)
throws Exception {
String topicName = topicName(topic);
createTopic(topicName, DEFAULT_PARTITIONS);
Expand All @@ -170,7 +176,7 @@ public <T> void setupTopic(
List<T> messages =
Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList());

sendMessages(partitionName, schema, messages);
sendMessages(partitionName, schema, messages, enableBatch);
}
}

Expand Down Expand Up @@ -250,7 +256,7 @@ public List<TopicPartition> topicInfo(String topic) throws Exception {
* @return message id.
*/
public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) throws Exception {
List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message));
List<MessageId> messageIds = sendMessages(topic, schema, singletonList(message), false);
checkArgument(messageIds.size() == 1);

return messageIds.get(0);
Expand All @@ -268,7 +274,8 @@ public <T> MessageId sendMessage(String topic, Schema<T> schema, T message) thro
*/
public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message)
throws Exception {
List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message));
List<MessageId> messageIds =
sendMessages(topic, schema, key, singletonList(message), false);
checkArgument(messageIds.size() == 1);

return messageIds.get(0);
Expand All @@ -283,9 +290,10 @@ public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T m
* @param <T> The type of the record.
* @return message id.
*/
// Keyless overload: forwards to the keyed sendMessages with a null key.
// NOTE(review): rendered diff — both the header without `enableBatch` and the header with it
// are shown, and likewise both forms of the delegating return statement.
public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collection<T> messages)
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, Collection<T> messages, boolean enableBatch)
throws Exception {
return sendMessages(topic, schema, null, messages);
// `enableBatch` is passed through unchanged to the keyed overload.
return sendMessages(topic, schema, null, messages, enableBatch);
}

/**
Expand All @@ -299,16 +307,20 @@ public <T> List<MessageId> sendMessages(String topic, Schema<T> schema, Collecti
* @return message id.
*/
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, String key, Collection<T> messages) throws Exception {
try (Producer<T> producer = createProducer(topic, schema)) {
String topic, Schema<T> schema, String key, Collection<T> messages, boolean enableBatch)
throws Exception {
try (Producer<T> producer = createProducer(topic, schema, enableBatch)) {
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (T message : messages) {
TypedMessageBuilder<T> builder = producer.newMessage().value(message);
if (!Strings.isNullOrEmpty(key)) {
builder.key(key);
}
MessageId messageId = builder.send();
messageIds.add(messageId);
final CompletableFuture<MessageId> messageIdCompletableFuture = builder.sendAsync();
messageIdCompletableFuture.whenComplete(
(messageId, ignore) -> {
messageIds.add(messageId);
});
}
producer.flush();
return messageIds;
Expand Down Expand Up @@ -479,12 +491,15 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) throws
}
}

/**
 * Creates a producer for the given topic from {@code client().newProducer(schema)}.
 *
 * <p>The builder enables multi-schema, uses the {@code Shared} access mode and sets the
 * batching max publish delay to 10 seconds.
 *
 * <p>NOTE(review): this listing is a rendered diff, so two method headers and two {@code
 * enableBatching} calls appear below: one form without the {@code enableBatch} parameter and
 * a hard-coded {@code false}, and one form with the parameter forwarded to the builder.
 *
 * @param topic The topic the producer writes to.
 * @param schema The schema handed to {@code newProducer}.
 * @param enableBatch Forwarded to {@code enableBatching}.
 * @param <T> The type of the record.
 * @return The created producer.
 */
public <T> Producer<T> createProducer(String topic, Schema<T> schema) throws Exception {
public <T> Producer<T> createProducer(String topic, Schema<T> schema, boolean enableBatch)
throws Exception {
return client().newProducer(schema)
.topic(topic)
.enableBatching(false)
.enableBatching(enableBatch)
.enableMultiSchema(true)
.accessMode(Shared)
.batchingMaxPublishDelay(
10, TimeUnit.SECONDS) // Give enough time to assemble the batch
.create();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public void writeRecords(List<String> records) {
try {
// Send messages with the key we don't need.
List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords, false);

// Send messages with the given key.
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records, false);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PulsarPartitionDataWriter(
@Override
public void writeRecords(List<T> records) {
try {
operator.sendMessages(fullTopicName, schema, records);
operator.sendMessages(fullTopicName, schema, records, false);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
Expand Down
Loading