Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] do not delay RT topic deletion if the store is no longer hybrid #1358

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controller.kafka.TopicCleanupService;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.SafeHelixManager;
Expand Down Expand Up @@ -94,6 +97,7 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
}
properties.put(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, true);
properties.put(CONTROLLER_INSTANCE_TAG_LIST, "GENERAL,TEST");
properties.put(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, 100);
controllerProps = new VeniceProperties(properties);
helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), clusterName);
controllerConfig = new VeniceControllerClusterConfig(controllerProps);
Expand All @@ -105,6 +109,13 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
pubSubTopicRepository,
pubSubBrokerWrapper.getPubSubClientsFactory());
veniceAdmin.initStorageCluster(clusterName);
TopicCleanupService topicCleanupService = new TopicCleanupService(
veniceAdmin,
multiClusterConfig,
pubSubTopicRepository,
new TopicCleanupServiceStats(metricsRepository),
pubSubBrokerWrapper.getPubSubClientsFactory());
topicCleanupService.start();
startParticipant();
waitUntilIsLeader(veniceAdmin, clusterName, LEADER_CHANGE_TIMEOUT_MS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFER_VERSION_SWAP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.AssertJUnit.fail;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
Expand All @@ -20,17 +22,20 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -93,6 +98,123 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test(timeOut = TEST_TIMEOUT)
public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
  String storeName = Utils.getUniqueString("testRTTopicDeletion");
  String clusterName = CLUSTER_NAMES[0];
  String rtTopicName = Version.composeRealTimeTopic(storeName);
  PubSubTopic rtPubSubTopic = new PubSubTopicRepository().getTopic(rtTopicName);
  String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
  ControllerClient parentControllerClient =
      ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs);
  ControllerClient[] childControllerClients = new ControllerClient[childDatacenters.size()];
  // ControllerClient is Closeable; close everything in the finally block below so this test
  // does not leak connections into the shared cluster wrappers used by the other tests.
  try {
    for (int i = 0; i < childDatacenters.size(); i++) {
      childControllerClients[i] =
          new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
    }

    List<TopicManager> topicManagers = new ArrayList<>(2);
    topicManagers
        .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
    topicManagers
        .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

    NewStoreResponse newStoreResponse =
        parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
    Assert.assertFalse(
        newStoreResponse.isError(),
        "The NewStoreResponse returned an error: " + newStoreResponse.getError());

    // Make the store hybrid with incremental push enabled, keeping two versions around so an
    // older hybrid version can outlive a newer batch-only one.
    UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
    updateStoreParams.setIncrementalPushEnabled(true)
        .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
        .setNumVersionsToPreserve(2)
        .setHybridRewindSeconds(1000)
        .setHybridOffsetLagThreshold(1000);
    TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

    // create new version by doing an empty push
    ControllerResponse response = parentControllerClient
        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
    PubSubTopic versionPubsubTopic = getVersionPubsubTopic(storeName, response);

    for (TopicManager topicManager: topicManagers) {
      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
      Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
    }
    for (ControllerClient controllerClient: childControllerClients) {
      Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 1);
    }

    // create new version by doing an empty push
    response = parentControllerClient
        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
    versionPubsubTopic = getVersionPubsubTopic(storeName, response);

    for (TopicManager topicManager: topicManagers) {
      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
      Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
    }
    for (ControllerClient controllerClient: childControllerClients) {
      Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
    }

    // change store from hybrid to batch-only
    UpdateStoreQueryParams params = new UpdateStoreQueryParams();
    params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1);
    TestWriteUtils.updateStore(storeName, parentControllerClient, params);

    // create new version by doing an empty push
    response = parentControllerClient
        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);

    // at this point, the current version should be batch-only, but the older version should be hybrid, so rt topic
    // should not get deleted
    versionPubsubTopic = getVersionPubsubTopic(storeName, response);

    for (ControllerClient controllerClient: childControllerClients) {
      StoreInfo storeInfo = controllerClient.getStore(storeName).getStore();
      int currentVersion = storeInfo.getCurrentVersion();
      Assert.assertEquals(currentVersion, 3);
      // getVersion returns Optional; both versions were just created above, so they must be present.
      Assert.assertNull(storeInfo.getVersion(currentVersion).get().getHybridStoreConfig());
      Assert.assertNotNull(storeInfo.getVersion(currentVersion - 1).get().getHybridStoreConfig());
    }

    for (TopicManager topicManager: topicManagers) {
      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
      Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
    }

    // create new version by doing an empty push
    response = parentControllerClient
        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
    versionPubsubTopic = getVersionPubsubTopic(storeName, response);
    for (ControllerClient controllerClient: childControllerClients) {
      Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 4);
    }

    // now both the versions should be batch-only, so rt topic should get deleted by TopicCleanupService
    for (TopicManager topicManager: topicManagers) {
      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
      TestUtils.waitForNonDeterministicAssertion(
          30,
          TimeUnit.SECONDS,
          true,
          true,
          () -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic)));
    }

    /*
    todo - RT topics are not used in parent controller in the current architecture, so we can ignore any RT topics in
    the parent controller that exist because they are still on the old architecture.

    TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> {
        Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic));
      }
    );
    */
  } finally {
    Utils.closeQuietlyWithErrorLogged(parentControllerClient);
    for (ControllerClient childControllerClient: childControllerClients) {
      Utils.closeQuietlyWithErrorLogged(childControllerClient);
    }
  }
}

@Test(timeOut = TEST_TIMEOUT)
public void testUpdateStore() {
String clusterName = CLUSTER_NAMES[0];
Expand Down Expand Up @@ -565,4 +687,15 @@ private void emptyPushToStore(
});
}
}

