Skip to content

Commit

Permalink
Merge branch 'master' into enable-test-skipped-for-yugabytedb
Browse files Browse the repository at this point in the history
  • Loading branch information
feeblefakie authored Dec 16, 2024
2 parents 48cd2df + 185ca4a commit df13048
Show file tree
Hide file tree
Showing 19 changed files with 186 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,20 @@ public CommitHandler(
this.parallelExecutor = checkNotNull(parallelExecutor);
}

protected void onPrepareFailure(Snapshot snapshot) {}
/**
* A callback invoked when any exception occurs before committing transactions.
*
* @param snapshot the failed snapshot.
*/
protected void onFailureBeforeCommit(Snapshot snapshot) {}

protected void onValidateFailure(Snapshot snapshot) {}
private void safelyCallOnFailureBeforeCommit(Snapshot snapshot) {
try {
onFailureBeforeCommit(snapshot);
} catch (Exception e) {
logger.warn("Failed to call the callback. Transaction ID: {}", snapshot.getId(), e);
}
}

private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snapshot)
throws UnknownTransactionStatusException, CommitException {
Expand All @@ -65,11 +76,9 @@ private Optional<Future<Void>> invokeBeforePreparationSnapshotHook(Snapshot snap
return Optional.of(
beforePreparationSnapshotHook.handle(tableMetadataManager, snapshot.getReadWriteSets()));
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
// TODO: This method is actually a part of preparation phase. But the callback method name
// `onPrepareFailure()` should be renamed to more reasonable one.
onPrepareFailure(snapshot);
throw new CommitException(
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
e,
Expand All @@ -87,11 +96,9 @@ private void waitBeforePreparationSnapshotHookFuture(
try {
snapshotHookFuture.get();
} catch (Exception e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
// TODO: This method is actually a part of validation phase. But the callback method name
// `onValidateFailure()` should be renamed to more reasonable one.
onValidateFailure(snapshot);
throw new CommitException(
CoreError.HANDLING_BEFORE_PREPARATION_SNAPSHOT_HOOK_FAILED.buildMessage(e.getMessage()),
e,
Expand All @@ -104,28 +111,30 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction
try {
prepare(snapshot);
} catch (PreparationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof PreparationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
onPrepareFailure(snapshot);
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}

try {
validate(snapshot);
} catch (ValidationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
rollbackRecords(snapshot);
if (e instanceof ValidationConflictException) {
throw new CommitConflictException(e.getMessage(), e, e.getTransactionId().orElse(null));
}
throw new CommitException(e.getMessage(), e, e.getTransactionId().orElse(null));
} catch (Exception e) {
onValidateFailure(snapshot);
safelyCallOnFailureBeforeCommit(snapshot);
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ public CommitHandlerWithGroupCommit(
}

@Override
protected void onPrepareFailure(Snapshot snapshot) {
cancelGroupCommitIfNeeded(snapshot.getId());
}

@Override
protected void onValidateFailure(Snapshot snapshot) {
protected void onFailureBeforeCommit(Snapshot snapshot) {
cancelGroupCommitIfNeeded(snapshot.getId());
}

Expand Down Expand Up @@ -77,7 +72,12 @@ private void commitStateViaGroupCommit(Snapshot snapshot)
}

private void cancelGroupCommitIfNeeded(String id) {
groupCommitter.remove(id);
try {
groupCommitter.remove(id);
} catch (Exception e) {
logger.warn(
"Unexpectedly failed to remove the snapshot ID from the group committer. ID: {}", id);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.scalar.db.io.Key;
import com.scalar.db.io.Value;
import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator;
import com.scalar.db.util.groupcommit.KeyManipulator.Keys;
import com.scalar.db.util.groupcommit.GroupCommitKeyManipulator.Keys;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.scalar.db.transaction.consensuscommit;

import com.scalar.db.util.groupcommit.DefaultGroupCommitKeyManipulator;
import com.scalar.db.util.groupcommit.GroupCommitConfig;
import com.scalar.db.util.groupcommit.GroupCommitter;
import com.scalar.db.util.groupcommit.KeyManipulator;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

public class CoordinatorGroupCommitter
extends GroupCommitter<String, String, String, String, String, Snapshot> {
Expand Down Expand Up @@ -35,93 +34,7 @@ public static boolean isEnabled(ConsensusCommitConfig config) {
return config.isCoordinatorGroupCommitEnabled();
}

// The behavior of this class is completely the same as the parent class for now.
public static class CoordinatorGroupCommitKeyManipulator
implements KeyManipulator<String, String, String, String, String> {
private static final int PRIMARY_KEY_SIZE = 24;
private static final char DELIMITER = '$';
private static final int MAX_FULL_KEY_SIZE = 64;
private static final int MAX_CHILD_KEY_SIZE =
MAX_FULL_KEY_SIZE - PRIMARY_KEY_SIZE - 1 /* delimiter */;
private static final char[] CHARS_FOR_PRIMARY_KEY;
private static final int CHARS_FOR_PRIMARY_KEY_SIZE;

static {
int digitsLen = '9' - '0' + 1;
int upperCasesLen = 'Z' - 'A' + 1;
int lowerCasesLen = 'z' - 'a' + 1;
CHARS_FOR_PRIMARY_KEY = new char[digitsLen + upperCasesLen + lowerCasesLen];

int index = 0;
for (char c = '0'; c <= '9'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'A'; c <= 'Z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'a'; c <= 'z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}

CHARS_FOR_PRIMARY_KEY_SIZE = CHARS_FOR_PRIMARY_KEY.length;
}

@Override
public String generateParentKey() {
char[] chars = new char[PRIMARY_KEY_SIZE];
for (int i = 0; i < PRIMARY_KEY_SIZE; i++) {
chars[i] =
CHARS_FOR_PRIMARY_KEY[ThreadLocalRandom.current().nextInt(CHARS_FOR_PRIMARY_KEY_SIZE)];
}
return new String(chars);
}

@Override
public String fullKey(String parentKey, String childKey) {
if (parentKey.length() != PRIMARY_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of parent key must be %d. ParentKey: %s", PRIMARY_KEY_SIZE, childKey));
}
if (childKey.length() > MAX_CHILD_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of child key must not exceed %d. ChildKey: %s",
MAX_CHILD_KEY_SIZE, childKey));
}
return parentKey + DELIMITER + childKey;
}

@Override
public boolean isFullKey(Object obj) {
if (!(obj instanceof String)) {
return false;
}
String key = (String) obj;
return key.length() > PRIMARY_KEY_SIZE && key.charAt(PRIMARY_KEY_SIZE) == DELIMITER;
}

@Override
public Keys<String, String, String> keysFromFullKey(String fullKey) {
if (!isFullKey(fullKey)) {
throw new IllegalArgumentException("Invalid full key. Key:" + fullKey);
}

return new Keys<>(
fullKey.substring(0, PRIMARY_KEY_SIZE),
fullKey.substring(PRIMARY_KEY_SIZE + 1 /* delimiter */),
fullKey);
}

@Override
public String emitFullKeyFromFullKey(String fullKey) {
// Return the string as is since the value is already String.
return fullKey;
}

@Override
public String emitParentKeyFromParentKey(String parentKey) {
// Return the string as is since the value is already String.
return parentKey;
}
}
extends DefaultGroupCommitKeyManipulator {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.scalar.db.util.groupcommit;

import java.util.concurrent.ThreadLocalRandom;

public class DefaultGroupCommitKeyManipulator
implements GroupCommitKeyManipulator<String, String, String, String, String> {
private static final int PRIMARY_KEY_SIZE = 24;
private static final char DELIMITER = '$';
private static final int MAX_FULL_KEY_SIZE = 64;
private static final int MAX_CHILD_KEY_SIZE =
MAX_FULL_KEY_SIZE - PRIMARY_KEY_SIZE - 1 /* delimiter */;
private static final char[] CHARS_FOR_PRIMARY_KEY;
private static final int CHARS_FOR_PRIMARY_KEY_SIZE;

static {
int digitsLen = '9' - '0' + 1;
int upperCasesLen = 'Z' - 'A' + 1;
int lowerCasesLen = 'z' - 'a' + 1;
CHARS_FOR_PRIMARY_KEY = new char[digitsLen + upperCasesLen + lowerCasesLen];

int index = 0;
for (char c = '0'; c <= '9'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'A'; c <= 'Z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'a'; c <= 'z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}

CHARS_FOR_PRIMARY_KEY_SIZE = CHARS_FOR_PRIMARY_KEY.length;
}

@Override
public String generateParentKey() {
char[] chars = new char[PRIMARY_KEY_SIZE];
for (int i = 0; i < PRIMARY_KEY_SIZE; i++) {
chars[i] =
CHARS_FOR_PRIMARY_KEY[ThreadLocalRandom.current().nextInt(CHARS_FOR_PRIMARY_KEY_SIZE)];
}
return new String(chars);
}

@Override
public String fullKey(String parentKey, String childKey) {
if (parentKey.length() != PRIMARY_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of parent key must be %d. ParentKey: %s", PRIMARY_KEY_SIZE, childKey));
}
if (childKey.length() > MAX_CHILD_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of child key must not exceed %d. ChildKey: %s",
MAX_CHILD_KEY_SIZE, childKey));
}
return parentKey + DELIMITER + childKey;
}

@Override
public boolean isFullKey(Object obj) {
if (!(obj instanceof String)) {
return false;
}
String key = (String) obj;
return key.length() > PRIMARY_KEY_SIZE && key.charAt(PRIMARY_KEY_SIZE) == DELIMITER;
}

@Override
public Keys<String, String, String> keysFromFullKey(String fullKey) {
if (!isFullKey(fullKey)) {
throw new IllegalArgumentException("Invalid full key. Key:" + fullKey);
}

return new Keys<>(
fullKey.substring(0, PRIMARY_KEY_SIZE),
fullKey.substring(PRIMARY_KEY_SIZE + 1 /* delimiter */),
fullKey);
}

@Override
public String emitFullKeyFromFullKey(String fullKey) {
// Return the string as is since the value is already String.
return fullKey;
}

@Override
public String emitParentKeyFromParentKey(String parentKey) {
// Return the string as is since the value is already String.
return parentKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class DelayedGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_K
GroupCommitConfig config,
FULL_KEY fullKey,
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
super(emitter, keyManipulator, 1, config.oldGroupAbortTimeoutMillis());
this.fullKey = fullKey;
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/com/scalar/db/util/groupcommit/Group.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ abstract class Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL
private static final Logger logger = LoggerFactory.getLogger(Group.class);

protected final Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter;
protected final KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
protected final GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator;
private final int capacity;
private final AtomicReference<Integer> size = new AtomicReference<>();
Expand Down Expand Up @@ -61,7 +62,7 @@ enum Status {

Group(
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator,
int capacity,
long oldGroupAbortTimeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* @param <EMIT_FULL_KEY> A full-key type that Emitter can interpret.
*/
@Immutable
public interface KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> {
public interface GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> {
class Keys<PARENT_KEY, CHILD_KEY, FULL_KEY> {
public final PARENT_KEY parentKey;
public final CHILD_KEY childKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.scalar.db.util.groupcommit.KeyManipulator.Keys;
import com.scalar.db.util.groupcommit.GroupCommitKeyManipulator.Keys;
import java.io.Closeable;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -46,7 +46,8 @@ public class GroupCommitter<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EM
@Nullable private final GroupCommitMonitor groupCommitMonitor;

// This contains logics of how to treat keys.
private final KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
private final GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator;

private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
Expand All @@ -62,7 +63,7 @@ public class GroupCommitter<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EM
public GroupCommitter(
String label,
GroupCommitConfig config,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
logger.info("Starting GroupCommitter. Label: {}, Config: {}", label, config);
this.keyManipulator = keyManipulator;
Expand Down Expand Up @@ -209,7 +210,7 @@ public void close() {
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
createGroupManager(
GroupCommitConfig config,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
return new GroupManager<>(config, keyManipulator);
}
Expand Down
Loading

0 comments on commit df13048

Please sign in to comment.