diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java
index 5fe353a373..64cafb76da 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java
@@ -13,10 +13,13 @@
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR;
 import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_STORE_ENABLED;
+import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
 import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
 import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
 
 import com.linkedin.venice.common.VeniceSystemStoreUtils;
+import com.linkedin.venice.controller.kafka.TopicCleanupService;
+import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.helix.HelixAdapterSerializer;
 import com.linkedin.venice.helix.SafeHelixManager;
@@ -94,6 +97,7 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
     }
     properties.put(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, true);
     properties.put(CONTROLLER_INSTANCE_TAG_LIST, "GENERAL,TEST");
+    properties.put(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, 100);
     controllerProps = new VeniceProperties(properties);
     helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), clusterName);
     controllerConfig = new VeniceControllerClusterConfig(controllerProps);
@@ -105,6 +109,13 @@ public void setupCluster(boolean createParticipantStore, MetricsRepository metri
         pubSubTopicRepository,
         pubSubBrokerWrapper.getPubSubClientsFactory());
     veniceAdmin.initStorageCluster(clusterName);
+    TopicCleanupService topicCleanupService = new TopicCleanupService(
+        veniceAdmin,
+        multiClusterConfig,
+        pubSubTopicRepository,
+        new TopicCleanupServiceStats(metricsRepository),
+        pubSubBrokerWrapper.getPubSubClientsFactory());
+    topicCleanupService.start();
     startParticipant();
 
     waitUntilIsLeader(veniceAdmin, clusterName, LEADER_CHANGE_TIMEOUT_MS);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java
index 489200f156..5df0c3f9bb 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java
@@ -7,11 +7,13 @@
 import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFER_VERSION_SWAP;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.AssertJUnit.fail;
 
 import com.linkedin.venice.common.VeniceSystemStoreUtils;
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.ControllerResponse;
+import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
 import com.linkedin.venice.controllerapi.NewStoreResponse;
 import com.linkedin.venice.controllerapi.SchemaResponse;
 import com.linkedin.venice.controllerapi.StoreResponse;
@@ -20,17 +22,20 @@
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
 import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
+import com.linkedin.venice.meta.BackupStrategy;
 import com.linkedin.venice.meta.BufferReplayPolicy;
 import com.linkedin.venice.meta.Store;
 import com.linkedin.venice.meta.StoreInfo;
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.manager.TopicManager;
 import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
 import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
 import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.TestWriteUtils;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
 import java.io.File;
 import java.io.IOException;
@@ -93,6 +98,123 @@ public void cleanUp() {
     Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
   }
 
