Skip to content

Commit

Permalink
Merge branch '3' into 3-pull-2390
Browse files Browse the repository at this point in the history
  • Loading branch information
komamitsu authored Dec 16, 2024
2 parents e5e05bb + 7cfb159 commit d1382a5
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 142 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ jobs:
steps:
- name: Run YugabyteDB 2
run: |
docker run -p 5433:5433 -e YSQL_USER=yugabyte -e YSQL_PASSWORD=yugabyte -d yugabytedb/yugabyte:2.21.0.0-b545 bin/yugabyted start --background=false --master_flag="ysql_enable_db_catalog_version_mode=false" --tserver_flags="ysql_enable_db_catalog_version_mode=false"
docker run -p 5433:5433 -e YSQL_USER=yugabyte -e YSQL_PASSWORD=yugabyte -d yugabytedb/yugabyte:2.20.4.0-b50 bin/yugabyted start --background=false --master_flag="ysql_enable_db_catalog_version_mode=false" --tserver_flags="ysql_enable_db_catalog_version_mode=false"
- uses: actions/checkout@v4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.scalar.db.io.DataType;
import java.util.Properties;
import java.util.Random;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;

public class JdbcDatabaseMultipleClusteringKeyScanIntegrationTest
extends DistributedStorageMultipleClusteringKeyScanIntegrationTestBase {
Expand Down Expand Up @@ -81,22 +79,4 @@ protected Column<?> getColumnWithMaxValue(String columnName, DataType dataType)
}
return super.getColumnWithMaxValue(columnName, dataType);
}

// TODO: Remove this once https://github.com/yugabyte/yugabyte-db/issues/22140 is fixed and the
// fix is released.
@DisabledIf("isYugabyteDb")
@Test
@Override
public void scan_WithSecondClusteringKeyRange_ShouldReturnProperResult()
throws java.util.concurrent.ExecutionException, InterruptedException {
super.scan_WithSecondClusteringKeyRange_ShouldReturnProperResult();
}

