Skip to content

Commit

Permalink
Refactoring the callbacks for the group commit error handling in `Com…
Browse files Browse the repository at this point in the history
…mitHandler` (#2390)
  • Loading branch information
komamitsu committed Dec 16, 2024
1 parent 5a6d163 commit e5e05bb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,20 @@ public CommitHandler(
this.parallelExecutor = checkNotNull(parallelExecutor);
}

protected void onPrepareFailure(Snapshot snapshot) {}
/**
* A callback invoked when any exception occurs before committing transactions.
*
* @param snapshot the failed snapshot.
*/
protected void onFailureBeforeCommit(Snapshot snapshot) {}

protected void onValidateFailure(Snapshot snapshot) {}
private void safelyCallOnFailureBeforeCommit(Snapshot snapshot) {
try {
onFailureBeforeCommit(snapshot);
} catch (Exception e) {
logger.warn("Failed to call the callback. Transaction ID: {}", snapshot.getId(), e);
}
}

private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snapshot)
throws UnknownTransactionStatusException, CommitException {
Expand All @@ -65,11 +76,9 @@ private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snap
return Optional.of(
beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets()));
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
// TODO: This method is actually a part of preparation phase. But the callback method name
// `onPrepareFailure()` should be renamed to more reasonable one.
onPrepareFailure(snapshot);
throw new CommitException(
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
e,
Expand All @@ -87,11 +96,9 @@ private void waitBeforePreparationSnapshotHookFuture(
try {
snapshotHookFuture.get();
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
// TODO: This method is actually a part of validation phase. But the callback method name
// `onValidateFailure()` should be renamed to more reasonable one.
onValidateFailure(snapshot);
throw new CommitException(
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
e,
Expand All @@ -104,28 +111,30 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction
try {
prepare(snapshot);
} catch (PreparationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof PreparationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
onPrepareFailure(snapshot);
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}

try {
validate(snapshot);
} catch (ValidationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof ValidationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
onValidateFailure(snapshot);
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ public CommitHandlerWithGroupCommit(
}

@Override
protected void onPrepareFailure(Snapshot snapshot) {
cancelGroupCommitIfNeeded(snapshot.getId());
}

@Override
protected void onValidateFailure(Snapshot snapshot) {
protected void onFailureBeforeCommit(Snapshot snapshot) {
cancelGroupCommitIfNeeded(snapshot.getId());
}

Expand Down Expand Up @@ -77,7 +72,12 @@ private void commitStateViaGroupCommit(Snapshot snapshot)
}

private void cancelGroupCommitIfNeeded(String id) {
groupCommitter.remove(id);
try {
groupCommitter.remove(id);
} catch (Exception e) {
logger.warn(
"Unexpectedly failed to remove the snapshot ID from the group committer. ID: {}", id);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectiv
verify(storage, times(4)).mutate(anyList());
verifyCoordinatorPutState(TransactionState.COMMITTED);
verifySnapshotHook(withSnapshotHook, readWriteSets);
verify(handler, never()).onPrepareFailure(any());
verify(handler, never()).onValidateFailure(any());
verify(handler, never()).onFailureBeforeCommit(any());
}

@ParameterizedTest
Expand All @@ -206,8 +205,7 @@ public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean
verify(storage, times(2)).mutate(anyList());
verifyCoordinatorPutState(TransactionState.COMMITTED);
verifySnapshotHook(withSnapshotHook, readWriteSets);
verify(handler, never()).onPrepareFailure(any());
verify(handler, never()).onValidateFailure(any());
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -230,6 +228,7 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -252,6 +251,7 @@ public void commit_RetriableExecutionExceptionThrownInPrepareRecords_ShouldThrow
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -274,6 +274,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand Down Expand Up @@ -303,6 +304,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(coordinator).getState(anyId());
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand Down Expand Up @@ -330,6 +332,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand Down Expand Up @@ -357,6 +360,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -382,6 +386,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler, never()).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -405,6 +410,7 @@ public void commit_ValidationConflictExceptionThrownInValidation_ShouldAbortAndR
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -428,6 +434,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand Down Expand Up @@ -458,6 +465,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(coordinator).getState(anyId());
verify(handler).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand Down Expand Up @@ -486,6 +494,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand Down Expand Up @@ -514,6 +523,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -540,6 +550,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler, never()).rollbackRecords(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

@Test
Expand All @@ -564,6 +575,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
verifyCoordinatorPutState(TransactionState.COMMITTED);
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -587,6 +599,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
verifyCoordinatorPutState(TransactionState.COMMITTED);
verify(coordinator).getState(anyId());
verify(handler).rollbackRecords(snapshot);
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -609,6 +622,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
verifyCoordinatorPutState(TransactionState.COMMITTED);
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -631,6 +645,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords()
verifyCoordinatorPutState(TransactionState.COMMITTED);
verify(coordinator).getState(anyId());
verify(handler, never()).rollbackRecords(snapshot);
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -649,6 +664,7 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown()
verify(storage, times(2)).mutate(anyList());
verifyCoordinatorPutState(TransactionState.COMMITTED);
verify(handler, never()).rollbackRecords(snapshot);
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand Down Expand Up @@ -687,8 +703,7 @@ public Future<Void> handle(
// This means `commit()` waited until the callback was completed before throwing
// an exception from `commitState()`.
assertThat(Duration.between(start, end)).isGreaterThanOrEqualTo(Duration.ofSeconds(2));
verify(handler, never()).onPrepareFailure(any());
verify(handler, never()).onValidateFailure(any());
verify(handler, never()).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -710,8 +725,7 @@ public void commit_FailingSnapshotHookGiven_ShouldThrowCommitException()
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler).onPrepareFailure(any());
verify(handler, never()).onValidateFailure(any());
verify(handler).onFailureBeforeCommit(any());
}

@Test
Expand All @@ -735,8 +749,7 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException()
verify(coordinator, never())
.putState(new Coordinator.State(anyId(), TransactionState.COMMITTED));
verify(handler).rollbackRecords(snapshot);
verify(handler, never()).onPrepareFailure(any());
verify(handler).onValidateFailure(snapshot);
verify(handler).onFailureBeforeCommit(snapshot);
}

protected void doThrowExceptionWhenCoordinatorPutState(
Expand Down

0 comments on commit e5e05bb

Please sign in to comment.