+  @Test(timeOut = TEST_TIMEOUT)
+  public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
+    String storeName = Utils.getUniqueString("testRTTopicDeletion");
+    String clusterName = CLUSTER_NAMES[0];
+    String rtTopicName = Version.composeRealTimeTopic(storeName);
+    PubSubTopic rtPubSubTopic = new PubSubTopicRepository().getTopic(rtTopicName);
+    String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
+    ControllerClient parentControllerClient =
+        ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs);
+    ControllerClient[] childControllerClients = new ControllerClient[childDatacenters.size()];
+    for (int i = 0; i < childDatacenters.size(); i++) {
+      childControllerClients[i] =
+          new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
+    }
+
+    List<TopicManager> topicManagers = new ArrayList<>(2);
+    topicManagers
+        .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
+    topicManagers
+        .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
+
+    NewStoreResponse newStoreResponse =
+        parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
+    Assert.assertFalse(
+        newStoreResponse.isError(),
+        "The NewStoreResponse returned an error: " + newStoreResponse.getError());
+
+    UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
+    updateStoreParams.setIncrementalPushEnabled(true)
+        .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
+        .setNumVersionsToPreserve(2)
+        .setHybridRewindSeconds(1000)
+        .setHybridOffsetLagThreshold(1000);
+    TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);
+
+    // create new version by doing an empty push
+    ControllerResponse response = parentControllerClient
+        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
+    PubSubTopic versionPubsubTopic = getVersionPubsubTopic(storeName, response);
+
+    for (TopicManager topicManager: topicManagers) {
+      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
+      Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
+    }
+    for (ControllerClient controllerClient: childControllerClients) {
+      Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 1);
+    }
+
+    // create new version by doing an empty push
+    response = parentControllerClient
+        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
+    versionPubsubTopic = getVersionPubsubTopic(storeName, response);
+
+    for (TopicManager topicManager: topicManagers) {
+      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
+      Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
+    }
+    for (ControllerClient controllerClient: childControllerClients) {
+      Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
+    }
+
+    // change store from hybrid to batch-only
+    UpdateStoreQueryParams params = new UpdateStoreQueryParams();
+    params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1);
+    TestWriteUtils.updateStore(storeName, parentControllerClient, params);
+
+    // create new version by doing an empty push
+    response = parentControllerClient
+        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
+
+    // at this point, the current version should be batch-only, but the older version should be hybrid, so the RT
+    // topic should not get deleted
+    versionPubsubTopic = getVersionPubsubTopic(storeName, response);
+
+    for (ControllerClient controllerClient: childControllerClients) {
+      StoreInfo storeInfo = controllerClient.getStore(storeName).getStore();
+      int currentVersion = storeInfo.getCurrentVersion();
+      Assert.assertEquals(currentVersion, 3);
+      Assert.assertNull(storeInfo.getVersion(currentVersion).get().getHybridStoreConfig());
+      Assert.assertNotNull(storeInfo.getVersion(currentVersion - 1).get().getHybridStoreConfig());
+    }
+
+    for (TopicManager topicManager: topicManagers) {
+      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
+      Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
+    }
+
+    // create new version by doing an empty push
+    response = parentControllerClient
+        .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
+    versionPubsubTopic = getVersionPubsubTopic(storeName, response);
+    for (ControllerClient controllerClient: childControllerClients) {
+      Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 4);
+    }
+
+    // now both versions should be batch-only, so the RT topic should get deleted by TopicCleanupService
+    for (TopicManager topicManager: topicManagers) {
+      Assert.assertTrue(topicManager.containsTopic(versionPubsubTopic));
+      TestUtils.waitForNonDeterministicAssertion(
+          30,
+          TimeUnit.SECONDS,
+          true,
+          true,
+          () -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic)));
+    }
+
+    /*
+     TODO: RT topics are not used in the parent controller in the current architecture, so we can ignore any RT
+     topics in the parent controller that are left over from the old architecture.
+
+     TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> {
+       Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic));
+     });
+    */
+  }
+
   @Test(timeOut = TEST_TIMEOUT)
   public void testUpdateStore() {
     String clusterName = CLUSTER_NAMES[0];
@@ -565,4 +687,15 @@ private void emptyPushToStore(
       });
     }
   }
+
+  static PubSubTopic getVersionPubsubTopic(String storeName, ControllerResponse response) {
+    assertFalse(response.isError(), "Failed to perform empty push on test store");
+    String versionTopic = null;
+    if (response instanceof VersionCreationResponse) {
+      versionTopic = ((VersionCreationResponse) response).getKafkaTopic();
+    } else if (response instanceof JobStatusQueryResponse) {
+      versionTopic = Version.composeKafkaTopic(storeName, ((JobStatusQueryResponse) response).getVersion());
+    }
+    return new PubSubTopicRepository().getTopic(versionTopic);
+  }
 }
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java
index a912e8e648..6db491590b 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java
@@ -538,6 +538,8 @@ InstanceRemovableStatuses getAggregatedHealthStatus(
       List<String> toBeStoppedInstances,
       boolean isSSLEnabled);
 
+  boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName);
+
   /**
    * Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
    * either a parent controller or a child controller since a cluster must have a leader child controller and a leader
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
index 963aa9c9e7..010ffbc3bb 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
@@ -3516,8 +3516,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver
       }
       PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
       if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) {
-        store = resources.getStoreMetadataRepository().getStore(storeName);
-        safeDeleteRTTopic(clusterName, storeName, store);
+        safeDeleteRTTopic(clusterName, storeName);
       }
     }
   }
@@ -3532,17 +3531,28 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi
     }
   }
 
-  private void safeDeleteRTTopic(String clusterName, String storeName, Store store) {
-    // Perform RT cleanup checks for batch only store that used to be hybrid. Check the local versions first
-    // to see if any version is still using RT and then also check other fabrics before deleting the RT. Since
-    // we perform this check everytime when a store version is deleted we can afford to do best effort
+  private void safeDeleteRTTopic(String clusterName, String storeName) {
+    boolean rtDeletionPermitted = isRTTopicDeletionPermittedByAllControllers(clusterName, storeName);
+    if (rtDeletionPermitted) {
+      String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
+      deleteRTTopicFromAllFabrics(rtTopicToDelete, clusterName);
+      // Check whether a separate incremental push RT topic exists. If it does, delete it and ask the other
+      // controllers to delete it as well.
+      String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
+      if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
+        deleteRTTopicFromAllFabrics(incrementalPushRTTopicToDelete, clusterName);
+      }
+    }
+  }
+
+  public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName) {
+    // Perform RT cleanup checks for a batch-only store that used to be hybrid. Check the versions
+    // to see if any version is still using the RT topic before deleting it.
+    // Since we perform this check every time a store version is deleted, we can afford to do a best-effort
     // approach if some fabrics are unavailable or out of sync (temporarily).
-    boolean canDeleteRT = !Version.containsHybridVersion(store.getVersions());
+    String rtTopicName = Version.composeRealTimeTopic(storeName);
     Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
     for (Map.Entry<String, ControllerClient> controllerClientEntry: controllerClientMap.entrySet()) {
-      if (!canDeleteRT) {
-        return;
-      }
       StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName);
       if (storeResponse.isError()) {
         LOGGER.warn(
@@ -3551,24 +3561,26 @@ private void safeDeleteRTTopic(String clusterName, String storeName, Store store
             clusterName,
             controllerClientEntry.getKey(),
             storeResponse.getError());
-        return;
+        return false;
       }
-      canDeleteRT = !Version.containsHybridVersion(storeResponse.getStore().getVersions());
-    }
-    if (canDeleteRT) {
-      String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
-      truncateKafkaTopic(rtTopicToDelete);
-      for (ControllerClient controllerClient: controllerClientMap.values()) {
-        controllerClient.deleteKafkaTopic(rtTopicToDelete);
+      if (Version.containsHybridVersion(storeResponse.getStore().getVersions())) {
+        LOGGER.warn(
+            "Topic {} cannot be deleted yet because the store {} has at least one hybrid version",
+            rtTopicName,
+            storeName);
+        return false;
       }
-      // Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
-      // delete it.
-      String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
-      if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
-        truncateKafkaTopic(incrementalPushRTTopicToDelete);
-        for (ControllerClient controllerClient: controllerClientMap.values()) {
-          controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
-        }
+    }
+    return true;
+  }
+
+  private void deleteRTTopicFromAllFabrics(String topic, String clusterName) {
+    Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
+    truncateKafkaTopic(topic);
+    for (ControllerClient controllerClient: controllerClientMap.values()) {
+      ControllerResponse deleteResponse = controllerClient.deleteKafkaTopic(topic);
+      if (deleteResponse.isError()) {
+        LOGGER.error("Deleting real time topic " + topic + " encountered error " + deleteResponse.getError());
       }
     }
   }
@@ -7440,7 +7452,7 @@ public void configureActiveActiveReplication(
     if (storeName.isPresent()) {
       /**
        * Legacy stores venice_system_store_davinci_push_status_store_ still exist.
-       * But {@link com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
+       * But {@link HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
       * them by store names. Skip davinci push status stores until legacy znodes are cleaned up.
       */
       VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName.get());
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
index 1e47533814..a294cbe1e0 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
@@ -3868,6 +3868,11 @@ public InstanceRemovableStatuses getAggregatedHealthStatus(
     throw new VeniceUnsupportedOperationException("getAggregatedHealthStatus");
   }
 
