You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
{{ message }}
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.
Reader belongs to exclusive subscription type, and it uses nonDurable cursor. After receiving messages, Reader will ack cumulatively immediately.
The flowPermits are triggered in multiple scenarios from the client side and it is isolated from seek of Consumer. Therefore, it is possibile that flowPermits will execute after seek from the client side, like the following flow chart.
When handleSeek processing is delay from the server side, the MarkDelete position is modified in a wrong way.
The expected result is that Readercan re-consume messages from mark delete:(1,1) after seek. But it doesn't work.
Pulsar read message and seek position is not a synchronous operation, the seek request can't prevent an in-process entry reading operation. The client-side also has an opportunity to receive messages after the seek position.
Pulsar client make read messages operation and seek position operation synchronized so add an epoch into server and client consumer. After client reader consumer invoke seek , the epoch increase 1 and send seek command carry the epoch and then server consumer will update the epoch. When dispatcher messages to client will carry the epoch which the cursor read at the time. Client consumer will filter the send messages command which is smaller than current epoch.
In this way, after the client consumer send seek command successfully, because it has passed the epoch filtering, the consumer will not receive a message with a messageID greater than the user previously seek position.
Current implementation details
CommandSeek Protocal
// Reset an existing consumer to a particular message idmessageCommandSeek {
requireduint64consumer_id=1;
requireduint64request_id=2;
optionalMessageIdDatamessage_id=3;
optionaluint64message_publish_time=4;
}
CommandMessage already add epoch by PIP-84 , when client receive CommandMessage will compare the command epoch and local epoch to handle this command.
Goal
Add epoch into seek command.
API Changes
Protocal change: CommandSeek
// Reset an existing consumer to a particular message idmessageCommandSeek {
requireduint64consumer_id=1;
requireduint64request_id=2;
optionalMessageIdDatamessage_id=3;
optionaluint64message_publish_time=4;
optionaluint64consumer_epoch=5;
}
CommandSeek command add epoch field, when client send seek command to server successfully, the server will change the server consumer epoch to the command epoch. The epoch only can bigger than the old epoch in server. Now the client can filter out the message which contains less consumer epoch.
Implementation
stage 1: Check the current cursor status when handling flowPermits from the server side.
stage 2: Add epoch into seek command, and server update the consumer epoch. It can prevent an in-process entry reading operation after the seek request.
Reject Alternatives
None yet.
Note
Consumer reconnect need reset epoch.
The text was updated successfully, but these errors were encountered:
Original Issue: apache#16757
Motivation
Reader
belongs to exclusive subscription type, and it usesnonDurable
cursor. After receiving messages,Reader
will ack cumulatively immediately.The
flowPermits
are triggered in multiple scenarios from the client side and it is isolated fromseek
ofConsumer
. Therefore, it is possibile thatflowPermits
will execute afterseek
from the client side, like the following flow chart.When
handleSeek
processing is delay from the server side, theMarkDelete position
is modified in a wrong way.The expected result is that
Reader
can re-consume messages frommark delete:(1,1)
afterseek
. But it doesn't work.Pulsar read message and seek position is not a synchronous operation, the seek request can't prevent an in-process entry reading operation. The client-side also has an opportunity to receive messages after the seek position.
Pulsar client make read messages operation and seek position operation synchronized so add an epoch into server and client consumer. After client reader consumer invoke
seek
, the epoch increase 1 and sendseek
command carry the epoch and then server consumer will update the epoch. When dispatcher messages to client will carry the epoch which the cursor read at the time. Client consumer will filter the send messages command which is smaller than current epoch.In this way, after the client consumer send
seek
command successfully, because it has passed the epoch filtering, the consumer will not receive a message with a messageID greater than the user previously seek position.Current implementation details
CommandSeek Protocal
CommandMessage
CommandMessage
already add epoch by PIP-84 , when client receiveCommandMessage
will compare the command epoch and local epoch to handle this command.Goal
Add epoch into seek command.
API Changes
Protocal change: CommandSeek
CommandSeek
command add epoch field, when client send seek command to server successfully, the server will change the server consumer epoch to the command epoch. The epoch only can bigger than the old epoch in server. Now the client can filter out the message which contains less consumer epoch.Implementation
Reject Alternatives
None yet.
Note
The text was updated successfully, but these errors were encountered: