Revert "[controller] do not delay RT topic deletion if the store is n…
Browse files Browse the repository at this point in the history
…o longer hybrid (linkedin#1234)"

This reverts commit af40771.
arjun4084346 committed Dec 2, 2024
1 parent 9da0f5d commit b6852ca
Showing 9 changed files with 182 additions and 330 deletions.
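For context before the diff: the commit being reverted (af40771) stopped delaying real-time (RT) topic deletion once a store is no longer hybrid — it exposed a cross-fabric isRTTopicDeletionPermittedByAllControllers check on the Admin interface and split the deletion into a reusable helper. This revert folds that logic back into a private, best-effort safeDeleteRTTopic that runs only when a store version is deleted. Both variants share the same gate: delete the RT topic only if no version of the store, in any fabric, is still hybrid. Below is a minimal, self-contained sketch of that gate; RtTopicDeletionGate, StoreVersion and FabricClient are illustrative stand-ins, not Venice classes.

// Sketch only: the gate both variants apply before deleting a store's RT topic.
import java.util.List;
import java.util.Map;

public final class RtTopicDeletionGate {
  /** A store version needs its real-time topic only while it is hybrid. */
  public record StoreVersion(int number, boolean hybrid) {}

  /** Stand-in for a per-fabric controller client that can report a store's versions. */
  public interface FabricClient {
    List<StoreVersion> getVersions(String storeName);
  }

  /**
   * True only when no local version is hybrid and every reachable fabric also reports no hybrid
   * versions. An unreachable fabric or a hybrid version keeps the RT topic for now; the check
   * simply runs again the next time a store version is deleted (the "best effort" approach the
   * diff's comment describes).
   */
  public static boolean canDeleteRtTopic(
      String storeName, List<StoreVersion> localVersions, Map<String, FabricClient> fabrics) {
    if (localVersions.stream().anyMatch(StoreVersion::hybrid)) {
      return false;
    }
    for (Map.Entry<String, FabricClient> fabric: fabrics.entrySet()) {
      List<StoreVersion> remoteVersions;
      try {
        remoteVersions = fabric.getValue().getVersions(storeName);
      } catch (RuntimeException e) {
        return false; // an unreachable fabric blocks deletion until the next attempt
      }
      if (remoteVersions.stream().anyMatch(StoreVersion::hybrid)) {
        return false;
      }
    }
    return true;
  }
}

A hypothetical caller would invoke RtTopicDeletionGate.canDeleteRtTopic(storeName, localVersions, fabricClients) right before truncating and deleting the topic, which is where the diff's safeDeleteRTTopic performs the equivalent checks.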
@@ -13,13 +13,10 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controller.kafka.TopicCleanupService;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.SafeHelixManager;
@@ -97,7 +94,6 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
}
properties.put(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, true);
properties.put(CONTROLLER_INSTANCE_TAG_LIST, "GENERAL,TEST");
properties.put(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, 100);
controllerProps = new VeniceProperties(properties);
helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), clusterName);
controllerConfig = new VeniceControllerClusterConfig(controllerProps);
@@ -109,13 +105,6 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
pubSubTopicRepository,
pubSubBrokerWrapper.getPubSubClientsFactory());
veniceAdmin.initStorageCluster(clusterName);
TopicCleanupService topicCleanupService = new TopicCleanupService(
veniceAdmin,
multiClusterConfig,
pubSubTopicRepository,
new TopicCleanupServiceStats(metricsRepository),
pubSubBrokerWrapper.getPubSubClientsFactory());
topicCleanupService.start();
startParticipant();
waitUntilIsLeader(veniceAdmin, clusterName, LEADER_CHANGE_TIMEOUT_MS);

@@ -7,13 +7,11 @@
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFER_VERSION_SWAP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.AssertJUnit.fail;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
@@ -22,20 +20,17 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
@@ -98,123 +93,6 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test(timeOut = TEST_TIMEOUT)
public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
String storeName = Utils.getUniqueString("testRTTopicDeletion");
String clusterName = CLUSTER_NAMES[0];
String rtTopicName = Version.composeRealTimeTopic(storeName);
PubSubTopic rtPubSubTopic = new PubSubTopicRepository().getTopic(rtTopicName);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
ControllerClient parentControllerClient =
ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs);
ControllerClient[] childControllerClients = new ControllerClient[childDatacenters.size()];
for (int i = 0; i < childDatacenters.size(); i++) {
childControllerClients[i] =
new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
}

List<TopicManager> topicManagers = new ArrayList<>(2);
topicManagers
.add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
topicManagers
.add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

NewStoreResponse newStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
Assert.assertFalse(
newStoreResponse.isError(),
"The NewStoreResponse returned an error: " + newStoreResponse.getError());

UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setNumVersionsToPreserve(2)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

// create new version by doing an empty push
ControllerResponse response = parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
PubSubTopic versionPubsubTopic = getVersionPubsubTopic(storeName, response);

for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}
for (ControllerClient controllerClient: childControllerClients) {
Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 1);
}

// create new version by doing an empty push
response = parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
versionPubsubTopic = getVersionPubsubTopic(storeName, response);

for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}
for (ControllerClient controllerClient: childControllerClients) {
Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
}

// change store from hybrid to batch-only
UpdateStoreQueryParams params = new UpdateStoreQueryParams();
params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1);
TestWriteUtils.updateStore(storeName, parentControllerClient, params);

// create new version by doing an empty push
response = parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);

// at this point, the current version should be batch-only, but the older version should be hybrid, so rt topic
// should not get deleted
versionPubsubTopic = getVersionPubsubTopic(storeName, response);

for (ControllerClient controllerClient: childControllerClients) {
StoreInfo storeInfo = controllerClient.getStore(storeName).getStore();
int currentVersion = storeInfo.getCurrentVersion();
Assert.assertEquals(currentVersion, 3);
Assert.assertNull(storeInfo.getVersion(currentVersion).get().getHybridStoreConfig());
Assert.assertNotNull(storeInfo.getVersion(currentVersion - 1).get().getHybridStoreConfig());
}

for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// create new version by doing an empty push
response = parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
versionPubsubTopic = getVersionPubsubTopic(storeName, response);
for (ControllerClient controllerClient: childControllerClients) {
Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 4);
}

// now both the versions should be batch-only, so rt topic should get deleted by TopicCleanupService
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
TestUtils.waitForNonDeterministicAssertion(
30,
TimeUnit.SECONDS,
true,
true,
() -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic)));
}

/*
todo - RT topics are not used in parent controller in the current architecture, so we can ignore any RT topics in parent
controller that exist because they are still on old architecture.
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> {
Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic));
}
);
*/
}

@Test(timeOut = TEST_TIMEOUT)
public void testUpdateStore() {
String clusterName = CLUSTER_NAMES[0];
@@ -687,15 +565,4 @@ private void emptyPushToStore(
});
}
}

static PubSubTopic getVersionPubsubTopic(String storeName, ControllerResponse response) {
assertFalse(response.isError(), "Failed to perform empty push on test store");
String versionTopic = null;
if (response instanceof VersionCreationResponse) {
versionTopic = ((VersionCreationResponse) response).getKafkaTopic();
} else if (response instanceof JobStatusQueryResponse) {
versionTopic = Version.composeKafkaTopic(storeName, ((JobStatusQueryResponse) response).getVersion());
}
return new PubSubTopicRepository().getTopic(versionTopic);
}
}
@@ -538,8 +538,6 @@ InstanceRemovableStatuses getAggregatedHealthStatus(
List<String> toBeStoppedInstances,
boolean isSSLEnabled);

boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, Store store);

/**
* Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
* either a parent controller or a child controller since a cluster must have a leader child controller and a leader
@@ -3517,7 +3517,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) {
store = resources.getStoreMetadataRepository().getStore(storeName);
safeDeleteRTTopic(clusterName, store);
safeDeleteRTTopic(clusterName, storeName, store);
}
}
}
@@ -3532,29 +3532,17 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi
}
}

private void safeDeleteRTTopic(String clusterName, Store store) {
if (isRTTopicDeletionPermittedByAllControllers(clusterName, store)) {
deleteRTTopic(clusterName, store.getName());
}
}

public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, Store store) {
private void safeDeleteRTTopic(String clusterName, String storeName, Store store) {
// Perform RT cleanup checks for batch only store that used to be hybrid. Check the local versions first
// to see if any version is still using RT and then also check other fabrics before deleting the RT. Since
// we perform this check everytime when a store version is deleted we can afford to do best effort
// approach if some fabrics are unavailable or out of sync (temporarily).
String storeName = store.getName();
String rtTopicName = Version.composeRealTimeTopic(storeName);
if (Version.containsHybridVersion(store.getVersions())) {
LOGGER.warn(
"Topic {} cannot be deleted yet because the store {} has at least one hybrid version",
rtTopicName,
storeName);
return false;
}

boolean canDeleteRT = !Version.containsHybridVersion(store.getVersions());
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
for (Map.Entry<String, ControllerClient> controllerClientEntry: controllerClientMap.entrySet()) {
if (!canDeleteRT) {
return;
}
StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName);
if (storeResponse.isError()) {
LOGGER.warn(
Expand All @@ -3563,37 +3551,24 @@ public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, St
clusterName,
controllerClientEntry.getKey(),
storeResponse.getError());
return false;
}
if (Version.containsHybridVersion(storeResponse.getStore().getVersions())) {
LOGGER.warn(
"Topic {} cannot be deleted yet because the store {} has at least one hybrid version",
rtTopicName,
storeName);
return false;
return;
}
}
return true;
}

private void deleteRTTopic(String clusterName, String storeName) {
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
deleteRTTopicFromAllFabrics(rtTopicToDelete, controllerClientMap);
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
deleteRTTopicFromAllFabrics(incrementalPushRTTopicToDelete, controllerClientMap);
}
}

private void deleteRTTopicFromAllFabrics(String topic, Map<String, ControllerClient> controllerClientMap) {
truncateKafkaTopic(topic);
for (ControllerClient controllerClient: controllerClientMap.values()) {
ControllerResponse deleteResponse = controllerClient.deleteKafkaTopic(topic);
if (deleteResponse.isError()) {
LOGGER.error("Deleting real time topic " + topic + " encountered error " + deleteResponse.getError());
canDeleteRT = !Version.containsHybridVersion(storeResponse.getStore().getVersions());
}
if (canDeleteRT) {
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
truncateKafkaTopic(rtTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(rtTopicToDelete);
}
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
truncateKafkaTopic(incrementalPushRTTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
}
}
}
}
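Net effect of the hunk above, read directly from the diff (not a claim about other Venice code paths): the removed variant routed deletion through a deleteRTTopicFromAllFabrics helper that inspected and logged each fabric's delete response, while the restored variant truncates locally and issues the per-fabric deleteKafkaTopic calls without checking the responses. A compact sketch of the removed fan-out-and-log pattern, with stand-in types (FabricAdminClient and DeleteResponse are not Venice classes):

// Sketch only: ask every fabric to delete the topic, logging (but not retrying) any failure.
import java.util.Map;

final class RtTopicFanOutDeleter {
  interface FabricAdminClient {
    DeleteResponse deleteKafkaTopic(String topic);
  }

  record DeleteResponse(boolean error, String message) {}

  static void deleteFromAllFabrics(String topic, Map<String, FabricAdminClient> fabrics) {
    for (Map.Entry<String, FabricAdminClient> fabric: fabrics.entrySet()) {
      DeleteResponse response = fabric.getValue().deleteKafkaTopic(topic);
      if (response.error()) {
        System.err.printf(
            "Deleting real time topic %s in fabric %s encountered error: %s%n",
            topic, fabric.getKey(), response.message());
      }
    }
  }
}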
@@ -7465,7 +7440,7 @@ public void configureActiveActiveReplication(
if (storeName.isPresent()) {
/**
* Legacy stores venice_system_store_davinci_push_status_store_<cluster_name> still exist.
* But {@link HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* But {@link com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* them by store names. Skip davinci push status stores until legacy znodes are cleaned up.
*/
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName.get());
@@ -3868,11 +3868,6 @@ public InstanceRemovableStatuses getAggregatedHealthStatus(
throw new VeniceUnsupportedOperationException("getAggregatedHealthStatus");
}

@Override
public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, Store store) {
return false;
}

/**
* @see VeniceHelixAdmin#isLeaderControllerFor(String)
*/

0 comments on commit b6852ca