+  @Override
+  public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName) {
+    return false;
+  }
+
   /**
    * @see VeniceHelixAdmin#isLeaderControllerFor(String)
    */
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java
index b48db2de4b..09dcd6bc0e 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java
@@ -76,12 +76,8 @@ public class TopicCleanupService extends AbstractVeniceService {
   private final int minNumberOfUnusedKafkaTopicsToPreserve;
   private final AtomicBoolean stop = new AtomicBoolean(false);
   private final AtomicBoolean stopped = new AtomicBoolean(false);
-  private final Set<String> childRegions;
-  private final Map<String, Map<String, Integer>> multiDataCenterStoreToVersionTopicCount;
   private final PubSubTopicRepository pubSubTopicRepository;
   private final TopicCleanupServiceStats topicCleanupServiceStats;
-  private String localDatacenter;
-  private boolean isRTTopicDeletionBlocked = false;
   private boolean isLeaderControllerOfControllerCluster = false;
   private long refreshQueueCycle = Time.MS_PER_MINUTE;
 
@@ -109,23 +105,13 @@ public TopicCleanupService(
     this.multiClusterConfigs = multiClusterConfigs;
     this.pubSubTopicRepository = pubSubTopicRepository;
     this.topicCleanupServiceStats = topicCleanupServiceStats;
-    this.childRegions = multiClusterConfigs.getCommonConfig().getChildDatacenters();
-    if (!admin.isParent()) {
-      // Only perform cross fabric VT check for RT deletion in child fabrics.
-      this.multiDataCenterStoreToVersionTopicCount = new HashMap<>(childRegions.size());
-      for (String datacenter: childRegions) {
-        multiDataCenterStoreToVersionTopicCount.put(datacenter, new HashMap<>());
-      }
-    } else {
-      this.multiDataCenterStoreToVersionTopicCount = Collections.emptyMap();
-    }
     this.danglingTopicCleanupIntervalMs =
         Time.MS_PER_SECOND * multiClusterConfigs.getDanglingTopicCleanupIntervalSeconds();
-    PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory =
+    PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory =
         multiClusterConfigs.getSourceOfTruthAdminAdapterFactory();
-    PubSubAdminAdapterFactory pubSubAdminAdapterFactory = pubSubClientsFactory.getAdminAdapterFactory();
+    PubSubAdminAdapterFactory pubSubAdminAdapterFactory = pubSubClientsFactory.getAdminAdapterFactory();
     this.danglingTopicOccurrenceCounter = new HashMap<>();
     this.danglingTopicOccurrenceThresholdForCleanup =
         multiClusterConfigs.getDanglingTopicOccurrenceThresholdForCleanup();
@@ -138,7 +124,7 @@ public TopicCleanupService(
   }
 
   private PubSubAdminAdapter constructSourceOfTruthPubSubAdminAdapter(
-      PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory) {
+      PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory) {
     VeniceProperties veniceProperties = admin.getPubSubSSLProperties(getTopicManager().getPubSubClusterAddress());
     return sourceOfTruthAdminAdapterFactory.create(veniceProperties, pubSubTopicRepository);
   }
@@ -180,7 +166,7 @@ boolean isStopped() {
     return this.stopped.get();
   }
 
-  TopicManager getTopicManager() {
+  final TopicManager getTopicManager() {
     return admin.getTopicManager();
   }
 
@@ -239,45 +225,33 @@ void cleanupVeniceTopics() {
     populateDeprecatedTopicQueue(allTopics);
     topicCleanupServiceStats.recordDeletableTopicsCount(allTopics.size());
     long refreshTime = System.currentTimeMillis();
+
     while (!allTopics.isEmpty()) {
       PubSubTopic topic = allTopics.poll();
+      String storeName = topic.getStoreName();
+      String clusterDiscovered;
       try {
-        if (topic.isRealTime() && !multiDataCenterStoreToVersionTopicCount.isEmpty()) {
-          // Only delete realtime topic in child fabrics if all version topics are deleted in all child fabrics.
-          if (isRTTopicDeletionBlocked) {
-            LOGGER.warn(
-                "Topic deletion for topic: {} is blocked due to unable to fetch version topic info",
-                topic.getName());
-            topicCleanupServiceStats.recordTopicDeletionError();
-            continue;
-          }
-          boolean canDelete = true;
-          for (Map.Entry<String, Map<String, Integer>> mapEntry: multiDataCenterStoreToVersionTopicCount.entrySet()) {
-            if (mapEntry.getValue().containsKey(topic.getStoreName())) {
-              canDelete = false;
-              LOGGER.info(
-                  "Topic deletion for topic: {} is delayed due to {} version topics found in datacenter {}",
-                  topic.getName(),
-                  mapEntry.getValue().get(topic.getStoreName()),
-                  mapEntry.getKey());
-              break;
-            }
-          }
-          if (!canDelete) {
-            continue;
-          }
-        }
-        getTopicManager().ensureTopicIsDeletedAndBlockWithRetry(topic);
-        topicCleanupServiceStats.recordTopicDeleted();
-      } catch (VeniceException e) {
-        LOGGER.warn("Caught exception when trying to delete topic: {} - {}", topic.getName(), e.toString());
-        topicCleanupServiceStats.recordTopicDeletionError();
-        // No op, will try again in the next cleanup cycle.
+        clusterDiscovered = admin.discoverCluster(storeName).getFirst();
+      } catch (VeniceNoStoreException e) {
+        LOGGER.warn(
+            "Store {} not found. Exception when trying to delete topic: {} - {}",
+            storeName,
+            topic.getName(),
+            e.toString());
+        deleteTopic(topic);
+        continue;
+      }
+
+      if (!topic.isRealTime() || admin.isRTTopicDeletionPermittedByAllControllers(clusterDiscovered, storeName)) {
+        // delete if it is a VT topic or an RT topic eligible for deletion by the above condition
+        deleteTopic(topic);
+      } else {
+        LOGGER.warn("Topic deletion for topic: {} is delayed.", topic.getName());
       }
       if (!topic.isRealTime()) {
         // If Version topic deletion took long time, skip further VT deletion and check if we have new RT topic to
-        // delete
+        // delete. Some new RT topics might have become eligible for deletion in this period.
         if (System.currentTimeMillis() - refreshTime > refreshQueueCycle) {
           allTopics.clear();
           populateDeprecatedTopicQueue(allTopics);
@@ -290,10 +264,20 @@ void cleanupVeniceTopics() {
     }
   }
 
+  private void deleteTopic(PubSubTopic topic) {
+    try {
+      getTopicManager().ensureTopicIsDeletedAndBlockWithRetry(topic);
+      topicCleanupServiceStats.recordTopicDeleted();
+    } catch (VeniceException e) {
+      LOGGER.warn("Caught exception when trying to delete topic: {} - {}", topic.getName(), e.toString());
+      topicCleanupServiceStats.recordTopicDeletionError();
+      // No op, will try again in the next cleanup cycle.
+    }
+  }
+
   private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> topics) {
     Map<PubSubTopic, Long> topicsWithRetention = getTopicManager().getAllTopicRetentions();
     Map<String, Map<PubSubTopic, Long>> allStoreTopics = getAllVeniceStoreTopicsRetentions(topicsWithRetention);
-    AtomicBoolean realTimeTopicDeletionNeeded = new AtomicBoolean(false);
     allStoreTopics.forEach((storeName, topicRetentions) -> {
       int minNumOfUnusedVersionTopicsOverride = minNumberOfUnusedKafkaTopicsToPreserve;
       PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
@@ -301,7 +285,6 @@ private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> topics) {
         if (admin.isTopicTruncatedBasedOnRetention(topicRetentions.get(realTimeTopic))) {
           topics.offer(realTimeTopic);
           minNumOfUnusedVersionTopicsOverride = 0;
-          realTimeTopicDeletionNeeded.set(true);
         }
         topicRetentions.remove(realTimeTopic);
       }
@@ -311,9 +294,6 @@ private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> topics) {
         topics.addAll(oldTopicsToDelete);
       }
     });
-    if (realTimeTopicDeletionNeeded.get() && !multiDataCenterStoreToVersionTopicCount.isEmpty()) {
-      refreshMultiDataCenterStoreToVersionTopicCountMap(topicsWithRetention.keySet());
-    }
 
     // Check if there are dangling topics to be deleted.
     if (sourceOfTruthPubSubAdminAdapter != null
@@ -327,60 +307,6 @@ private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> topics) {
     }
   }
 
-  private void refreshMultiDataCenterStoreToVersionTopicCountMap(Set<PubSubTopic> localTopics) {
-    if (localDatacenter == null) {
-      String localPubSubBootstrapServer = getTopicManager().getPubSubClusterAddress();
-      for (String childFabric: childRegions) {
-        if (localPubSubBootstrapServer.equals(multiClusterConfigs.getChildDataCenterKafkaUrlMap().get(childFabric))) {
-          localDatacenter = childFabric;
-          break;
-        }
-      }
-      if (localDatacenter == null) {
-        String childFabrics = String.join(",", childRegions);
-        LOGGER.error(
-            "Blocking RT topic deletion. Cannot find local datacenter in child datacenter list: {}",
-            childFabrics);
-        isRTTopicDeletionBlocked = true;
-        return;
-      }
-    }
-    clearAndPopulateStoreToVersionTopicCountMap(
-        localTopics,
-        multiDataCenterStoreToVersionTopicCount.get(localDatacenter));
-    if (childRegions.size() > 1) {
-      for (String childFabric: childRegions) {
-        try {
-          if (childFabric.equals(localDatacenter)) {
-            continue;
-          }
-          String pubSubBootstrapServer = multiClusterConfigs.getChildDataCenterKafkaUrlMap().get(childFabric);
-          Set<PubSubTopic> remoteTopics = getTopicManager(pubSubBootstrapServer).listTopics();
-          clearAndPopulateStoreToVersionTopicCountMap(
-              remoteTopics,
-              multiDataCenterStoreToVersionTopicCount.get(childFabric));
-        } catch (Exception e) {
-          LOGGER.error("Failed to refresh store to version topic count map for fabric {}", childFabric, e);
-          isRTTopicDeletionBlocked = true;
-          return;
-        }
-      }
-    }
-    isRTTopicDeletionBlocked = false;
-  }
-
-  private static void clearAndPopulateStoreToVersionTopicCountMap(
-      Set<PubSubTopic> topics,
-      Map<String, Integer> storeToVersionTopicCountMap) {
-    storeToVersionTopicCountMap.clear();
-    for (PubSubTopic topic: topics) {
-      String storeName = topic.getStoreName();
-      if (!storeName.isEmpty() && topic.isVersionTopic()) {
-        storeToVersionTopicCountMap.merge(storeName, 1, Integer::sum);
-      }
-    }
-  }
-
   /**
    * @return a map object that maps from the store name to the Kafka topic name and its configured Kafka retention time.
    */
@@ -452,11 +378,18 @@ public static List<PubSubTopic> extractVersionTopicsToCleanup(
         /** Consider only truncated topics */
         .filter(t -> admin.isTopicTruncatedBasedOnRetention(t.getName(), topicRetentions.get(t)))
         /** Always preserve the last {@link #minNumberOfUnusedKafkaTopicsToPreserve} topics, whether they are healthy or not */
-        .filter(t -> Version.parseVersionFromKafkaTopicName(t.getName()) <= maxVersionNumberToDelete)
+        .filter(t -> {
+          try {
+            return Version.parseVersionFromKafkaTopicName(t.getName()) <= maxVersionNumberToDelete;
+          } catch (Exception e) {
+            LOGGER.error("Could not parse version from kafka topic " + t.getName());
+            return false;
+          }
+        })
         /**
          * Filter out resources, which haven't been fully removed in child fabrics yet. This is only performed in the
          * child fabric because parent fabric don't have storage node helix resources.
-         *
+         *
          * The reason to filter out still-alive resource is to avoid triggering the non-existing topic issue
          * of Kafka consumer happening in Storage Node.
          */
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java
index 52e3fe2020..9e6d3b40ed 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java
@@ -19,9 +19,9 @@
 import com.linkedin.venice.controller.VeniceControllerClusterConfig;
 import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
 import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
-import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.exceptions.VeniceNoStoreException;
 import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository;
+import com.linkedin.venice.meta.HybridStoreConfig;
 import com.linkedin.venice.meta.Store;
 import com.linkedin.venice.meta.StoreConfig;
 import com.linkedin.venice.meta.Version;
@@ -43,7 +43,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -190,12 +189,15 @@ public void testExtractVeniceTopicsToCleanup() {
   }
 
   @Test
-  public void testCleanupVeniceTopics() throws ExecutionException {
+  public void testCleanupVeniceTopics() {
+    String clusterName = "clusterName";
     String storeName1 = Utils.getUniqueString("store1");
     String storeName2 = Utils.getUniqueString("store2");
    String storeName3 = Utils.getUniqueString("store3");
     String storeName4 = Utils.getUniqueString("store4");
     String storeName5 = Utils.getUniqueString("store5");
+    String storeName6 = Utils.getUniqueString("store6");
+
     Map<PubSubTopic, Long> storeTopics = new HashMap<>();
     storeTopics.put(getPubSubTopic(storeName1, "_v1"), 1000L);
     storeTopics.put(getPubSubTopic(storeName1, "_v2"), 1000L);
@@ -208,6 +210,9 @@
     storeTopics.put(getPubSubTopic(storeName3, "_v100"), Long.MAX_VALUE);
     storeTopics.put(getPubSubTopic(storeName4, "_rt"), Long.MAX_VALUE);
     storeTopics.put(getPubSubTopic(storeName5, "_v1"), Long.MAX_VALUE);
+    storeTopics.put(getPubSubTopic(storeName6, "_rt"), 1000L);
+    storeTopics.put(getPubSubTopic(storeName6, "_v1"), Long.MAX_VALUE);
+    storeTopics.put(getPubSubTopic(storeName6, "_v2"), Long.MAX_VALUE);
     storeTopics.put(getPubSubTopic(PubSubTopicType.ADMIN_TOPIC_PREFIX, "_cluster"), Long.MAX_VALUE);
 
     Map<PubSubTopic, Long> storeTopics2 = new HashMap<>();
@@ -231,6 +236,8 @@
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L);
     doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE));
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L));
+    doReturn(Pair.create(clusterName, null)).when(admin).discoverCluster(anyString());
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), any());
     doReturn(Optional.of(new StoreConfig(storeName1))).when(storeConfigRepository).getStoreConfig(storeName1);
 
     Set<PubSubTopic> pubSubTopicSet = new HashSet<>();
