-
Notifications
You must be signed in to change notification settings - Fork 14
Refactor OpenTransaction
interface
#7411
base: develop
Are you sure you want to change the base?
Conversation
...ared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java
Show resolved
Hide resolved
OpenTransaction
interface.OpenTransaction
interface
atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/OpenTransaction.java
Show resolved
Hide resolved
atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java
Outdated
Show resolved
Hide resolved
...l-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks! Couple of small bits we should finish up I think, but thanks a lot for the contribution :)
atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/OpenTransaction.java
Show resolved
Hide resolved
...ared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java
Show resolved
Hide resolved
...l-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
Show resolved
Hide resolved
atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java
Outdated
Show resolved
Hide resolved
580296f
to
f306d38
Compare
Generate changelog in
|
...l-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
Outdated
Show resolved
Hide resolved
...l-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
Show resolved
Hide resolved
...l-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
Show resolved
Hide resolved
// N.B. run this before clearing the state from the cache with | ||
// requestTransactionStateRemovalFromCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was quite nuanced:
lockWatchManager.onTransactionCommit
needs to run beforelockWatchManager.requestTransactionStateRemovalFromCache
- onSuccess ran before the finally block before, and therefore it worked
- now we need to add to
onCommitOrAbort
in the right order, otherwise tests failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also thought of writing
transaction.onCommitOrAbort(() -> {
if (!transaction.isAborted()) {
// in onCommitOrAbort + not aborted == committed
lockWatchManager.onTransactionCommit(transaction.getTimestamp());
}
lockWatchManager.requestTransactionStateRemovalFromCache(....)
});
but decided not do this because I like guaranteeing both are run in case one fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also thought of adding txn.isCommitted()
as a method, but not doing that for now - could pick do in a flup pr if we decide to go down that route.
Edit: decided to go down this route and added txn.isDefinitelyCommitted()
c88ecd1
to
8d5df25
Compare
* the commit was successful, finished, or aborted. | ||
* | ||
* If the callback is run, it will run before any {@link Transaction#onSuccess(Runnable)} callbacks. | ||
* Aborts the transaction if uncommitted and cleanups transaction state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"and cleans up"?
* If the callback is run, it will run before any {@link Transaction#onSuccess(Runnable)} callbacks. | ||
* Aborts the transaction if uncommitted and cleanups transaction state. | ||
* All open transactions <b>must be closed</b>. | ||
* Not closing transactions after they're no longer in use may lead to arbitrary delays elsewhere in the system. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"may lead to memory leaks and arbitrary delays..."
* @return <code>true</code> <code>commit()</code> has been called and did not throw, otherwise <code>false</code>. | ||
*/ | ||
@Idempotent | ||
boolean isDefinitivelyCommitted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would ALMOST think that we should just expose the State enum?
ANd very clearly indicate that this is all from the perspective of "did this Transaction Java Object Instance, MANAGE to get to the point of setting it's state. So for example if the thread was interrupted at some point, as you say, the transaction may have been committed, but this will not be reflected in this object and results of these methods.
Like it might be useful to expose the enum and draw the state diagram?
* All pre-commit conditions are also cleaned in case {@link #startTransactions(List)} throws. | ||
* <p> | ||
* Note the caller must call {@link OpenTransaction#close()} after the transaction is committed to perform | ||
* additional cleanup. Failure to do so might incur in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you didn't finish a sentence here?
* no guarantees on the ordering of transactions returned with respect to their start timestamp. | ||
* | ||
* @return a batch of transactions with associated immutable timestamp locks | ||
* @deprecated Similar functionality will exist, but this method is likely to change in the future | ||
*/ | ||
@Deprecated | ||
@Timed | ||
List<OpenTransaction> startTransactions(List<? extends PreCommitCondition> condition); | ||
List<? extends OpenTransaction> startTransactions(List<? extends PreCommitCondition> condition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my edification: what do you get out of doing "? extends ", other than the callers now having to do the same thing?
@@ -453,6 +463,10 @@ enum TransactionType { | |||
/** | |||
* Allow consumers to register callbacks to be run on {@link #commit()} or {@link #abort()}, | |||
* after a transaction has committed or aborted. | |||
* <p> | |||
* {@link #onCommitOrAbort(Runnable)} callbacks are added in a stack and run in opposite order they were added to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You already say this later?
Callbacks are run in the reverse order they were added - i.e. the last added callback will be run first.
postTaskContext.stop(); | ||
return result; | ||
|
||
hasClosed = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so IF you're making sure this is idempotent and e.g. openTransactionCounter.dec is not called multiple times and that's your intent, let's stick this in the finally?
} finally { | ||
lockWatchManager.requestTransactionStateRemovalFromCache(getTimestamp()); | ||
postTaskContext = postTaskTimer.time(); | ||
openTransactionCounter.dec(); | ||
} | ||
scrubForAggressiveHardDelete(extractSnapshotTransaction(txn)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, should ScrubForAggressiveHardDelete happen only on the happy path, or always, regardless of exceptions?
@@ -192,7 +192,9 @@ public TransactionToken startTransaction() { | |||
public void commit(TransactionToken token) { | |||
OpenTransaction openTxn = transactions.getIfPresent(token); | |||
if (openTxn != null) { | |||
openTxn.finish((TxTask) transaction -> null); | |||
try (openTxn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you strictly need the try-with-resources? It doesn't feel like you would?
@@ -290,7 +290,7 @@ public void doesNotCallStartTransactionForReadOnlyTransactionsIfFlagIsNotSet() { | |||
public void startEmptyBatchOfTransactionsDoesNotCallTimelockService() { | |||
TimelockService timelockService = spy(inMemoryTimelockClassExtension.getLegacyTimelockService()); | |||
SnapshotTransactionManager transactionManager = createSnapshotTransactionManager(timelockService, false); | |||
List<OpenTransaction> transactions = transactionManager.startTransactions(ImmutableList.of()); | |||
List<? extends OpenTransaction> transactions = transactionManager.startTransactions(ImmutableList.of()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, see, this is super annoying :P
@@ -1831,6 +1831,11 @@ public boolean isUncommitted() { | |||
return state.get() == State.UNCOMMITTED; | |||
} | |||
|
|||
@Override | |||
public boolean isDefinitivelyCommitted() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I'm reading this code, I also realized we missed something here and it's why I'm kind of thinking a bit more broadly:
- in #commit, you don't check whether the method has been called, before doing #runPreCommitCallbacks. Meaning, technically if something throws there, someone could then call #commit again, and have them rerun. It's all WEIRD TERRITORY, but I think it's pointing at something deeper?
I feel like in general we're not modelling the state machine rigorously enough in all of this code. I would almost want to use internal state machine library to generate the code to make sure this is all done properly?
} catch (Exception e) { | ||
condition.cleanup(); | ||
Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yaaas, much better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although, I just started reading startTransactionsInternal, and my immediate thought was "shit we forgot to cleanup conditions when that throws exceptions...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if instead of this split thing, we try something like this:
- StartTransactionsInternal at the beginning creates a
Closer onExceptionCloser = Closer.create();
As you acquire resources, you register things with it, so you'd immediately accumulate all of the conditions there.
Then in #startTransactionsImpl, IFF you end up with an exception, you execute the closer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically, this is SUCH critical code, that from a reader's perspective, EVERYTIME I see resource acquisition, on the next line I want it being registered with SOMETHING that's responsible for cleaning it up in case of exceptions.
@@ -190,20 +192,30 @@ protected boolean shouldStopRetrying(int numTimesFailed) { | |||
public <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionThrowOnConflict( | |||
C condition, ConditionAwareTransactionTask<T, C, E> task) throws E, TransactionFailedRetriableException { | |||
checkOpen(); | |||
OpenTransaction openTransaction; | |||
try (OpenTransactionImpl openTransaction = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SO the way this code works is:
- if you throw as part of startTransactions, the PreCommitCondition will be cleaned up immediately there. Otherwise, the ownership of those moves to OpenTransaction and is therefore cleaned up as part of it's #close implementation.
Meaning, we improve the flow here, without breaking any functionality.
@@ -190,20 +192,30 @@ protected boolean shouldStopRetrying(int numTimesFailed) { | |||
public <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionThrowOnConflict( | |||
C condition, ConditionAwareTransactionTask<T, C, E> task) throws E, TransactionFailedRetriableException { | |||
checkOpen(); | |||
OpenTransaction openTransaction; | |||
try (OpenTransactionImpl openTransaction = | |||
runTimed(() -> Iterables.getOnlyElement(startTransactions(ImmutableList.of(condition))), "setupTask")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For interest, it was very funny to have a look at the implementation of List.of in the JDK, and weirdly, it has List12, which is kinda bizzareo. Guava defo does better here
try { | ||
closer.close(); | ||
} catch (IOException ex) { | ||
e.addSuppressed(ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we instead use your ThreadSafeCloser and add a method #closeAfterException(Throwable) or something that will do the logging and #addSuppressed?
I'm also honestly not that fussed with #addSuppressed, it feels like noise. And specifically, in the internal skiing project, might lead to giant stacktraces (and just lots of logging?), because we use pretty large batches for #startTransactions?
How about we log the individual failures somehow as separate loglines
closer.close(); | ||
} catch (IOException e) { | ||
t.addSuppressed(e); | ||
log.info("Failed to cleanup startTransaction resources on startTransaction failure", t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you're doing this again which makes me think my original comments are valuable.
// SnapshotTransaction constructor. But in case of failure, we need to manually unlock it here. | ||
|
||
Closer closer = Closer.create(); // N.B. using closer to run all cleanup tasks even if one cleanup throws | ||
responses.forEach( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for reviewers: Closer goes in stack order, so we're technically changing the order here. It shouldn't matter, these are unrelated.
responses.stream(), conditions.stream(), (response, condition) -> { | ||
LockToken immutableTsLock = | ||
response.immutableTimestamp().getLock(); | ||
LongSupplier startTimestampSupplier = response.startTimestampAndPartition()::timestamp; | ||
|
||
ExpectationsAwareTransaction transaction = createTransaction( | ||
immutableTs, startTimestampSupplier, immutableTsLock, condition); | ||
transaction.onSuccess( | ||
() -> lockWatchManager.onTransactionCommit(transaction.getTimestamp())); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, deep breaths before reviewing this...
@@ -224,35 +236,63 @@ public List<OpenTransaction> startTransactions(List<? extends PreCommitCondition | |||
recordImmutableTimestamp(immutableTs); | |||
cleaner.punch(responses.get(0).startTimestampAndPartition().timestamp()); | |||
|
|||
List<OpenTransaction> transactions = Streams.zip( | |||
List<OpenTransactionImpl> transactions = Streams.zip( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, something doesn't add up here if you're having to make this available and return the impl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about, instead of having the #execute method on OpenTransactionImpl, just inline it here? Like you're really not gaining anything from this, but now EVERYTHING is polluted with these weird generics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct me if there's a deeper reason
transaction.onSuccess( | ||
() -> lockWatchManager.onTransactionCommit(transaction.getTimestamp())); | ||
|
||
transaction.onCommitOrAbort(transaction::reportExpectationsCollectedData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one: should this just be an internal transaction thing in SnapshotTransaction? Why is this even registered here, as opposed to much deeper?
This definitely feels silly, it shouldn't even be on the interface.
} finally { | ||
lockWatchManager.requestTransactionStateRemovalFromCache(getTimestamp()); | ||
postTaskContext = postTaskTimer.time(); | ||
openTransactionCounter.dec(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if this was ALSO moved somewhere higher? to somewhere around the place where we increment this value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that would be the most correct thing
transaction.onCommitOrAbort(transaction::reportExpectationsCollectedData); | ||
transaction.onCommitOrAbort(condition::cleanup); | ||
transaction.onCommitOrAbort( | ||
() -> lockWatchManager.requestTransactionStateRemovalFromCache( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternative to this VERY CAREFUL ORDERING, is to actually delegate this to LockWatchManager honestly? It feels wrong that such CRITICAL piece of functionality has high code-distance from LockWatchManager itself. This is probably a pre-req for this PR merging?
I'd support something like maybe:
- StartIdentifiedAtlasDbTransactionResponse has some sort of Consumer, to which you push the outcome of the transaction and it runs the lockWatchManager code.
General
Before this PR:
The interface
OpenTransaction
was confusing. It stated that methods needed to usefinish
orfinishWithCallback
for a transaction to succeed, but our main internal consumer called these only onclose()
for cleanup, which caused #7407.After this PR:
This PR clarifies the usage of
OpenTransaction
. It explicitly delegates to clients the responsibility of callingcommit()
orabort()
, and moves cleanup to aclose()
method. It also makes it explicit thatclose()
is idempotent, and can be called several times (with further calls simply no-oping).==COMMIT_MSG==
==COMMIT_MSG==
Priority:
Concerns / possible downsides (what feedback would you like?):
Is documentation needed?:
Compatibility
Does this PR create any API breaks (e.g. at the Java or HTTP layers) - if so, do we have compatibility?:
Does this PR change the persisted format of any data - if so, do we have forward and backward compatibility?:
The code in this PR may be part of a blue-green deploy. Can upgrades from previous versions safely coexist? (Consider restarts of blue or green nodes.):
Does this PR rely on statements being true about other products at a deployment - if so, do we have correct product dependencies on these products (or other ways of verifying that these statements are true)?:
Does this PR need a schema migration?
Testing and Correctness
What, if any, assumptions are made about the current state of the world? If they change over time, how will we find out?:
What was existing testing like? What have you done to improve it?:
If this PR contains complex concurrent or asynchronous code, is it correct? The onus is on the PR writer to demonstrate this.:
If this PR involves acquiring locks or other shared resources, how do we ensure that these are always released?:
Execution
How would I tell this PR works in production? (Metrics, logs, etc.):
Has the safety of all log arguments been decided correctly?:
Will this change significantly affect our spending on metrics or logs?:
How would I tell that this PR does not work in production? (monitors, etc.):
If this PR does not work as expected, how do I fix that state? Would rollback be straightforward?:
If the above plan is more complex than “recall and rollback”, please tag the support PoC here (if it is the end of the week, tag both the current and next PoC):
Scale
Would this PR be expected to pose a risk at scale? Think of the shopping product at our largest stack.:
Would this PR be expected to perform a large number of database calls, and/or expensive database calls (e.g., row range scans, concurrent CAS)?:
Would this PR ever, with time and scale, become the wrong thing to do - and if so, how would we know that we need to do something differently?:
Development Process
Where should we start reviewing?:
If this PR is in excess of 500 lines excluding versions lock-files, why does it not make sense to split it?:
Please tag any other people who should be aware of this PR:
@jeremyk-91
@raiju