This PIP addresses enhancements message visibility for the peek-message CLI, specifically related to transaction messages and markers.
Currently, when peeking messages, users may encounter server internal marker
and uncommitted
or aborted
for transaction message,
which should typically be hidden to ensure data integrity and consistency.
Transaction markers are internal messages used by Pulsar to denote the start, end, and status (commit/abort) of a transaction. They are not meant to be consumed by clients.
Similarly, other internal markers used by Pulsar for various system operations should also not be visible to clients as they could lead to confusion and misinterpretation of the data.
- Uncommitted Messages: These should not be visible to consumers until the transaction is committed.
- Aborted Messages: These should be filtered out and never made visible to consumers.
The current implementation exposes all messages, including transaction markers and messages from uncommitted or aborted transactions, when peeking. This behavior can confuse users and lead to incorrect data handling. The proposal aims to provide more control over what types of messages are visible during message peeking operations.
- Implement flags to selectively display
server markers
,uncommitted messages(include aborted messages) for transaction
in peek operations. - Set the default behavior to only show messages from committed transactions to ensure data integrity.
- Any modifications to the core transaction handling mechanism.
- Any changes to consumer logic.
The proposal introduces three new flags to the peek-messages
command:
--show-server-marker
: Controls the visibility of server markers (default:false
).---transaction-isolation-level
: Controls the visibility of messages for transactions. (default:READ_COMMITTED
). Options:- READ_COMMITTED: Can only consume all transactional messages which have been committed.
- READ_UNCOMMITTED: Can consume all messages, even transactional messages which have been aborted.
These flags will allow administrators and developers to tailor the peek functionality to their needs, improving the usability and security of message handling in transactional contexts.
To support the --show-server-marker
and ---transaction-isolation-level
flags, needs to introduce specific tag into the headers
of messages returned by the
peekNthMessage REST API.
X-Pulsar-marker-type
: Already exists.X-Pulsar-txn-uncommitted
: This entry is determined to be an uncommitted transaction by comparing itsposition
andmaxReadPosition
.X-Pulsar-txn-aborted
: It is determined to be aborted by calling thepersistentTopic.isAbort()
method.
Then, In the CLI, these markers can be used to determine whether to filter out these messages and proceed to the next one. For an implementation example, see the following code: shibd#34
New command line flags added for the bin/pulsar-admin topics peek-messages
command:
Flag | Abbreviation | Type | Default | Description |
---|---|---|---|---|
--show-server-marker |
-ssm |
Boolean | false |
Enables the display of internal server write markers. |
---transaction-isolation-level |
-til |
Enum | false |
Enables theSets the isolation level for consuming messages within transactions. - 'READ_COMMITTED' allows consuming only committed transactional messages. - 'READ_UNCOMMITTED' allows consuming all messages, even transactional messages which have been aborted. |
Add two methods to the admin.Topics() interface.
/**
* Peek messages from a topic subscription.
*
* @param topic
* topic name
* @param subName
* Subscription name
* @param numMessages
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
* @param transactionIsolationLevel
* Sets the isolation level for consuming messages within transactions.
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
* - 'READ_UNCOMMITTED' allows consuming all messages,
* even transactional messages which have been aborted.
* @return
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Topic or subscription does not exist
* @throws PulsarAdminException
* Unexpected error
*/
List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel)
throws PulsarAdminException;
/**
* Peek messages from a topic subscription asynchronously.
*
* @param topic
* topic name
* @param subName
* Subscription name
* @param numMessages
* Number of messages
* @param showServerMarker
* Enables the display of internal server write markers
@param transactionIsolationLevel
* Sets the isolation level for consuming messages within transactions.
* - 'READ_COMMITTED' allows consuming only committed transactional messages.
* - 'READ_UNCOMMITTED' allows consuming all messages,
* even transactional messages which have been aborted.
* @return a future that can be used to track when the messages are returned
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(
String topic, String subName, int numMessages,
boolean showServerMarker, TransactionIsolationLevel transactionIsolationLevel);
Reverting to a previous version of Pulsar without this feature will remove the additional flags from the CLI. Users who prefer the new behavior will need to manually adjust their usage patterns when reverting.
While upgrading to the new version of Pulsar that includes these changes, the default behavior of the peek-messages
command will change.
Existing scripts or commands that rely on the old behavior (where transaction markers and messages from uncommitted or aborted transactions are visible) will need to explicitly set the new flags (--show-server-marker true
and --transaction-isolation-level READ_UNCOMMITTED
to maintain the old behavior.
This change is necessary as the previous default behavior did not align with typical expectations around data visibility and integrity in transactional systems.