@@ -245,24 +252,51 @@
     doReturn(apacheKafkaAdminAdapter).when(apacheKafkaAdminAdapterFactory).create(any(), eq(pubSubTopicRepository));
     topicCleanupService.setSourceOfTruthPubSubAdminAdapter(apacheKafkaAdminAdapter);
 
-    String clusterName = "clusterName";
     Pair<String, String> pair = new Pair<>(clusterName, "");
-    doReturn(pair).when(admin).discoverCluster(storeName3);
-    doReturn(pair).when(admin).discoverCluster(storeName4);
+    doReturn(pair).when(admin).discoverCluster(anyString());
     doThrow(new VeniceNoStoreException(storeName5)).when(admin).discoverCluster(storeName5);
 
+    Store store2 = mock(Store.class);
+    doReturn(storeName2).when(store2).getName();
+    Store store3 = mock(Store.class);
+    doReturn(storeName3).when(store3).getName();
+    doReturn(false).when(store3).containsVersion(100);
     Store store4 = mock(Store.class);
+    doReturn(storeName4).when(store4).getName();
     doReturn(false).when(store4).isHybrid();
-    Version version = mock(Version.class);
-    doReturn(null).when(version).getHybridStoreConfig();
+    Store store5 = mock(Store.class);
+
+    Store store6 = mock(Store.class);
+    doReturn(storeName6).when(store6).getName();
+
+    Version batchVersion = mock(Version.class);
+    doReturn(null).when(batchVersion).getHybridStoreConfig();
+
+    doReturn(store2).when(admin).getStore(clusterName, storeName2);
     doReturn(store3).when(admin).getStore(clusterName, storeName3);
     doReturn(store4).when(admin).getStore(clusterName, storeName4);
