Skip to content

Commit

Permalink
[compat][common][controller] Add store and version configs for target…
Browse files Browse the repository at this point in the history
… region swap and wait time (#1340)

* add store and verion config for target swap region and wait time

* fix imports

* add isDavinciHeartbeatReported to store and version config

* update doc strings and unit tests

* update protocol version

* fix tests and static analysis

* fix spotbugs

---------

Co-authored-by: Michelle Kwong <mkwong@linkedin.com>
  • Loading branch information
misyel and Michelle Kwong authored Dec 2, 2024
1 parent 3a60860 commit 9da0f5d
Show file tree
Hide file tree
Showing 23 changed files with 1,934 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,9 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
p -> params.setNearlineProducerCompressionEnabled(p),
argSet);
integerParam(cmd, Arg.NEARLINE_PRODUCER_COUNT_PER_WRITER, p -> params.setNearlineProducerCountPerWriter(p), argSet);
genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet);
integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet);
booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,15 @@ public enum Arg {
"How many producers will be used to write nearline workload in Server"
), INSTANCES("instances", "in", true, "Input list of helix ids of nodes to check if they can removed or not"),
TO_BE_STOPPED_NODES("to-be-stopped-nodes", "tbsn", true, "List of helix ids of nodes assumed to be stopped"),
LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request");
LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request"),
TARGET_SWAP_REGION("target-region-swap", "trs", true, "Region to swap current version during target colo push"),
TARGET_SWAP_REGION_WAIT_TIME(
"target-region-swap-wait-time", "trswt", true,
"How long to wait in minutes before swapping to the new version in a target colo push"
),
DAVINCI_HEARTBEAT_REPORTED(
"dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats"
);

