Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,40 @@ public Coordinator(DistributedStorage storage, ConsensusCommitConfig config) {
keyManipulator = new CoordinatorGroupCommitKeyManipulator();
}

/**
* Gets the coordinator state by ID. If the ID is a full ID for the coordinator group commit, it
* will look up the state using the parent ID and the child ID. Otherwise, it will look up the
* state only by ID.
*
* @param id the ID of the coordinator state
* @return the coordinator state
* @throws CoordinatorException if the coordinator state cannot be retrieved
*/
public Optional<Coordinator.State> getState(String id) throws CoordinatorException {
if (keyManipulator.isFullKey(id)) {
return getStateForGroupCommit(id);
}

Get get = createGetWith(id);
return get(get);
return getStateInternal(id);
}

/**
* Gets the coordinator state for a group commit by ID. It first looks up the state using the
* parent ID and then checks if the child ID is contained in the state. If the child ID is not
* found, it will look up the state using the full ID.
*
* @param fullId the full ID for the coordinator group commit
* @return the coordinator state
* @throws CoordinatorException if the coordinator state cannot be retrieved
*/
@VisibleForTesting
Optional<Coordinator.State> getStateForGroupCommit(String fullId) throws CoordinatorException {
// Scan with the parent ID for a normal group that contains multiple transactions.
Keys<String, String, String> idForGroupCommit = keyManipulator.keysFromFullKey(fullId);

String parentId = idForGroupCommit.parentKey;
String childId = idForGroupCommit.childKey;
Get get = createGetWith(parentId);
Optional<State> state = get(get);
Optional<State> state = getStateByParentId(parentId);
// The current implementation is optimized for cases where most transactions are
// group-committed. It first looks up a transaction state using the parent ID with a single read
// operation. If no matching transaction state is found (i.e., the transaction was delayed and
Expand All @@ -113,7 +129,41 @@ Optional<Coordinator.State> getStateForGroupCommit(String fullId) throws Coordin
return stateContainingTargetTxId;
}

return get(createGetWith(fullId));
return getStateByFullId(fullId);
}

private Optional<Coordinator.State> getStateInternal(String id) throws CoordinatorException {
Get get = createGetWith(id);
return get(get);
}

/**
* Gets the coordinator state by the parent ID for the coordinator group commit. Note: The scope
* of this method has public visibility, but is intended for internal use. Also, the method only
* calls {@link #getStateInternal(String)} with the parent ID, but it exists as a separate method
* for clarifying this specific use case.
*
* @param parentId the parent ID of the coordinator state for the coordinator group commit
* @return the coordinator state
* @throws CoordinatorException if the coordinator state cannot be retrieved
*/
public Optional<Coordinator.State> getStateByParentId(String parentId)
throws CoordinatorException {
return getStateInternal(parentId);
}

/**
* Gets the coordinator state by the full ID for the coordinator group commit. Note: The scope of
* this method has public visibility, but is intended for internal use. Also, the method only
* calls {@link #getStateInternal(String)} with the parent ID, but it exists as a separate method
* for clarifying this specific use case.
*
* @param fullId the parent ID of the coordinator state for the coordinator group commit
* @return the coordinator state
* @throws CoordinatorException if the coordinator state cannot be retrieved
*/
public Optional<Coordinator.State> getStateByFullId(String fullId) throws CoordinatorException {
return getStateInternal(fullId);
}

public void putState(Coordinator.State state) throws CoordinatorException {
Expand Down Expand Up @@ -339,6 +389,10 @@ public State(String id, TransactionState state) {
this(id, state, System.currentTimeMillis());
}

public State(String id, List<String> childIds, TransactionState state) {
this(id, childIds, state, System.currentTimeMillis());
}

@VisibleForTesting
State(String id, List<String> childIds, TransactionState state, long createdAt) {
this.id = checkNotNull(id);
Expand Down Expand Up @@ -374,8 +428,9 @@ public long getCreatedAt() {
return createdAt;
}

@VisibleForTesting
List<String> getChildIds() {
@SuppressFBWarnings("EI_EXPOSE_REP")
@Nonnull
public List<String> getChildIds() {
return childIds;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,73 @@ public void getState_TransactionIdGivenAndExceptionThrownInGet_ShouldThrowCoordi
assertThatThrownBy(() -> coordinator.getState(id)).isInstanceOf(CoordinatorException.class);
}

@Test
public void getStateByParentId_GroupCommitParentIdGiven_ShouldReturnStateUsingItParent()
throws ExecutionException, CoordinatorException {
// Arrange
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
String childIdsStr =
String.join(
",",
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString());

Result result = mock(Result.class);
when(result.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, parentId)));
when(result.getValue(Attribute.CHILD_IDS))
.thenReturn(Optional.of(new TextValue(Attribute.CHILD_IDS, childIdsStr)));
when(result.getValue(Attribute.STATE))
.thenReturn(Optional.of(new IntValue(Attribute.STATE, TransactionState.ABORTED.get())));
when(result.getValue(Attribute.CREATED_AT))
.thenReturn(Optional.of(new BigIntValue(Attribute.CREATED_AT, ANY_TIME_1)));
when(storage.get(any(Get.class))).thenReturn(Optional.of(result));

// Act
Optional<Coordinator.State> state = coordinator.getStateByParentId(parentId);

// Assert
assertThat(state.get().getId()).isEqualTo(parentId);
assertThat(state.get().getChildIds()).isEqualTo(Arrays.asList(childIdsStr.split(",")));
assertThat(state.get().getChildIdsAsString()).isEqualTo(childIdsStr);
Assertions.assertThat(state.get().getState()).isEqualTo(TransactionState.ABORTED);
assertThat(state.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
}

@Test
public void getStateByFullId_GroupCommitFullIdGiven_ShouldReturnStateUsingItParent()
throws ExecutionException, CoordinatorException {
// Arrange
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String fullId =
keyManipulator.fullKey(keyManipulator.generateParentKey(), UUID.randomUUID().toString());

Result result = mock(Result.class);
when(result.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, fullId)));
when(result.getValue(Attribute.CHILD_IDS))
.thenReturn(Optional.of(new TextValue(Attribute.CHILD_IDS, EMPTY_CHILD_IDS)));
when(result.getValue(Attribute.STATE))
.thenReturn(Optional.of(new IntValue(Attribute.STATE, TransactionState.ABORTED.get())));
when(result.getValue(Attribute.CREATED_AT))
.thenReturn(Optional.of(new BigIntValue(Attribute.CREATED_AT, ANY_TIME_1)));
when(storage.get(any(Get.class))).thenReturn(Optional.of(result));

// Act
Optional<Coordinator.State> state = coordinator.getStateByFullId(fullId);

// Assert
assertThat(state.get().getId()).isEqualTo(fullId);
assertThat(state.get().getChildIds()).isEmpty();
assertThat(state.get().getChildIdsAsString()).isEmpty();
Assertions.assertThat(state.get().getState()).isEqualTo(TransactionState.ABORTED);
assertThat(state.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
}

@Test
public void putState_StateGiven_ShouldPutWithCorrectValues()
throws ExecutionException, CoordinatorException {
Expand Down