+    doReturn(store5).when(admin).getStore(clusterName, storeName5);
+    doReturn(store6).when(admin).getStore(clusterName, storeName6);
     doReturn(pubSubTopicSet).when(apacheKafkaAdminAdapter).listAllTopics();
 
+    Version hybridVersion = mock(Version.class);
+    doReturn(mock(HybridStoreConfig.class)).when(hybridVersion).getHybridStoreConfig();
+
+    doReturn(Collections.singletonList(hybridVersion)).when(store2).getVersions();
+    doReturn(Collections.singletonList(hybridVersion)).when(store3).getVersions();
+    doReturn(Collections.singletonList(batchVersion)).when(store6).getVersions();
+    // simulating blocked delete
+    doReturn(false).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName1));
+    doReturn(false).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName2));
+    doReturn(false).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName3));
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName4));
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName5));
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName6));
+
     topicCleanupService.cleanupVeniceTopics();
 
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_rt"));
@@ -271,17 +305,21 @@
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v3"));
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v4"));
     verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_v1"));
+    verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName6, "_rt"));
     // Delete should be blocked by local VT
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt"));
     // Delete should be blocked by remote VT
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt"));
-    verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(8);
+    verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(9);
     verify(topicCleanupServiceStats, never()).recordTopicDeletionError();
     verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeleted();
 
     verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_v100"));
     verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName4, "_rt"));
     verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName5, "_v1"));