/**
 * Resolves the version (Kafka) topic created by an empty push from the controller response.
 * Supports both response shapes the push helpers return: a {@link VersionCreationResponse}
 * (carries the topic name directly) and a {@link JobStatusQueryResponse} (carries the version
 * number, from which the topic name is composed).
 *
 * @throws IllegalArgumentException if the response is of any other type; failing fast here is
 *         clearer than the NPE that {@code getTopic(null)} would otherwise raise.
 */
static PubSubTopic getVersionPubsubTopic(String storeName, ControllerResponse response) {
  assertFalse(response.isError(), "Failed to perform empty push on test store");
  String versionTopic;
  if (response instanceof VersionCreationResponse) {
    versionTopic = ((VersionCreationResponse) response).getKafkaTopic();
  } else if (response instanceof JobStatusQueryResponse) {
    versionTopic = Version.composeKafkaTopic(storeName, ((JobStatusQueryResponse) response).getVersion());
  } else {
    throw new IllegalArgumentException("Unsupported controller response type: " + response.getClass().getName());
  }
  return new PubSubTopicRepository().getTopic(versionTopic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ InstanceRemovableStatuses getAggregatedHealthStatus(
List<String> toBeStoppedInstances,
boolean isSSLEnabled);

  /**
   * Checks with the controllers of every fabric whether the real-time topic of the given store
   * can be safely deleted, i.e. no fabric still has a hybrid version of the store. Best-effort:
   * implementations return {@code false} when any fabric cannot be queried.
   */
  boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName);

/**
* Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
* either a parent controller or a child controller since a cluster must have a leader child controller and a leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3516,8 +3516,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver
}
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) {
store = resources.getStoreMetadataRepository().getStore(storeName);
safeDeleteRTTopic(clusterName, storeName, store);
safeDeleteRTTopic(clusterName, storeName);
}
}
}
Expand All @@ -3532,17 +3531,28 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi
}
}

private void safeDeleteRTTopic(String clusterName, String storeName, Store store) {
// Perform RT cleanup checks for batch only store that used to be hybrid. Check the local versions first
// to see if any version is still using RT and then also check other fabrics before deleting the RT. Since
// we perform this check everytime when a store version is deleted we can afford to do best effort
private void safeDeleteRTTopic(String clusterName, String storeName) {
boolean rtDeletionPermitted = isRTTopicDeletionPermittedByAllControllers(clusterName, storeName);
if (rtDeletionPermitted) {
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
deleteRTTopicFromAllFabrics(rtTopicToDelete, clusterName);
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
deleteRTTopicFromAllFabrics(incrementalPushRTTopicToDelete, clusterName);
}
}
}

public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName) {
// Perform RT cleanup checks for batch only store that used to be hybrid. Check versions
// to see if any version is still using RT before deleting the RT.
// Since we perform this check everytime when a store version is deleted we can afford to do best effort
// approach if some fabrics are unavailable or out of sync (temporarily).
boolean canDeleteRT = !Version.containsHybridVersion(store.getVersions());
String rtTopicName = Version.composeRealTimeTopic(storeName);
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
for (Map.Entry<String, ControllerClient> controllerClientEntry: controllerClientMap.entrySet()) {
if (!canDeleteRT) {
return;
}
StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName);
if (storeResponse.isError()) {
LOGGER.warn(
Expand All @@ -3551,24 +3561,26 @@ private void safeDeleteRTTopic(String clusterName, String storeName, Store store
clusterName,
controllerClientEntry.getKey(),
storeResponse.getError());
return;
return false;
}
canDeleteRT = !Version.containsHybridVersion(storeResponse.getStore().getVersions());
}
if (canDeleteRT) {
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
truncateKafkaTopic(rtTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(rtTopicToDelete);
if (Version.containsHybridVersion(storeResponse.getStore().getVersions())) {
LOGGER.warn(
"Topic {} cannot be deleted yet because the store {} has at least one hybrid version",
rtTopicName,
storeName);
return false;
}
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
truncateKafkaTopic(incrementalPushRTTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
}
}
return true;
}

private void deleteRTTopicFromAllFabrics(String topic, String clusterName) {
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
truncateKafkaTopic(topic);
xunyin8 marked this conversation as resolved.
Show resolved Hide resolved
for (ControllerClient controllerClient: controllerClientMap.values()) {
ControllerResponse deleteResponse = controllerClient.deleteKafkaTopic(topic);
if (deleteResponse.isError()) {
LOGGER.error("Deleting real time topic " + topic + " encountered error " + deleteResponse.getError());
}
}
}
Expand Down Expand Up @@ -7440,7 +7452,7 @@ public void configureActiveActiveReplication(
if (storeName.isPresent()) {
/**
* Legacy stores venice_system_store_davinci_push_status_store_<cluster_name> still exist.
* But {@link com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* But {@link HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* them by store names. Skip davinci push status stores until legacy znodes are cleaned up.
*/
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3868,6 +3868,11 @@ public InstanceRemovableStatuses getAggregatedHealthStatus(
throw new VeniceUnsupportedOperationException("getAggregatedHealthStatus");
}

@Override
public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName) {
  // Always false: this admin variant never green-lights RT topic deletion itself.
  // NOTE(review): presumably the fabric-wide hybrid-version check is performed by the
  // VeniceHelixAdmin implementation instead — confirm against the child-controller path.
  return false;
}

/**
* @see VeniceHelixAdmin#isLeaderControllerFor(String)
*/
Expand Down
Loading
Loading