Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a corner case issue that causes inconsistent Coordinator states when lazy recovery happens before group commit #2135

Merged
merged 15 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,106 @@ void putStateForGroupCommit(
put(put);
}

public void putStateForLazyRecoveryRollback(String id) throws CoordinatorException {
if (keyManipulator.isFullKey(id)) {
putStateForLazyRecoveryRollbackForGroupCommit(id);
return;
}

putState(new Coordinator.State(id, TransactionState.ABORTED));
}

private void putStateForLazyRecoveryRollbackForGroupCommit(String id)
throws CoordinatorException {
// Lazy recoveries don't know which the transaction that created the PREPARE record is using, a
// parent ID or a full ID as `tx_id` partition key.
//
// Case a) If a transaction becomes "ready for commit" in time, it'll be committed in a group
// with `tx_id: <parent tx ID>`.
// Case b) If a transaction is delayed, it'll be committed in an isolated group with a full ID
// as `tx_id: <full tx ID>`.
//
// If lazy recoveries only insert a record with `tx_id: <full tx ID>` to abort the transaction,
// it will not conflict with the group commit using `tx_id: <parent tx ID>` in case #a.
// Therefore, lazy recoveries first need to insert a record with `tx_id: <parent tx ID>` and
// empty `tx_child_ids` to the Coordinator table. We'll call this insertion
// `lazy-recovery-abort-with-parent-id`. This record is intended to conflict with a potential
// group commit considering case#1, even though it doesn't help in finding the coordinator state
// since `tx_child_ids` is empty.
//
// Once the record insertion with `tx_id: <parent tx ID>` succeeds, the lazy recovery will
// insert another record with `tx_id: <full tx ID>`. We'll call this insertion
// `lazy-recovery-abort-with-full-id`. This record insertion is needed to conflict with a
// potential delayed group commit that has `tx_id: <full tx ID>` in case #b, and indicates the
// transaction is aborted.
//
// Let's walk through all the cases.
//
// A. The original commit with `tx_id: <parent tx ID>` succeeds in case #a, and then lazy
// recovery happens
// - The original commit with `tx_id: <parent tx ID>` succeeds
// - `lazy-recovery-abort-with-parent-id` fails
// - The transaction is treated as committed since the commit's `tx_child_ids` contains the
// transaction child ID
//
// B. The original commit with `tx_id: <parent tx ID>` is in-progress in case #a, and lazy
// recovery happens first
// - `lazy-recovery-abort-with-parent-id` succeeds
// - The original commit with `tx_id: <parent tx ID>` fails
// - (If the lazy recovery crashes here, another lazy recovery will insert the below
// `lazy-recovery-abort-with-full-id` later)
// - `lazy-recovery-abort-with-full-id` succeeds
// - The transaction is treated as aborted because of `lazy-recovery-abort-with-full-id`
//
// C. The original commit with `tx_id: <full tx ID>` is done in case #b, and then lazy recovery
// happens
// - The original commit with `tx_id: <full tx ID>` succeeds
// - `lazy-recovery-abort-with-parent-id` succeeds
// - `lazy-recovery-abort-with-full-id` fails
// - The transaction is treated as committed since the commit `tx_id` is the transaction full
// ID
//
// D. The original commit with `tx_id: <full tx ID>` is in-progress in case #b, and lazy
// recovery happens first
// - `lazy-recovery-abort-with-parent-id` succeeds
// - (If the lazy recovery crashes here and the original commit happens, the situation will be
// the same as C)
// - `lazy-recovery-abort-with-full-id` succeeds
// - The original commit with `tx_id: <full tx ID>` fails
// - The transaction is treated as aborted because of `lazy-recovery-abort-with-full-id`
Keys<String, String, String> keys = keyManipulator.keysFromFullKey(id);
try {
// This record is to prevent a group commit that has the same parent ID considering case #a
// regardless if the transaction is actually in a group commit (case #a) or a delayed commit
// (case #b).
putStateForGroupCommit(
keys.parentKey,
Collections.emptyList(),
TransactionState.ABORTED,
System.currentTimeMillis());
} catch (CoordinatorConflictException e) {
// The group commit finished already, although there may be ongoing delayed groups.

// If the group commit contains the transaction, follow the state.
// Otherwise, continue to insert a record with the full ID.
Optional<State> optState = getState(keys.parentKey);
if (!optState.isPresent()) {
throw new AssertionError();
}
State state = optState.get();
if (state.getChildIds().contains(keys.childKey)) {
if (state.getState() == TransactionState.ABORTED) {
return;
} else {
// Conflicted.
throw e;
}
}
}
// This record is to intend the transaction is aborted.
putState(new Coordinator.State(id, TransactionState.ABORTED));
}