+
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName2));
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), eq(storeName3));
 
     topicCleanupService.cleanupVeniceTopics();
 
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v3"));
@@ -297,7 +335,7 @@ private PubSubTopic getPubSubTopic(String storeName, String suffix) {
   }
 
   @Test
-  public void testRun() throws Exception {
+  public void testRun() {
     String storeName1 = Utils.getUniqueString("store1");
     String storeName2 = Utils.getUniqueString("store2");
     String storeName3 = Utils.getUniqueString("store3");
@@ -330,6 +368,7 @@
     doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE));
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L));
     doReturn(true).when(admin).isLeaderControllerOfControllerCluster();
+    doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any());
 
     // Resource is still alive
     doReturn(true).when(admin).isResourceStillAlive(storeName2 + "_v2");
@@ -355,7 +394,7 @@
   }
 
   @Test
-  public void testRunWhenCurrentControllerChangeFromLeaderToFollower() throws Exception {
+  public void testRunWhenCurrentControllerChangeFromLeaderToFollower() {
     String storeName1 = Utils.getUniqueString("store1");
     doReturn(Optional.of(new StoreConfig(storeName1))).when(storeConfigRepository).getStoreConfig(storeName1);
     Map<PubSubTopic, Long> storeTopics1 = new HashMap<>();
@@ -387,9 +426,10 @@ public void testRunWhenCurrentControllerChangeFromLeaderToFollower() throws Exce
   }
 
   @Test
-  public void testRunWhenCurrentControllerChangeFromFollowerToLeader() throws Exception {
+  public void testRunWhenCurrentControllerChangeFromFollowerToLeader() {
     String storeName1 = Utils.getUniqueString("store1");
     doReturn(Optional.of(new StoreConfig(storeName1))).when(storeConfigRepository).getStoreConfig(storeName1);
+    doReturn(new Pair<>("clusterName", "")).when(admin).discoverCluster(anyString());
     Map<PubSubTopic, Long> storeTopics1 = new HashMap<>();
     storeTopics1.put(getPubSubTopic(storeName1, "_v1"), 1000L);
     storeTopics1.put(getPubSubTopic(storeName1, "_v2"), 1000L);
@@ -441,57 +481,27 @@ public void testExtractVersionTopicsToCleanupIgnoresInputWithNonVersionTopics()
     assertTrue(deletableTopics.contains(pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1))));
   }
 
