Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[compat] [controller] add rt topic name in store config (#1345)" #1354

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class ControllerApiConstants {
public static final String TIME_LAG_TO_GO_ONLINE = "time_lag_to_go_online";
public static final String DATA_REPLICATION_POLICY = "data_replication_policy";
public static final String BUFFER_REPLAY_POLICY = "buffer_replay_policy";
public static final String REAL_TIME_TOPIC_NAME = "real_time_topic_name";
public static final String COMPRESSION_STRATEGY = "compression_strategy";
public static final String CLIENT_DECOMPRESSION_ENABLED = "client_decompression_enabled";
public static final String CHUNKING_ENABLED = "chunking_enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_STREAM_SOURCE_ADDRESS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_COMPUTATION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_QUOTA_IN_CU;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REAL_TIME_TOPIC_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGIONS_FILTER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGULAR_VERSION_ETL_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATE_ALL_CONFIGS;
Expand Down Expand Up @@ -365,14 +364,6 @@ public Optional<BufferReplayPolicy> getHybridBufferReplayPolicy() {
return Optional.ofNullable(params.get(BUFFER_REPLAY_POLICY)).map(BufferReplayPolicy::valueOf);
}

public UpdateStoreQueryParams setRealTimeTopicName(String realTimeTopicName) {
return putString(REAL_TIME_TOPIC_NAME, realTimeTopicName);
}

public Optional<String> getRealTimeTopicName() {
return getString(REAL_TIME_TOPIC_NAME);
}

public UpdateStoreQueryParams setAccessControlled(boolean accessControlled) {
return putBoolean(ACCESS_CONTROLLED, accessControlled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,5 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybridC

BufferReplayPolicy getBufferReplayPolicy();

String getRealTimeTopicName();

void setRealTimeTopicName(String realTimeTopicName);

HybridStoreConfig clone();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.venice.meta;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -18,33 +17,15 @@ public class HybridStoreConfigImpl implements HybridStoreConfig {
public static final long DEFAULT_REWIND_TIME_IN_SECONDS = Time.SECONDS_PER_DAY;
public static final long DEFAULT_HYBRID_TIME_LAG_THRESHOLD = -1L;
public static final long DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD = 1000L;
public static final String DEFAULT_REAL_TIME_TOPIC_NAME = "";

private final StoreHybridConfig hybridConfig;

public HybridStoreConfigImpl(
long rewindTimeInSeconds,
long offsetLagThresholdToGoOnline,
long producerTimestampLagThresholdToGoOnlineInSeconds,
DataReplicationPolicy dataReplicationPolicy,
BufferReplayPolicy bufferReplayPolicy) {
this(
rewindTimeInSeconds,
offsetLagThresholdToGoOnline,
producerTimestampLagThresholdToGoOnlineInSeconds,
dataReplicationPolicy,
bufferReplayPolicy,
DEFAULT_REAL_TIME_TOPIC_NAME);
}

@JsonCreator
public HybridStoreConfigImpl(
@JsonProperty("rewindTimeInSeconds") long rewindTimeInSeconds,
@JsonProperty("offsetLagThresholdToGoOnline") long offsetLagThresholdToGoOnline,
@JsonProperty("producerTimestampLagThresholdToGoOnlineInSeconds") long producerTimestampLagThresholdToGoOnlineInSeconds,
@JsonProperty("dataReplicationPolicy") DataReplicationPolicy dataReplicationPolicy,
@JsonProperty("bufferReplayPolicy") BufferReplayPolicy bufferReplayPolicy,
@JsonProperty("realTimeTopicName") String realTimeTopicName) {
@JsonProperty("bufferReplayPolicy") BufferReplayPolicy bufferReplayPolicy) {
this.hybridConfig = new StoreHybridConfig();
this.hybridConfig.rewindTimeInSeconds = rewindTimeInSeconds;
this.hybridConfig.offsetLagThresholdToGoOnline = offsetLagThresholdToGoOnline;
Expand All @@ -56,7 +37,6 @@ public HybridStoreConfigImpl(
: dataReplicationPolicy.getValue();
this.hybridConfig.bufferReplayPolicy =
bufferReplayPolicy == null ? BufferReplayPolicy.REWIND_FROM_EOP.getValue() : bufferReplayPolicy.getValue();
this.hybridConfig.realTimeTopicName = realTimeTopicName;
}

HybridStoreConfigImpl(StoreHybridConfig config) {
Expand Down Expand Up @@ -103,16 +83,6 @@ public BufferReplayPolicy getBufferReplayPolicy() {
return BufferReplayPolicy.valueOf(this.hybridConfig.bufferReplayPolicy);
}

@Override
public String getRealTimeTopicName() {
return this.hybridConfig.realTimeTopicName.toString();
}

@Override
public void setRealTimeTopicName(String realTimeTopicName) {
this.hybridConfig.realTimeTopicName = realTimeTopicName;
}

@Override
public StoreHybridConfig dataModel() {
return this.hybridConfig;
Expand Down Expand Up @@ -142,7 +112,6 @@ public HybridStoreConfig clone() {
getOffsetLagThresholdToGoOnline(),
getProducerTimestampLagThresholdToGoOnlineInSeconds(),
getDataReplicationPolicy(),
getBufferReplayPolicy(),
getRealTimeTopicName());
getBufferReplayPolicy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,6 @@ public BufferReplayPolicy getBufferReplayPolicy() {
return this.delegate.getBufferReplayPolicy();
}

@Override
public String getRealTimeTopicName() {
return this.delegate.getRealTimeTopicName();
}

@Override
public void setRealTimeTopicName(String realTimeTopicName) {
throw new UnsupportedOperationException();
}

@Override
public HybridStoreConfig clone() {
return this.delegate.clone();
Expand Down Expand Up @@ -523,11 +513,6 @@ public void setUseVersionLevelIncrementalPushEnabled(boolean versionLevelIncreme
throw new UnsupportedOperationException();
}

@Override
public boolean isHybrid() {
return this.delegate.isHybrid();
}

@Override
public HybridStoreConfig getHybridStoreConfig() {
HybridStoreConfig config = this.delegate.getHybridStoreConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setUseVersionLevelIncrementalPushEnabled(boolean versionLevelIncrementalPushEnabled);

boolean isHybrid();

HybridStoreConfig getHybridStoreConfig();

void setHybridStoreConfig(HybridStoreConfig hybridConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,6 @@ public void setUseVersionLevelIncrementalPushEnabled(boolean versionLevelIncreme
this.storeVersion.useVersionLevelIncrementalPushEnabled = versionLevelIncrementalPushEnabled;
}

@Override
public boolean isHybrid() {
return getHybridStoreConfig() != null;
}

@Override
public HybridStoreConfig getHybridStoreConfig() {
if (this.storeVersion.hybridConfig == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
import com.linkedin.venice.helix.HelixState;
import com.linkedin.venice.helix.Replica;
import com.linkedin.venice.helix.ResourceAssignment;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
Expand Down Expand Up @@ -68,7 +66,6 @@
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;


/**
Expand Down Expand Up @@ -532,101 +529,6 @@ public static File getTempDataDirectory(String prefix) {
}
}

/** This method should only be used for system stores.
* For other stores, use {@link Utils#getRealTimeTopicName(Store)}, {@link Utils#getRealTimeTopicName(StoreInfo)} or
* {@link Utils#getRealTimeTopicName(Version)}
*/
public static String composeRealTimeTopic(String storeName) {
return storeName + Version.REAL_TIME_TOPIC_SUFFIX;
}

/**
* It follows the following order to search for real time topic name,
* i) current store-version config, ii) store config, iii) other store-version configs, iv) default name
*/
public static String getRealTimeTopicName(Store store) {
return getRealTimeTopicName(
store.getName(),
store.getVersions(),
store.getCurrentVersion(),
store.getHybridStoreConfig());
}

public static String getRealTimeTopicName(StoreInfo storeInfo) {
return getRealTimeTopicName(
storeInfo.getName(),
storeInfo.getVersions(),
storeInfo.getCurrentVersion(),
storeInfo.getHybridStoreConfig());
}

public static String getRealTimeTopicName(Version version) {
HybridStoreConfig hybridStoreConfig = version.getHybridStoreConfig();
if (hybridStoreConfig != null) {
String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName();
return getRealTimeTopicNameIfEmpty(realTimeTopicName, version.getStoreName());
} else {
// if the version is not hybrid, caller should not ask for the real time topic,
// but unfortunately that happens, so instead of throwing exception, we just return a default name.
return composeRealTimeTopic(version.getStoreName());
}
}

static String getRealTimeTopicName(
String storeName,
List<Version> versions,
int currentVersionNumber,
HybridStoreConfig hybridStoreConfig) {
if (currentVersionNumber < 1) {
return composeRealTimeTopic(storeName);
}

Optional<Version> currentVersion =
versions.stream().filter(version -> version.getNumber() == currentVersionNumber).findFirst();
if (currentVersion.isPresent() && currentVersion.get().isHybrid()) {
String realTimeTopicName = currentVersion.get().getHybridStoreConfig().getRealTimeTopicName();
if (Strings.isNotBlank(realTimeTopicName)) {
return realTimeTopicName;
}
}

if (hybridStoreConfig != null) {
String realTimeTopicName = hybridStoreConfig.getRealTimeTopicName();
return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName);
}

Set<String> realTimeTopicNames = new HashSet<>();

for (Version version: versions) {
try {
if (version.isHybrid()) {
String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName();
if (Strings.isNotBlank(realTimeTopicName)) {
realTimeTopicNames.add(realTimeTopicName);
}
}
} catch (VeniceException e) {
// just try another version
}
}

if (realTimeTopicNames.size() > 1) {
LOGGER.warn(
"Store " + storeName + " and current version are not hybrid, yet " + realTimeTopicNames.size()
+ " older versions are using real time topics. Will return one of them.");
}

if (!realTimeTopicNames.isEmpty()) {
return realTimeTopicNames.iterator().next();
}

return composeRealTimeTopic(storeName);
}

private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) {
return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
}

private static class TimeUnitInfo {
String suffix;
int multiplier;
Expand Down
Loading
Loading