Skip to content

Commit

Permalink
[controller] Fix hybrid store migration failure during ongoing batch …
Browse files Browse the repository at this point in the history
…push jobs (#1360)

This PR fixes an issue where hybrid store migration fails if a batch push job is ongoing
and a non-truncated version topic exists in the parent region.

The problem stemmed from a safeguard intended to prevent hybrid-to-batch conversion during
ongoing push jobs (added to address targeted region push issues). The safeguard failed to
account for store migration scenarios where hybrid configurations are added during the
updateStore operation.

This misclassification of the migration attempt as a batch-to-hybrid store conversion
caused unintended operations, including a call to VPHA::getTopicForCurrentPushJob. When the
version config is missing from the store’s configuration, that method issues a
killOfflinePushJob, which in turn tries to set the status of a non-existent version to
KILLED — leading to a version-not-found exception from AbstractStore::updateVersionStatus.
  • Loading branch information
  • Loading branch information
sushantmane authored Dec 3, 2024
1 parent 3a99bd9 commit f653cdd
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
Expand All @@ -51,6 +52,7 @@
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
Expand All @@ -60,6 +62,9 @@
import com.linkedin.venice.participant.protocol.ParticipantMessageKey;
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
import com.linkedin.venice.participant.protocol.enums.ParticipantMessageType;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
Expand Down Expand Up @@ -102,6 +107,7 @@ public class TestStoreMigration {
private static final int RECORD_COUNT = 20;
private static final String NEW_OWNER = "newtest@linkedin.com";
private static final String FABRIC0 = "dc-0";
private static final PubSubTopicRepository PUBSUB_TOPIC_REPOSITORY = new PubSubTopicRepository();

private VeniceTwoLayerMultiRegionMultiClusterWrapper twoLayerMultiRegionMultiClusterWrapper;
private VeniceMultiClusterWrapper multiClusterWrapper;
Expand Down Expand Up @@ -626,6 +632,104 @@ public void testStoreMigrationStaleKillIngestionMessageDeletion() {
verifyKillMessageInParticipantStore(destClusterWrapper, currentVersionTopicName, false);
}

/**
 * Verifies the behavior where a hybrid store migration fails if there is an ongoing batch push job
 * with a non-truncated version topic in the parent region.
 *
 * The failure occurred because a safeguard, intended to prevent hybrid-to-batch conversion during
 * an ongoing push job with deferred version swapping, did not account for the store migration
 * scenario where hybrid configurations are added as part of the update store operation. This test
 * simulates the ongoing push via a dummy parent-region version topic and asserts the migration
 * still completes end to end.
 */
@Test(timeOut = TEST_TIMEOUT)
public void testStoreMigrationWithPushJobTrackingVTInParentRegion() throws Exception {
  // Seed reflects this test's scenario: hybrid migration while a push job appears ongoing.
  String storeName = Utils.getUniqueString("testHybridMigrationWithOngoingPush");
  createAndPushStore(srcClusterName, storeName);

  try (ControllerClient srcParentControllerClient = new ControllerClient(srcClusterName, parentControllerUrl);
      ControllerClient destParentControllerClient = new ControllerClient(destClusterName, parentControllerUrl)) {
    StoreResponse storeResponse = TestUtils.assertCommand(srcParentControllerClient.getStore(storeName));
    StoreInfo storeInfo = storeResponse.getStore();
    assertNotNull(storeInfo);

    // Create a dummy version topic to simulate the edge case where a push job is in progress, and
    // the version topic exists in the parent region. The parent-region VT is what the controller
    // uses to track an ongoing batch push job.
    VeniceControllerWrapper parentControllerWrapper =
        twoLayerMultiRegionMultiClusterWrapper.getLeaderParentControllerWithRetries(srcClusterName);
    PubSubTopic dummyVersionTopic = PUBSUB_TOPIC_REPOSITORY.getTopic(Version.composeKafkaTopic(storeName, 9999));
    Admin admin = parentControllerWrapper.getVeniceAdmin();
    TopicManager parentTopicManager = admin.getTopicManager();
    parentTopicManager.createTopic(dummyVersionTopic, 1, 1, true);
    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
      assertTrue(
          parentTopicManager.containsTopic(dummyVersionTopic),
          "Dummy version topic: " + dummyVersionTopic + " not created");
    });

    // The safeguard only misfires when the tracking topic is non-truncated; verify the precondition.
    assertFalse(
        admin.isTopicTruncated(dummyVersionTopic.getName()),
        "Dummy version topic: " + dummyVersionTopic + " should not be truncated");

    StoreMigrationTestUtil.startMigration(parentControllerUrl, storeName, srcClusterName, destClusterName);
    // Ensure migration status is updated in source parent controller
    TestUtils.waitForNonDeterministicAssertion(
        30,
        TimeUnit.SECONDS,
        () -> assertTrue(srcParentControllerClient.getStore(storeName).getStore().isMigrating()));

    // Store migration status output via closure PrintFunction
    Set<String> statusOutput = new HashSet<>();
    AdminTool.PrintFunction printFunction = (message) -> {
      statusOutput.add(message.trim());
      System.err.println(message);
    };

    // Wait until cluster discovery still points at the source cluster while the store exists in
    // the destination cluster, i.e. the data-migration phase is done but ownership not yet moved.
    TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
      statusOutput.clear();
      StoreMigrationTestUtil
          .checkMigrationStatus(parentControllerUrl, storeName, srcClusterName, destClusterName, printFunction);
      assertTrue(
          statusOutput
              .contains(storeName + " belongs to cluster " + srcClusterName + " according to cluster discovery"));
      assertTrue(statusOutput.contains(storeName + " exists in this cluster " + destClusterName));
    });

    StoreMigrationTestUtil
        .completeMigration(parentControllerUrl, storeName, srcClusterName, destClusterName, FABRIC0);
    StoreMigrationTestUtil
        .endMigration(parentControllerUrl, childControllerUrl0, storeName, srcClusterName, destClusterName);
    // After endMigration, cluster discovery must point at the destination cluster.
    TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
      statusOutput.clear();
      StoreMigrationTestUtil
          .checkMigrationStatus(parentControllerUrl, storeName, srcClusterName, destClusterName, printFunction);
      assertTrue(
          statusOutput
              .contains(storeName + " belongs to cluster " + destClusterName + " according to cluster discovery"));
      assertTrue(statusOutput.contains(storeName + " exists in this cluster " + destClusterName));
    });

    // The store should be gone from the source cluster and settled in the destination cluster
    // with all migration flags cleared.
    assertTrue(srcParentControllerClient.getStore(storeName).isError());
    StoreResponse destStoreResponse = TestUtils.assertCommand(destParentControllerClient.getStore(storeName));
    StoreInfo destStoreInfo = destStoreResponse.getStore();
    assertNotNull(destStoreInfo);
    assertFalse(destStoreInfo.isMigrating());
    assertFalse(destStoreInfo.isMigrationDuplicateStore());
  }

  // Verify the migrated store's final state in the child region as well.
  try (ControllerClient childControllerClient0 = new ControllerClient(destClusterName, childControllerUrl0)) {
    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
      StoreResponse destStoreResponse = TestUtils.assertCommand(childControllerClient0.getStore(storeName));
      StoreInfo destStoreInfo = destStoreResponse.getStore();
      assertNotNull(destStoreInfo);
      assertFalse(destStoreInfo.isMigrating());
      assertFalse(destStoreInfo.isMigrationDuplicateStore());
      assertEquals(destStoreInfo.getCurrentVersion(), 1);
    });
  }
}

/**
* Tests store migration after a failed attempt. This test creates a store and induces a kill message in the
* participant store for the current version topic in the destination cluster. It then starts and completes the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,9 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
setStore.storeName = storeName;
setStore.owner = owner.map(addToUpdatedConfigList(updatedConfigsList, OWNER)).orElseGet(currStore::getOwner);

if (!currStore.isHybrid() && (hybridRewindSeconds.isPresent() || hybridOffsetLagThreshold.isPresent())) {
boolean isUpdateForStoreMigration = storeMigration.orElse(false);
if (!isUpdateForStoreMigration && !currStore.isHybrid()
&& (hybridRewindSeconds.isPresent() || hybridOffsetLagThreshold.isPresent())) {
// Today target colo pushjob cannot handle hybrid stores, so if a batch push is running, fail the request
Optional<String> currentPushTopic = getTopicForCurrentPushJob(clusterName, storeName, false, false);
if (currentPushTopic.isPresent()) {
Expand Down

0 comments on commit f653cdd

Please sign in to comment.