From b82a87ac93e95d268095d8d591499def68551216 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Thu, 5 Dec 2024 20:58:22 +0900 Subject: [PATCH 1/6] Replace `on(Prepare|Validate)Failure()` with `onFailureBeforeCommit()` --- .../consensuscommit/CommitHandler.java | 16 +++++----------- .../CommitHandlerWithGroupCommit.java | 7 +------ .../consensuscommit/CommitHandlerTest.java | 15 +++++---------- 3 files changed, 11 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 6a511683c7..37f11d446f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -51,9 +51,7 @@ public CommitHandler( this.parallelExecutor = checkNotNull(parallelExecutor); } - protected void onPrepareFailure(Snapshot snapshot) {} - - protected void onValidateFailure(Snapshot snapshot) {} + protected void onFailureBeforeCommit(Snapshot snapshot) {} private Optional> invokeBeforePreparationSnapshotHook(Snapshot snapshot) throws UnknownTransactionStatusException, CommitException { @@ -67,9 +65,7 @@ private Optional> invokeBeforePreparationSnapshotHook(Snapshot snap } catch (Exception e) { abortState(snapshot.getId()); rollbackRecords(snapshot); - // TODO: This method is actually a part of preparation phase. But the callback method name - // `onPrepareFailure()` should be renamed to more reasonable one. - onPrepareFailure(snapshot); + onFailureBeforeCommit(snapshot); throw new CommitException( CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), e, @@ -89,9 +85,7 @@ private void waitBeforePreparationSnapshotHookFuture( } catch (Exception e) { abortState(snapshot.getId()); rollbackRecords(snapshot); - // TODO: This method is actually a part of validation phase. But the callback method name - // `onValidateFailure()` should be renamed to more reasonable one. - onValidateFailure(snapshot); + onFailureBeforeCommit(snapshot); throw new CommitException( CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), e, @@ -111,7 +105,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction } throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (Exception e) { - onPrepareFailure(snapshot); + onFailureBeforeCommit(snapshot); throw e; } @@ -125,7 +119,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction } throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (Exception e) { - onValidateFailure(snapshot); + onFailureBeforeCommit(snapshot); throw e; } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index addca7809b..edd2612a5c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -38,12 +38,7 @@ public CommitHandlerWithGroupCommit( } @Override - protected void onPrepareFailure(Snapshot snapshot) { - cancelGroupCommitIfNeeded(snapshot.getId()); - } - - @Override - protected void onValidateFailure(Snapshot snapshot) { + protected void onFailureBeforeCommit(Snapshot snapshot) { cancelGroupCommitIfNeeded(snapshot.getId()); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index fdf84d0c15..9a54af5ab2 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -183,8 +183,7 @@ public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectiv verify(storage, times(4)).mutate(anyList()); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); - verify(handler, never()).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler, never()).onFailureBeforeCommit(any()); } @ParameterizedTest @@ -206,8 +205,7 @@ public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean verify(storage, times(2)).mutate(anyList()); verifyCoordinatorPutState(TransactionState.COMMITTED); verifySnapshotHook(withSnapshotHook, readWriteSets); - verify(handler, never()).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -687,8 +685,7 @@ public Future handle( // This means `commit()` waited until the callback was completed before throwing // an exception from `commitState()`. assertThat(Duration.between(start, end)).isGreaterThanOrEqualTo(Duration.ofSeconds(2)); - verify(handler, never()).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -710,8 +707,7 @@ public void commit_FailingSnapshotHookGiven_ShouldThrowCommitException() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); - verify(handler).onPrepareFailure(any()); - verify(handler, never()).onValidateFailure(any()); + verify(handler).onFailureBeforeCommit(any()); } @Test @@ -735,8 +731,7 @@ public void commit_FailingSnapshotHookFutureGiven_ShouldThrowCommitException() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); - verify(handler, never()).onPrepareFailure(any()); - verify(handler).onValidateFailure(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } protected void doThrowExceptionWhenCoordinatorPutState( From 3da6caee74dbb0528029a1f1031bb8c19ad4663f Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Thu, 5 Dec 2024 21:21:05 +0900 Subject: [PATCH 2/6] Make the implementation more consistent --- .../consensuscommit/CommitHandler.java | 6 ++++-- .../consensuscommit/CommitHandlerTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 37f11d446f..71bf1851a3 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -63,9 +63,9 @@ private Optional> invokeBeforePreparationSnapshotHook(Snapshot snap return Optional.of( beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets())); } catch (Exception e) { + onFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); - onFailureBeforeCommit(snapshot); throw new CommitException( CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), e, @@ -83,9 +83,9 @@ private void waitBeforePreparationSnapshotHookFuture( try { snapshotHookFuture.get(); } catch (Exception e) { + onFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); - onFailureBeforeCommit(snapshot); throw new CommitException( CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()), e, @@ -98,6 +98,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction try { prepare(snapshot); } catch (PreparationException e) { + onFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); if (e instanceof PreparationConflictException) { @@ -112,6 +113,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction try { validate(snapshot); } catch (ValidationException e) { + onFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); if (e instanceof ValidationConflictException) { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 9a54af5ab2..9327b3e161 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -228,6 +228,8 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(any()); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -250,6 +252,7 @@ public void commit_RetriableExecutionExceptionThrownInPrepareRecords_ShouldThrow verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -272,6 +275,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -301,6 +305,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -328,6 +333,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -355,6 +361,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -380,6 +387,7 @@ public void commit_ExceptionThrownInPrepareRecords_ShouldAbortAndRollbackRecords verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -403,6 +411,7 @@ public void commit_ValidationConflictExceptionThrownInValidation_ShouldAbortAndR verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -426,6 +435,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -456,6 +466,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -484,6 +495,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -512,6 +524,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -538,6 +551,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler, never()).rollbackRecords(snapshot); + verify(handler).onFailureBeforeCommit(snapshot); } @Test @@ -562,6 +576,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -585,6 +600,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -607,6 +623,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -629,6 +646,7 @@ public void commit_ExceptionThrownInValidation_ShouldAbortAndRollbackRecords() verifyCoordinatorPutState(TransactionState.COMMITTED); verify(coordinator).getState(anyId()); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test @@ -647,6 +665,7 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown() verify(storage, times(2)).mutate(anyList()); verifyCoordinatorPutState(TransactionState.COMMITTED); verify(handler, never()).rollbackRecords(snapshot); + verify(handler, never()).onFailureBeforeCommit(any()); } @Test From 74377a377338945cd794d76510fee6818b307279 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Thu, 5 Dec 2024 21:25:24 +0900 Subject: [PATCH 3/6] Ignore exception from cancelGroupCommitIfNeeded() just in case --- .../consensuscommit/CommitHandlerWithGroupCommit.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index edd2612a5c..4f47f73b5f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -72,7 +72,12 @@ private void commitStateViaGroupCommit(Snapshot snapshot) } private void cancelGroupCommitIfNeeded(String id) { - groupCommitter.remove(id); + try { + groupCommitter.remove(id); + } catch (Exception e) { + logger.warn( + "Unexpectedly failed to remove the snapshot ID from the group committer. ID: {}", id); + } } @Override From ce830c046668848f6c316691061fce7946f1c872 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Thu, 5 Dec 2024 21:35:01 +0900 Subject: [PATCH 4/6] Add Javadoc comment --- .../db/transaction/consensuscommit/CommitHandler.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 71bf1851a3..8d19fa2444 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -51,6 +51,12 @@ public CommitHandler( this.parallelExecutor = checkNotNull(parallelExecutor); } + /** + * A callback invoked when any exception occurs before committing transactions. This method must + * not throw any exception. + * + * @param snapshot the failed snapshot. + */ protected void onFailureBeforeCommit(Snapshot snapshot) {} private Optional> invokeBeforePreparationSnapshotHook(Snapshot snapshot) From 5db61dfe615e7fd80a228798e9efc6804bc9ae51 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Thu, 5 Dec 2024 21:43:13 +0900 Subject: [PATCH 5/6] Remove duplicated verification --- .../scalar/db/transaction/consensuscommit/CommitHandlerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 9327b3e161..3136b10c77 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -228,7 +228,6 @@ public void commit_NoMutationExceptionThrownInPrepareRecords_ShouldThrowCCExcept verify(coordinator, never()) .putState(new Coordinator.State(anyId(), TransactionState.COMMITTED)); verify(handler).rollbackRecords(snapshot); - verify(handler).onFailureBeforeCommit(any()); verify(handler).onFailureBeforeCommit(snapshot); } From 782cb03b15b9a1a3b59e78bc3be8fc50c9317193 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Tue, 10 Dec 2024 20:11:13 +0900 Subject: [PATCH 6/6] Catch an exception from a callback safely --- .../consensuscommit/CommitHandler.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 8d19fa2444..c8d6079a34 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -52,13 +52,20 @@ public CommitHandler( } /** - * A callback invoked when any exception occurs before committing transactions. This method must - * not throw any exception. + * A callback invoked when any exception occurs before committing transactions. * * @param snapshot the failed snapshot. */ protected void onFailureBeforeCommit(Snapshot snapshot) {} + private void safelyCallOnFailureBeforeCommit(Snapshot snapshot) { + try { + onFailureBeforeCommit(snapshot); + } catch (Exception e) { + logger.warn("Failed to call the callback. Transaction ID: {}", snapshot.getId(), e); + } + } + private Optional> invokeBeforePreparationSnapshotHook(Snapshot snapshot) throws UnknownTransactionStatusException, CommitException { if (beforePreparationSnapshotHook == null) { @@ -69,7 +76,7 @@ private Optional> invokeBeforePreparationSnapshotHook(Snapshot snap return Optional.of( beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets())); } catch (Exception e) { - onFailureBeforeCommit(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); throw new CommitException( @@ -89,7 +96,7 @@ private void waitBeforePreparationSnapshotHookFuture( try { snapshotHookFuture.get(); } catch (Exception e) { - onFailureBeforeCommit(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); throw new CommitException( @@ -104,7 +111,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction try { prepare(snapshot); } catch (PreparationException e) { - onFailureBeforeCommit(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); if (e instanceof PreparationConflictException) { @@ -112,14 +119,14 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction } throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (Exception e) { - onFailureBeforeCommit(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); throw e; } try { validate(snapshot); } catch (ValidationException e) { - onFailureBeforeCommit(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); rollbackRecords(snapshot); if (e instanceof ValidationConflictException) { @@ -127,7 +134,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction } throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (Exception e) { - onFailureBeforeCommit(snapshot); + safelyCallOnFailureBeforeCommit(snapshot); throw e; }