[Transaction] Transaction timeout implementation. #9229
# Conflicts:
#   pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
#   pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
#   pulsar-common/src/main/proto/PulsarMarkers.proto
#   pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
private Timer timer;

private static final long tickTimeMillis = 1L;
Using 1 ms as the default tick duration will lead to high CPU load; it's better to use the HashedWheelTimer default tick duration of 100 ms.
This tick time is for a single timeout; the timer itself is used with its defaults.
@Override
public void initialize() {
    this.timer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timeout-tracker"),
Please consider reusing an existing timer, such as the one the broker already uses. This will save CPU.
long timeoutTime = priorityQueue.peekN1();
long nowTime = clock.millis() / BASE_OF_MILLIS_TO_SECOND;
if (timeoutTime < nowTime){
    transactionMetadataStoreService.endTransaction(new TxnID(priorityQueue.peekN2(),
How do we handle a transaction that is already committed? We don't need to abort a transaction that has already been committed or aborted.
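One possible way to address this, as a minimal sketch rather than the PR's actual code: guard the timeout-driven abort with a compare-and-set on the transaction status, so it only takes effect while the transaction is still open. The `TxnStatus` enum and `TxnState` class below are hypothetical names introduced for illustration; the real coordinator's types may differ.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical status enum; the real coordinator's status type may differ.
enum TxnStatus { OPEN, COMMITTED, ABORTED }

// Hypothetical holder for the state of a single transaction.
class TxnState {
    private final AtomicReference<TxnStatus> status =
            new AtomicReference<>(TxnStatus.OPEN);

    /** Client-driven commit; succeeds only if the transaction is still OPEN. */
    boolean commit() {
        return status.compareAndSet(TxnStatus.OPEN, TxnStatus.COMMITTED);
    }

    /** Timeout-driven abort; a no-op if the transaction has already ended. */
    boolean abortOnTimeout() {
        return status.compareAndSet(TxnStatus.OPEN, TxnStatus.ABORTED);
    }

    TxnStatus status() {
        return status.get();
    }
}
```

With this shape, a timeout that fires after a commit simply returns false and leaves the status untouched.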
    return CompletableFuture.completedFuture(false);
}
synchronized (this){
    long nowTime = clock.millis() / BASE_OF_MILLIS_TO_SECOND;
Why not use millis directly?
The transaction timeout unit is seconds.
private final static long INITIAL_TIMEOUT = 1L;
private long nowTaskTimeoutTime = INITIAL_TIMEOUT;
Could you please give more details about these 2 fields?
The current timeout may be scheduled to fire later than the new transaction's timeout time, so we should cancel the current timeout and create a new one whose wait time matches the new transaction's timeout time.
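The cancel-and-reschedule idea described here can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: it uses `java.util.concurrent.ScheduledExecutorService` in place of Netty's `HashedWheelTimer` so that it runs with the JDK alone, and the class name `SingleTimeoutScheduler` and its methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that keeps at most one pending timer task.
class SingleTimeoutScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable onTimeout;
    private ScheduledFuture<?> current;                  // pending task, if any
    private long currentDeadlineMillis = Long.MAX_VALUE; // deadline of that task

    SingleTimeoutScheduler(Runnable onTimeout) {
        this.onTimeout = onTimeout;
    }

    /** Returns true if the timer was (re)armed for the given deadline. */
    synchronized boolean add(long deadlineMillis, long nowMillis) {
        boolean pending = current != null && !current.isDone();
        if (pending && deadlineMillis >= currentDeadlineMillis) {
            return false; // the pending task already fires early enough
        }
        if (current != null) {
            current.cancel(false); // the pending task would fire too late
        }
        currentDeadlineMillis = deadlineMillis;
        long delay = Math.max(0L, deadlineMillis - nowMillis);
        current = executor.schedule(onTimeout, delay, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized long currentDeadlineMillis() {
        return currentDeadlineMillis;
    }

    @Override
    public synchronized void close() {
        if (current != null) {
            current.cancel(false);
        }
        executor.shutdownNow();
    }
}
```

Only a transaction whose deadline is earlier than the pending one causes a reschedule; later deadlines wait in the queue until the current task fires.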
 * @param timeout
 *            the absolute timestamp for transaction timeout
 */
void replayAddTransaction(long sequenceId, long timeout);
Why can't we use addTransaction?
The transaction log replay has not completed yet, so we can't start the timer to abort transactions.
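To illustrate the distinction being discussed, here is a minimal sketch in which replayed transactions are only recorded and the timer is armed once `start()` is called. The class `ReplayAwareTracker`, the `armCount` counter (standing in for arming a real timer), and the choice to pass absolute deadlines to both methods are assumptions made for this example, not the PR's API.

```java
import java.util.PriorityQueue;

// Hypothetical tracker separating replay from live additions.
class ReplayAwareTracker {
    // Each entry is {deadlineMillis, sequenceId}, ordered by deadline.
    private final PriorityQueue<long[]> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    private boolean started = false;
    private int armCount = 0; // stands in for scheduling a real timer task

    /** Replay path: record the transaction but never arm the timer. */
    synchronized void replayAddTransaction(long sequenceId, long deadlineMillis) {
        queue.add(new long[] {deadlineMillis, sequenceId});
    }

    /** Live path: record the transaction and arm the timer if already started. */
    synchronized void addTransaction(long sequenceId, long deadlineMillis) {
        queue.add(new long[] {deadlineMillis, sequenceId});
        if (started) {
            armCount++;
        }
    }

    /** Called once log replay has finished. */
    synchronized void start() {
        started = true;
        if (!queue.isEmpty()) {
            armCount++;
        }
    }

    synchronized int armCount() {
        return armCount;
    }

    /** Earliest recorded deadline, or -1 if nothing is tracked. */
    synchronized long nextDeadlineMillis() {
        return queue.isEmpty() ? -1L : queue.peek()[0];
    }
}
```

In this sketch no abort can be triggered while the log is still being replayed, because nothing arms the timer before `start()`.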
new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        try {
            transactionMetadataStore.newTransaction(1);
        } catch (Exception e) {
            //no operation
        }
    }
}).start();

new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        try {
            transactionMetadataStore.newTransaction(3);
        } catch (Exception e) {
            //no operation
        }
    }
}).start();

new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        try {
            transactionMetadataStore.newTransaction(2);
        } catch (Exception e) {
            //no operation
        }
    }
}).start();

new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        try {
            transactionMetadataStore.newTransaction(10);
        } catch (Exception e) {
            //no operation
        }
    }
}).start();
Why use a for loop with i here?
It is there to create 100 transactions; we can also change the for loop to a while loop.
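As a possible alternative to four copy-pasted threads, the test could use one helper that runs the same loop for each timeout value and waits for all workers to finish. This is only a sketch: `ConcurrentTxnCreator` and `createConcurrently` are hypothetical names, and the `LongConsumer` parameter stands in for a call such as `transactionMetadataStore.newTransaction(timeout)`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

// Hypothetical test helper; not part of the PR.
class ConcurrentTxnCreator {
    /**
     * Calls newTransaction perThread times for each timeout, one worker per
     * timeout value, and returns the number of calls that did not throw.
     */
    static int createConcurrently(LongConsumer newTransaction, int perThread, long... timeouts) {
        ExecutorService pool = Executors.newFixedThreadPool(timeouts.length);
        CountDownLatch done = new CountDownLatch(timeouts.length);
        AtomicInteger created = new AtomicInteger();
        for (long timeout : timeouts) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        try {
                            newTransaction.accept(timeout);
                            created.incrementAndGet();
                        } catch (RuntimeException e) {
                            // a failed call is simply not counted
                        }
                    }
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdownNow();
        }
        return created.get();
    }
}
```

Waiting on the latch also lets the test assert on the tracker only after every transaction has been created, which bare `new Thread(...).start()` calls do not guarantee.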
/**
 * When replay the log finished, we need to start the tracker.
 */
void start();
I think the start method is not used only for transaction replay? Is there any problem with calling start first and then replaying the transactions?
BTW, we could define all the methods as async.
It returns void; does it also need to be async?
@Override
public void close() {
    priorityQueue.close();
    this.close();
Should we cancel the timeout here?
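A sketch of what a `close()` that cancels the pending timeout might look like, under assumptions: it uses a JDK `ScheduledExecutorService` rather than the Netty timer from the diff, and the class `ClosableTracker` is hypothetical. Note also that a `close()` method calling `this.close()`, as in the snippet above, would recurse into itself; the sketch releases each resource exactly once instead.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical tracker showing an idempotent close that cancels its timeout.
class ClosableTracker implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ScheduledFuture<?> pending;

    ClosableTracker(Runnable onTimeout, long delayMillis) {
        this.pending = executor.schedule(onTimeout, delayMillis, TimeUnit.MILLISECONDS);
    }

    boolean isPendingCancelled() {
        return pending.isCancelled();
    }

    @Override
    public void close() {
        if (!closed.compareAndSet(false, true)) {
            return; // already closed; repeated calls are harmless
        }
        pending.cancel(false);  // make sure the timeout never fires after close
        executor.shutdownNow(); // release the timer thread
    }
}
```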
/pulsarbot run-failure-checks
Motivation
Handle transaction timeouts.
When aborting or committing finishes, we should change the status in the transaction coordinator.
implement
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)