private final String argName;
private final String first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.linkedin.venice.Arg.CLUSTER_SRC;
import static com.linkedin.venice.Arg.COMPRESSION_STRATEGY;
import static com.linkedin.venice.Arg.DATETIME;
import static com.linkedin.venice.Arg.DAVINCI_HEARTBEAT_REPORTED;
import static com.linkedin.venice.Arg.DEBUG;
import static com.linkedin.venice.Arg.DERIVED_SCHEMA;
import static com.linkedin.venice.Arg.DERIVED_SCHEMA_ID;
Expand Down Expand Up @@ -123,6 +124,8 @@
import static com.linkedin.venice.Arg.STORE_TYPE;
import static com.linkedin.venice.Arg.STORE_VIEW_CONFIGS;
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
Expand Down Expand Up @@ -274,7 +277,8 @@ public enum Command {
STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS,
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED,
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER }
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION,
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ public static void recover(
.setMaxCompactionLagSeconds(deletedStore.getMaxCompactionLagSeconds())
.setMaxRecordSizeBytes(deletedStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(deletedStore.getMaxNearlineRecordSizeBytes())
.setBlobTransferEnabled(deletedStore.isBlobTransferEnabled());
.setBlobTransferEnabled(deletedStore.isBlobTransferEnabled())
.setTargetRegionSwap(deletedStore.getTargetSwapRegion())
.setTargetRegionSwapWaitTime(deletedStore.getTargetSwapRegionWaitTime())
.setIsDavinciHeartbeatReported(deletedStore.getIsDavinciHeartbeatReported());
System.out.println(
"Updating store: " + storeName + " in cluster: " + recoverCluster + " with params: "
+ updateParams.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void testAdminToolRequiresSSLConfigFile() {
public void testAdminUpdateStoreArg() throws ParseException, IOException {
final String K1 = "k1", V1 = "v1", K2 = "k2", V2 = "v2", K3 = "k3", V3 = "v3";
String[] args = { "--update-store", "--url", "http://localhost:7036", "--cluster", "test-cluster", "--store",
"testStore", "--rmd-chunking-enabled", "true", "--blob-transfer-enabled", "true", "--partitioner-params",
"testStore", "--rmd-chunking-enabled", "true", "--blob-transfer-enabled", "true", "--target-region-swap",
"prod", "--target-region-swap-wait-time", "100", "--partitioner-params",
"{\"" + K1 + "\":\"" + V1 + "\",\"" + K2 + "\":\"" + V2 + "\",\"" + K3 + "\":\"" + V3 + "\"}" };

CommandLine commandLine = AdminTool.getCommandLine(args);
Expand All @@ -83,6 +84,10 @@ public void testAdminUpdateStoreArg() throws ParseException, IOException {
Assert.assertTrue(params.getRmdChunkingEnabled().get());
Assert.assertTrue(params.getBlobTransferEnabled().isPresent());
Assert.assertTrue(params.getBlobTransferEnabled().get());
Assert.assertTrue(params.getTargetSwapRegion().isPresent());
Assert.assertEquals(params.getTargetSwapRegion().get(), "prod");
Assert.assertTrue(params.getTargetRegionSwapWaitTime().isPresent());
Assert.assertEquals(params.getTargetRegionSwapWaitTime(), Optional.of(100));
Optional<Map<String, String>> partitionerParams = params.getPartitionerParams();
Assert.assertTrue(partitionerParams.isPresent());
Map<String, String> partitionerParamsMap = partitionerParams.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,7 @@ public class ControllerApiConstants {

public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline_producer_compression_enabled";
public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline_producer_count_per_writer";
public static final String TARGET_SWAP_REGION = "target_swap_region";
public static final String TARGET_SWAP_REGION_WAIT_TIME = "target_swap_region_wait_time";
public static final String IS_DAVINCI_HEARTBEAT_REPORTED = "is_davinci_heartbeat_reported";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_DISK_QUOTA_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HYBRID_STORE_OVERHEAD_BYPASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_DAVINCI_HEARTBEAT_REPORTED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LARGEST_USED_VERSION_NUMBER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LATEST_SUPERSET_SCHEMA_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.MAX_COMPACTION_LAG_SECONDS;
Expand Down Expand Up @@ -61,6 +62,8 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST;
Expand Down Expand Up @@ -141,6 +144,8 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) {
.setBlobTransferEnabled(srcStore.isBlobTransferEnabled())
.setMaxRecordSizeBytes(srcStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(srcStore.getMaxNearlineRecordSizeBytes())
.setTargetRegionSwap(srcStore.getTargetRegionSwap())
.setTargetRegionSwapWaitTime(srcStore.getTargetRegionSwapWaitTime())
// TODO: This needs probably some refinement, but since we only support one kind of view type today, this is
// still easy to parse
.setStoreViews(
Expand Down Expand Up @@ -738,6 +743,30 @@ public Optional<Integer> getNearlineProducerCountPerWriter() {
return getInteger(NEARLINE_PRODUCER_COUNT_PER_WRITER);
}

public UpdateStoreQueryParams setTargetRegionSwap(String targetRegion) {
return putString(TARGET_SWAP_REGION, targetRegion);
}

public Optional<String> getTargetSwapRegion() {
return getString(TARGET_SWAP_REGION);
}

public UpdateStoreQueryParams setTargetRegionSwapWaitTime(int waitTime) {
return putInteger(TARGET_SWAP_REGION_WAIT_TIME, waitTime);
}

public Optional<Integer> getTargetRegionSwapWaitTime() {
return getInteger(TARGET_SWAP_REGION_WAIT_TIME);
}

public UpdateStoreQueryParams setIsDavinciHeartbeatReported(boolean isReported) {
return putBoolean(IS_DAVINCI_HEARTBEAT_REPORTED, isReported);
}

public Optional<Boolean> getIsDavinciHeartbeatReported() {
return getBoolean(IS_DAVINCI_HEARTBEAT_REPORTED);
}

// ***************** above this line are getters and setters *****************
private UpdateStoreQueryParams putInteger(String name, int value) {
return (UpdateStoreQueryParams) add(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl

version.setUseVersionLevelIncrementalPushEnabled(true);

version.setTargetSwapRegion(getTargetSwapRegion());

version.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime());

version.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported());

HybridStoreConfig hybridStoreConfig = getHybridStoreConfig();
if (hybridStoreConfig != null) {
version.setHybridStoreConfig(hybridStoreConfig.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,36 @@ public void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryV
throw new UnsupportedOperationException();
}

@Override
public String getTargetSwapRegion() {
return delegate.getTargetSwapRegion();
}

@Override
public int getTargetSwapRegionWaitTime() {
return delegate.getTargetSwapRegionWaitTime();
}

@Override
public void setTargetSwapRegion(String targetRegion) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetSwapRegionWaitTime(int waitTime) {
throw new UnsupportedOperationException();
}

@Override
public void setIsDavinciHeartbeatReported(boolean isReported) {
throw new UnsupportedOperationException();
}

@Override
public boolean getIsDavinciHeartbeatReported() {
return delegate.getIsDavinciHeartbeatReported();
}

@Override
public void setRepushSourceVersion(int version) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -1437,6 +1467,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) {
throw new UnsupportedOperationException();
}

@Override
public String getTargetSwapRegion() {
return delegate.getTargetSwapRegion();
}

@Override
public int getTargetSwapRegionWaitTime() {
return delegate.getTargetSwapRegionWaitTime();
}

@Override
public void setTargetSwapRegion(String targetRegion) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetSwapRegionWaitTime(int waitTime) {
throw new UnsupportedOperationException();
}

@Override
public void setIsDavinciHeartbeatReported(boolean isReported) {
throw new UnsupportedOperationException();
}

@Override
public boolean getIsDavinciHeartbeatReported() {
return delegate.getIsDavinciHeartbeatReported();
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,16 @@ static boolean isSystemStore(String storeName) {
int getNearlineProducerCountPerWriter();

void setNearlineProducerCountPerWriter(int producerCnt);

String getTargetSwapRegion();

int getTargetSwapRegionWaitTime();

void setTargetSwapRegion(String targetRegion);

void setTargetSwapRegionWaitTime(int waitTime);

void setIsDavinciHeartbeatReported(boolean isReported);

boolean getIsDavinciHeartbeatReported();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public static StoreInfo fromStore(Store store) {
storeInfo.setBlobTransferEnabled(store.isBlobTransferEnabled());
storeInfo.setNearlineProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled());
storeInfo.setNearlineProducerCountPerWriter(store.getNearlineProducerCountPerWriter());
storeInfo.setTargetRegionSwap(store.getTargetSwapRegion());
storeInfo.setTargetRegionSwapWaitTime(store.getTargetSwapRegionWaitTime());
storeInfo.setIsDavinciHeartbeatReported(store.getIsDavinciHeartbeatReported());
return storeInfo;
}

Expand Down Expand Up @@ -326,6 +329,9 @@ public static StoreInfo fromStore(Store store) {

private boolean nearlineProducerCompressionEnabled;
private int nearlineProducerCountPerWriter;
private String targetRegionSwap;
private int targetRegionSwapWaitTime;
private boolean isDavinciHeartbeatReported;

public StoreInfo() {
}
Expand Down Expand Up @@ -831,4 +837,28 @@ public int getNearlineProducerCountPerWriter() {
public void setNearlineProducerCountPerWriter(int nearlineProducerCountPerWriter) {
this.nearlineProducerCountPerWriter = nearlineProducerCountPerWriter;
}

public String getTargetRegionSwap() {
return this.targetRegionSwap;
}

public void setTargetRegionSwap(String targetRegion) {
this.targetRegionSwap = targetRegion;
}

public int getTargetRegionSwapWaitTime() {
return this.targetRegionSwapWaitTime;
}

public void setTargetRegionSwapWaitTime(int waitTime) {
this.targetRegionSwapWaitTime = waitTime;
}

public void setIsDavinciHeartbeatReported(boolean isReported) {
this.isDavinciHeartbeatReported = isReported;
}

public boolean getIsDavinciHeartbeatReported() {
return this.isDavinciHeartbeatReported;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,36 @@ public void setNearlineProducerCountPerWriter(int producerCnt) {
throwUnsupportedOperationException("setNearlineProducerCountPerWriter");
}

@Override
public int getTargetSwapRegionWaitTime() {
return zkSharedStore.getTargetSwapRegionWaitTime();
}

@Override
public String getTargetSwapRegion() {
return zkSharedStore.getTargetSwapRegion();
}

@Override
public void setTargetSwapRegion(String targetRegion) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetSwapRegionWaitTime(int waitTime) {
throw new UnsupportedOperationException();
}

@Override
public void setIsDavinciHeartbeatReported(boolean isReported) {
throw new UnsupportedOperationException();
}

@Override
public boolean getIsDavinciHeartbeatReported() {
return zkSharedStore.getIsDavinciHeartbeatReported();
}

@Override
public Store cloneStore() {
return new SystemStore(zkSharedStore.cloneStore(), systemStoreType, veniceStore.cloneStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryVersionConfig);

void setTargetSwapRegion(String targetRegion);

String getTargetSwapRegion();

void setTargetSwapRegionWaitTime(int waitTime);

int getTargetSwapRegionWaitTime();

void setIsDavinciHeartbeatReported(boolean isReported);

boolean getIsDavinciHeartbeatReported();

/**
* Get the replication metadata version id.
* @deprecated
Expand Down
Loading

0 comments on commit 9da0f5d

Please sign in to comment.