@DisabledIf("isYugabyteDb")
@Test
@Override
public void scan_WithSecondClusteringKeyRangeWithSameValues_ShouldReturnProperResult()
throws java.util.concurrent.ExecutionException, InterruptedException {
super.scan_WithSecondClusteringKeyRangeWithSameValues_ShouldReturnProperResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.scalar.db.io.Key;
import com.scalar.db.io.Value;
import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator;
import com.scalar.db.util.groupcommit.KeyManipulator.Keys;
import com.scalar.db.util.groupcommit.GroupCommitKeyManipulator.Keys;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.scalar.db.transaction.consensuscommit;

import com.scalar.db.util.groupcommit.DefaultGroupCommitKeyManipulator;
import com.scalar.db.util.groupcommit.GroupCommitConfig;
import com.scalar.db.util.groupcommit.GroupCommitter;
import com.scalar.db.util.groupcommit.KeyManipulator;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

public class CoordinatorGroupCommitter
extends GroupCommitter<String, String, String, String, String, Snapshot> {
Expand Down Expand Up @@ -35,93 +34,7 @@ public static boolean isEnabled(ConsensusCommitConfig config) {
return config.isCoordinatorGroupCommitEnabled();
}

// The behavior of this class is completely the same as the parent class for now.
public static class CoordinatorGroupCommitKeyManipulator
implements KeyManipulator<String, String, String, String, String> {
private static final int PRIMARY_KEY_SIZE = 24;
private static final char DELIMITER = '$';
private static final int MAX_FULL_KEY_SIZE = 64;
private static final int MAX_CHILD_KEY_SIZE =
MAX_FULL_KEY_SIZE - PRIMARY_KEY_SIZE - 1 /* delimiter */;
private static final char[] CHARS_FOR_PRIMARY_KEY;
private static final int CHARS_FOR_PRIMARY_KEY_SIZE;

static {
int digitsLen = '9' - '0' + 1;
int upperCasesLen = 'Z' - 'A' + 1;
int lowerCasesLen = 'z' - 'a' + 1;
CHARS_FOR_PRIMARY_KEY = new char[digitsLen + upperCasesLen + lowerCasesLen];

int index = 0;
for (char c = '0'; c <= '9'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'A'; c <= 'Z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'a'; c <= 'z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}

CHARS_FOR_PRIMARY_KEY_SIZE = CHARS_FOR_PRIMARY_KEY.length;
}

@Override
public String generateParentKey() {
char[] chars = new char[PRIMARY_KEY_SIZE];
for (int i = 0; i < PRIMARY_KEY_SIZE; i++) {
chars[i] =
CHARS_FOR_PRIMARY_KEY[ThreadLocalRandom.current().nextInt(CHARS_FOR_PRIMARY_KEY_SIZE)];
}
return new String(chars);
}

@Override
public String fullKey(String parentKey, String childKey) {
if (parentKey.length() != PRIMARY_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of parent key must be %d. ParentKey: %s", PRIMARY_KEY_SIZE, childKey));
}
if (childKey.length() > MAX_CHILD_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of child key must not exceed %d. ChildKey: %s",
MAX_CHILD_KEY_SIZE, childKey));
}
return parentKey + DELIMITER + childKey;
}

@Override
public boolean isFullKey(Object obj) {
if (!(obj instanceof String)) {
return false;
}
String key = (String) obj;
return key.length() > PRIMARY_KEY_SIZE && key.charAt(PRIMARY_KEY_SIZE) == DELIMITER;
}

@Override
public Keys<String, String, String> keysFromFullKey(String fullKey) {
if (!isFullKey(fullKey)) {
throw new IllegalArgumentException("Invalid full key. Key:" + fullKey);
}

return new Keys<>(
fullKey.substring(0, PRIMARY_KEY_SIZE),
fullKey.substring(PRIMARY_KEY_SIZE + 1 /* delimiter */),
fullKey);
}

@Override
public String emitFullKeyFromFullKey(String fullKey) {
// Return the string as is since the value is already String.
return fullKey;
}

@Override
public String emitParentKeyFromParentKey(String parentKey) {
// Return the string as is since the value is already String.
return parentKey;
}
}
extends DefaultGroupCommitKeyManipulator {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.scalar.db.util.groupcommit;

import java.util.concurrent.ThreadLocalRandom;

public class DefaultGroupCommitKeyManipulator
implements GroupCommitKeyManipulator<String, String, String, String, String> {
private static final int PRIMARY_KEY_SIZE = 24;
private static final char DELIMITER = '$';
private static final int MAX_FULL_KEY_SIZE = 64;
private static final int MAX_CHILD_KEY_SIZE =
MAX_FULL_KEY_SIZE - PRIMARY_KEY_SIZE - 1 /* delimiter */;
private static final char[] CHARS_FOR_PRIMARY_KEY;
private static final int CHARS_FOR_PRIMARY_KEY_SIZE;

static {
int digitsLen = '9' - '0' + 1;
int upperCasesLen = 'Z' - 'A' + 1;
int lowerCasesLen = 'z' - 'a' + 1;
CHARS_FOR_PRIMARY_KEY = new char[digitsLen + upperCasesLen + lowerCasesLen];

int index = 0;
for (char c = '0'; c <= '9'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'A'; c <= 'Z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}
for (char c = 'a'; c <= 'z'; c++) {
CHARS_FOR_PRIMARY_KEY[index++] = c;
}

CHARS_FOR_PRIMARY_KEY_SIZE = CHARS_FOR_PRIMARY_KEY.length;
}

@Override
public String generateParentKey() {
char[] chars = new char[PRIMARY_KEY_SIZE];
for (int i = 0; i < PRIMARY_KEY_SIZE; i++) {
chars[i] =
CHARS_FOR_PRIMARY_KEY[ThreadLocalRandom.current().nextInt(CHARS_FOR_PRIMARY_KEY_SIZE)];
}
return new String(chars);
}

@Override
public String fullKey(String parentKey, String childKey) {
if (parentKey.length() != PRIMARY_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of parent key must be %d. ParentKey: %s", PRIMARY_KEY_SIZE, childKey));
}
if (childKey.length() > MAX_CHILD_KEY_SIZE) {
throw new IllegalArgumentException(
String.format(
"The length of child key must not exceed %d. ChildKey: %s",
MAX_CHILD_KEY_SIZE, childKey));
}
return parentKey + DELIMITER + childKey;
}

@Override
public boolean isFullKey(Object obj) {
if (!(obj instanceof String)) {
return false;
}
String key = (String) obj;
return key.length() > PRIMARY_KEY_SIZE && key.charAt(PRIMARY_KEY_SIZE) == DELIMITER;
}

@Override
public Keys<String, String, String> keysFromFullKey(String fullKey) {
if (!isFullKey(fullKey)) {
throw new IllegalArgumentException("Invalid full key. Key:" + fullKey);
}

return new Keys<>(
fullKey.substring(0, PRIMARY_KEY_SIZE),
fullKey.substring(PRIMARY_KEY_SIZE + 1 /* delimiter */),
fullKey);
}

@Override
public String emitFullKeyFromFullKey(String fullKey) {
// Return the string as is since the value is already String.
return fullKey;
}

@Override
public String emitParentKeyFromParentKey(String parentKey) {
// Return the string as is since the value is already String.
return parentKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class DelayedGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_K
GroupCommitConfig config,
FULL_KEY fullKey,
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
super(emitter, keyManipulator, 1, config.oldGroupAbortTimeoutMillis());
this.fullKey = fullKey;
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/com/scalar/db/util/groupcommit/Group.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ abstract class Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL
private static final Logger logger = LoggerFactory.getLogger(Group.class);

protected final Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter;
protected final KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
protected final GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator;
private final int capacity;
private final AtomicReference<Integer> size = new AtomicReference<>();
Expand Down Expand Up @@ -61,7 +62,7 @@ enum Status {

Group(
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator,
int capacity,
long oldGroupAbortTimeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* @param <EMIT_FULL_KEY> A full-key type that Emitter can interpret.
*/
@Immutable
public interface KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> {
public interface GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> {
class Keys<PARENT_KEY, CHILD_KEY, FULL_KEY> {
public final PARENT_KEY parentKey;
public final CHILD_KEY childKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.scalar.db.util.groupcommit.KeyManipulator.Keys;
import com.scalar.db.util.groupcommit.GroupCommitKeyManipulator.Keys;
import java.io.Closeable;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -46,7 +46,8 @@ public class GroupCommitter<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EM
@Nullable private final GroupCommitMonitor groupCommitMonitor;

// This contains logics of how to treat keys.
private final KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
private final GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator;

private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
Expand All @@ -62,7 +63,7 @@ public class GroupCommitter<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EM
public GroupCommitter(
String label,
GroupCommitConfig config,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
logger.info("Starting GroupCommitter. Label: {}, Config: {}", label, config);
this.keyManipulator = keyManipulator;
Expand Down Expand Up @@ -209,7 +210,7 @@ public void close() {
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
createGroupManager(
GroupCommitConfig config,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
return new GroupManager<>(config, keyManipulator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.LazyInit;
import com.scalar.db.util.groupcommit.KeyManipulator.Keys;
import com.scalar.db.util.groupcommit.GroupCommitKeyManipulator.Keys;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,15 +49,16 @@ class GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_K
groupCleanupWorker;

// Custom operations injected by the client
private final KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
private final GroupCommitKeyManipulator<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator;
@LazyInit private Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter;

private final GroupCommitConfig config;

GroupManager(
GroupCommitConfig config,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
this.keyManipulator = keyManipulator;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KE
NormalGroup(
GroupCommitConfig config,
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
super(emitter, keyManipulator, config.slotCapacity(), config.oldGroupAbortTimeoutMillis());
this.delayedSlotMoveTimeoutMillis = config.delayedSlotMoveTimeoutMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
@ExtendWith(MockitoExtension.class)
class DelayedGroupTest {
@Mock private Emittable<String, String, Integer> emitter;
private TestableKeyManipulator keyManipulator;
private TestableGroupCommitKeyManipulator keyManipulator;

@BeforeEach
void setUp() {
// This generates parent keys which start with "0000" and increment by one for each subsequent
// key ("0001", "0002"...).
keyManipulator = new TestableKeyManipulator();
keyManipulator = new TestableGroupCommitKeyManipulator();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public ExpectedException(String message) {
}

private static class MyKeyManipulator
implements KeyManipulator<String, String, String, String, String> {
implements GroupCommitKeyManipulator<String, String, String, String, String> {
@Override
public String generateParentKey() {
return UUID.randomUUID().toString();
Expand Down
Loading

0 comments on commit d1382a5

Please sign in to comment.