-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction] Fix transaction sequenceId generate error. #13209
[Transaction] Fix transaction sequenceId generate error. #13209
Conversation
@congbobo184:Thanks for your contribution. For this PR, do we need to update docs? |
@congbobo184:Thanks for providing doc info! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this change compatible with existing Pulsar clusters ?
It will be compatible |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed()); | ||
if (ledgerEntry != null) { | ||
TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry(); | ||
ByteBuf buffer = ledgerEntry.getEntryBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The buffer should be released after getting the max local txn ID.
} else { | ||
if (entries != null) { | ||
try { | ||
LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TheledgerEntry
should be closed after getting the max local txn ID.
|
||
private volatile long maxLocalTxnId = -1; | ||
@Getter | ||
private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better return long not AtomicLong for the getter method.
return txnLog.initialize().thenApply(__ -> | ||
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker)); | ||
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, | ||
recoverTracker, mlTransactionLogInterceptor.getSequenceId())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to introduce modify sequence ID method for mlTransactionLogInterceptor
, share AtomicLong across multiple instances is not good for maintainance
(cherry picked from commit 0994254)
Motivation
now, we recover transaction sequenceId from lastConfirmEntry or managedLedger properties. But update managedLedger properties is not a Synchronize op, so it will recover error sequenceId.
Modifications
use ManagedLedgerInterceptor will fix this problem.
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)