-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Java Reader Client] Start reader inside batch result in read first message in batch. #6345
Changes from all commits
86b513c
a17c1cd
60af2b3
c2ab1b4
91f75d8
9c2d6b8
669f787
9320b6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ | |
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
|
@@ -123,6 +124,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle | |
private final SubscriptionMode subscriptionMode; | ||
private volatile BatchMessageIdImpl startMessageId; | ||
|
||
private volatile BatchMessageIdImpl seekMessageId; | ||
private final AtomicBoolean duringSeek; | ||
|
||
private final BatchMessageIdImpl initialStartMessageId; | ||
private final long startMessageRollbackDurationInSec; | ||
|
||
|
@@ -205,6 +209,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat | |
stats = ConsumerStatsDisabled.INSTANCE; | ||
} | ||
|
||
duringSeek = new AtomicBoolean(false); | ||
|
||
if (conf.getAckTimeoutMillis() != 0) { | ||
if (conf.getTickDurationMillis() > 0) { | ||
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), | ||
|
@@ -667,6 +673,13 @@ private BatchMessageIdImpl clearReceiverQueue() { | |
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size()); | ||
incomingMessages.drainTo(currentMessageQueue); | ||
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); | ||
|
||
if (duringSeek.compareAndSet(true, false)) { | ||
return seekMessageId; | ||
} else if (subscriptionMode == SubscriptionMode.Durable) { | ||
return null; | ||
} | ||
|
||
if (!currentMessageQueue.isEmpty()) { | ||
MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId(); | ||
BatchMessageIdImpl previousMessage; | ||
|
@@ -867,7 +880,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade | |
// and return undecrypted payload | ||
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { | ||
|
||
if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) { | ||
if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) { | ||
// We need to discard entries that were prior to startMessageId | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, | ||
|
@@ -1018,7 +1031,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv | |
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, | ||
singleMessageMetadataBuilder, i, batchSize); | ||
|
||
if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) { | ||
if (isSameEntry(messageId) && isPriorBatchIndex(i)) { | ||
// If we are receiving a batch message, we need to discard messages that were prior | ||
// to the startMessageId | ||
if (log.isDebugEnabled()) { | ||
|
@@ -1091,8 +1104,8 @@ private boolean isPriorBatchIndex(long idx) { | |
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex(); | ||
} | ||
|
||
private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) { | ||
return !resetIncludeHead && startMessageId != null | ||
private boolean isSameEntry(MessageIdData messageId) { | ||
return startMessageId != null | ||
&& messageId.getLedgerId() == startMessageId.getLedgerId() | ||
&& messageId.getEntryId() == startMessageId.getEntryId(); | ||
} | ||
|
@@ -1477,7 +1490,10 @@ public CompletableFuture<Void> seekAsync(long timestamp) { | |
cnx.sendRequestWithId(seek, requestId).thenRun(() -> { | ||
log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp); | ||
acknowledgmentsGroupingTracker.flushAndClean(); | ||
lastDequeuedMessage = MessageId.earliest; | ||
|
||
seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest); | ||
duringSeek.set(true); | ||
|
||
incomingMessages.clear(); | ||
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); | ||
seekFuture.complete(null); | ||
|
@@ -1520,7 +1536,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) { | |
cnx.sendRequestWithId(seek, requestId).thenRun(() -> { | ||
log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId); | ||
acknowledgmentsGroupingTracker.flushAndClean(); | ||
lastDequeuedMessage = messageId; | ||
|
||
seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId); | ||
duringSeek.set(true); | ||
Comment on lines
+1540
to
+1541
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi! Looks like this commit is a breaking change, it broke my integration test for Reader. The logic of unit test is as follows
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic w/o this PR is problematic because Therefore, If it breaks your test, please reconsider your code logic by using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I like this improvement, just want to add that additional effort needs to be done here to make it work properly, since Seek+Read is a very common reader pattern. If I put a small delay between 2 and 3 step, then everything works properly, because connection There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the test fail if the reader is used concurrently by multiple readers? Link to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, fixed the link. No, reader is used by single thread, it waits for seek result and immediately calls hasMessageAvailableAsync There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is another issue with the same test with Batching enabled - as long as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI, adding this line fixes both issues for me at the moment |
||
|
||
incomingMessages.clear(); | ||
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); | ||
seekFuture.complete(null); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible only add
seekMessageId
to achieve the seek logic?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. Without the flag, we cannot tell if seekMessageId or lastDequeueMessage should be used as the start message id.