private Get createGetWith(String id) {
return new Get(new Key(Attribute.toIdValue(id)))
.withConsistency(Consistency.LINEARIZABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void abortIfExpired(Selection selection, TransactionResult result) {
}

try {
coordinator.putState(new Coordinator.State(result.getId(), TransactionState.ABORTED));
coordinator.putStateForLazyRecoveryRollback(result.getId());
rollbackRecord(selection, result);
} catch (CoordinatorException e) {
logger.warn("Coordinator tries to abort {}, but failed", result.getId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -672,4 +677,185 @@ public void putStateForGroupCommit_FullIdGiven_ShouldThrowAssertionError(
parentId, fullIds, transactionState, current))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void putStateForLazyRecoveryRollback_NormalIdGiven_ShouldCallPutState()
throws CoordinatorException {
// Arrange
Coordinator spiedCoordinator = spy(coordinator);

// Act
spiedCoordinator.putStateForLazyRecoveryRollback(ANY_ID_1);

// Assert
verify(spiedCoordinator).putState(new State(ANY_ID_1, TransactionState.ABORTED));
}

@Test
void
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInGroupCommitWhenGroupCommitIsNotCommitted_ShouldInsertTwoRecordsWithParentIdAndFullId()
throws CoordinatorException {
// Arrange
Coordinator spiedCoordinator = spy(coordinator);
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);

// Act
spiedCoordinator.putStateForLazyRecoveryRollback(fullId);

// Assert
verify(spiedCoordinator)
.putStateForGroupCommit(
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
verify(spiedCoordinator).putState(new State(fullId, TransactionState.ABORTED));
}

@Test
void
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInGroupCommitWhenGroupCommitIsCommitted_ShouldThrowCoordinatorConflictException()
throws CoordinatorException {
// Arrange
Coordinator spiedCoordinator = spy(coordinator);
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);

doThrow(CoordinatorConflictException.class)
.when(spiedCoordinator)
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
doReturn(
Optional.of(
new State(
parentId,
Collections.singletonList(ANY_ID_1),
TransactionState.COMMITTED,
System.currentTimeMillis())))
.when(spiedCoordinator)
.getState(parentId);

// Act
assertThatThrownBy(() -> spiedCoordinator.putStateForLazyRecoveryRollback(fullId))
.isInstanceOf(CoordinatorConflictException.class);

// Assert
verify(spiedCoordinator)
.putStateForGroupCommit(
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
verify(spiedCoordinator, never()).putState(new State(fullId, TransactionState.ABORTED));
}

@Test
void
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInGroupCommitWhenGroupCommitIsAbort_ShouldDoNothing()
throws CoordinatorException {
// Arrange
Coordinator spiedCoordinator = spy(coordinator);
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);

doThrow(CoordinatorConflictException.class)
.when(spiedCoordinator)
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
doReturn(
Optional.of(
new State(
parentId,
Collections.singletonList(ANY_ID_1),
TransactionState.ABORTED,
System.currentTimeMillis())))
.when(spiedCoordinator)
.getState(parentId);

// Act
spiedCoordinator.putStateForLazyRecoveryRollback(fullId);

// Assert
verify(spiedCoordinator)
.putStateForGroupCommit(
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
verify(spiedCoordinator, never()).putState(new State(fullId, TransactionState.ABORTED));
}

@ParameterizedTest
@EnumSource(
value = TransactionState.class,
names = {"COMMITTED", "ABORTED"})
void
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInDelayedGroupCommitWhenGroupCommitFinished_ShouldInsertRecordWithFullId(
TransactionState transactionState) throws CoordinatorException {
// Arrange
Coordinator spiedCoordinator = spy(coordinator);
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);

doThrow(CoordinatorConflictException.class)
.when(spiedCoordinator)
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
doReturn(
Optional.of(
new State(
parentId,
Collections.singletonList("other-id"),
transactionState,
System.currentTimeMillis())))
.when(spiedCoordinator)
.getState(parentId);

// Act
spiedCoordinator.putStateForLazyRecoveryRollback(fullId);

// Assert
verify(spiedCoordinator)
.putStateForGroupCommit(
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
verify(spiedCoordinator).putState(new State(fullId, TransactionState.ABORTED));
}

@ParameterizedTest
@EnumSource(
value = TransactionState.class,
names = {"COMMITTED", "ABORTED"})
void
putStateForLazyRecoveryRollback_FullIdGivenWhenTransactionIsInDelayedGroupCommitWhenGroupCommitAndDelayedGroupCommitFinished_ShouldCoordinatorConflictException(
TransactionState transactionState) throws CoordinatorException {
// Arrange
Coordinator spiedCoordinator = spy(coordinator);
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
String fullId = keyManipulator.fullKey(parentId, ANY_ID_1);

doThrow(CoordinatorConflictException.class)
.when(spiedCoordinator)
.putStateForGroupCommit(anyString(), anyList(), any(), anyLong());
doReturn(
Optional.of(
new State(
parentId,
Collections.singletonList("other-id"),
transactionState,
System.currentTimeMillis())))
.when(spiedCoordinator)
.getState(parentId);
doThrow(CoordinatorConflictException.class)
.when(spiedCoordinator)
.putState(new State(fullId, TransactionState.ABORTED));

// Act
assertThatThrownBy(() -> spiedCoordinator.putStateForLazyRecoveryRollback(fullId))
.isInstanceOf(CoordinatorConflictException.class);

// Assert
verify(spiedCoordinator)
.putStateForGroupCommit(
eq(parentId), eq(Collections.emptyList()), eq(TransactionState.ABORTED), anyLong());
verify(spiedCoordinator).putState(new State(fullId, TransactionState.ABORTED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void recover_SelectionAndResultGivenWhenCoordinatorStateNotExistsAndExpir
handler.recover(selection, result);

// Assert
verify(coordinator).putState(new Coordinator.State(ANY_ID_1, TransactionState.ABORTED));
verify(coordinator).putStateForLazyRecoveryRollback(ANY_ID_1);
verify(handler).rollbackRecord(selection, result);
}
}
Loading
Loading