-  @Test
-  public void testCleanVeniceTopicsBlockRTTopicDeletionWhenMisconfigured() {
-    // RT topic deletion should be blocked when controller is misconfigured
-    // Mis-configured where local data center is not in the child data centers list
-    VeniceControllerClusterConfig controllerConfig = mock(VeniceControllerClusterConfig.class);
-    doReturn(controllerConfig).when(veniceControllerMultiClusterConfig).getCommonConfig();
-    doReturn(Collections.singleton("remote")).when(controllerConfig).getChildDatacenters();
-    TopicCleanupService blockedTopicCleanupService = new TopicCleanupService(
-        admin,
-        veniceControllerMultiClusterConfig,
-        pubSubTopicRepository,
-        topicCleanupServiceStats,
-        pubSubClientsFactory);
-    String storeName = Utils.getUniqueString("testStore");
-    Map<PubSubTopic, Long> storeTopics = new HashMap<>();
-    storeTopics.put(getPubSubTopic(storeName, "_rt"), 1000L);
-    doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(Long.MAX_VALUE);
-    doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L);
-    doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE));
-    doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L));
-    doReturn(storeTopics).when(topicManager).getAllTopicRetentions();
-    doReturn(storeTopics).when(remoteTopicManager).getAllTopicRetentions();
-    doReturn(Optional.of(new StoreConfig(storeName))).when(storeConfigRepository).getStoreConfig(storeName);
-    blockedTopicCleanupService.cleanupVeniceTopics();
-    verify(topicManager, atLeastOnce()).getPubSubClusterAddress();
-    verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName, "_rt"));
-    verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(1);
-    verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeletionError();
-  }
-
   @Test
   public void testCleanVeniceTopicRTTopicDeletionWithErrorFetchingVT() {
     // RT topic deletion should be blocked when version topic cannot be fetched due to error
     String storeName = Utils.getUniqueString("testStore");
     Map<PubSubTopic, Long> storeTopics = new HashMap<>();
     storeTopics.put(getPubSubTopic(storeName, "_rt"), 1000L);
+    doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any());
     doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(Long.MAX_VALUE);
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L);
     doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE));
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L));
     doReturn(storeTopics).when(topicManager).getAllTopicRetentions();
+    doReturn(false).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), any());
     doReturn(Optional.of(new StoreConfig(storeName))).when(storeConfigRepository).getStoreConfig(storeName);
-    when(remoteTopicManager.listTopics()).thenThrow(new VeniceException("test")).thenReturn(storeTopics.keySet());
 
     topicCleanupService.cleanupVeniceTopics();
 
     verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName, "_rt"));
-    verify(remoteTopicManager, atLeastOnce()).listTopics();
     verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(1);
-    verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeletionError();
+
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), any());
 
     topicCleanupService.cleanupVeniceTopics();
 
     verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName, "_rt"));
@@ -511,6 +521,8 @@ public void testCleanVeniceTopicOnlyFetchVTOnRTTopicDeletion() {
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L);
     doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE));
     doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L));
+    doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any());
+    doReturn(true).when(admin).isRTTopicDeletionPermittedByAllControllers(anyString(), any());
     when(topicManager.getAllTopicRetentions()).thenReturn(storeTopics1).thenReturn(storeTopics2);
     doReturn(storeTopics2).when(remoteTopicManager).getAllTopicRetentions();
@@ -519,8 +531,6 @@
     verify(remoteTopicManager, never()).getAllTopicRetentions();
 
     topicCleanupService.cleanupVeniceTopics();
-
-    verify(remoteTopicManager, atLeastOnce()).listTopics();
   }
 
   @Test
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java
index 60170dcc27..f4f3ce5ae3 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java
@@ -59,6 +59,7 @@ public void setUp() {
     doReturn(kafkaUrlMap).when(config).getChildDataCenterKafkaUrlMap();
 
     admin = mock(Admin.class);
+    doReturn(true).when(admin).isParent();
     topicManager1 = mock(TopicManager.class);
     doReturn(kafkaClusterServerUrl1).when(topicManager1).getPubSubClusterAddress();
     doReturn(topicManager1).when(admin).getTopicManager(kafkaClusterServerUrl1);
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java
index 0c4a987da5..4893b51587 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java
@@ -32,6 +32,7 @@ public class TestTopicCleanupServiceForParentController {
   @BeforeTest
   public void setUp() {
     admin = mock(Admin.class);
+    doReturn(true).when(admin).isParent();
     topicManager = mock(TopicManager.class);
     doReturn(topicManager).when(admin).getTopicManager();
     VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class);