Skip to content

Commit

Permalink
Delete stale index routing table files.
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <singhlhs@amazon.com>
  • Loading branch information
Shailendra Singh committed Jun 10, 2024
1 parent 1b36ee4 commit 5d5842b
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand All @@ -20,6 +21,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
Expand Down Expand Up @@ -61,6 +63,18 @@ protected void doStart() {
blobStoreRepository = (BlobStoreRepository) repository;
}

public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
try {
System.out.println("Deleting stale index routing files from remote - " + stalePaths);
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);

blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
} catch (IOException e) {
logger.error("Error while deleting stale index routing files", e);
throw e;
}
}

@Override
protected void doStop() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand All @@ -31,6 +32,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -74,8 +76,13 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private long lastCleanupAttemptStateVersion;
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private final Optional<RemoteRoutingTableService> remoteRoutingTableService;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
public RemoteClusterStateCleanupManager(
RemoteClusterStateService remoteClusterStateService,
ClusterService clusterService,
Optional<RemoteRoutingTableService> remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
Expand All @@ -84,6 +91,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
this.threadpool = remoteClusterStateService.getThreadpool();
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
this.lastCleanupAttemptStateVersion = 0;
this.remoteRoutingTableService = remoteRoutingTableService;
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
}

Expand Down Expand Up @@ -171,6 +179,7 @@ void deleteClusterMetadata(
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -189,6 +198,10 @@ void deleteClusterMetadata(
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
Expand Down Expand Up @@ -221,6 +234,14 @@ void deleteClusterMetadata(
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
);
}
if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename());
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
Expand All @@ -240,6 +261,13 @@ void deleteClusterMetadata(
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
remoteRoutingTableService.ifPresent(remoteroutingTableService -> {
try {
remoteroutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,11 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
: Optional.empty();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, this.remoteRoutingTableService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

package org.opensearch.cluster.routing.remote;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.FilterRepository;
Expand All @@ -18,20 +23,33 @@
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

import org.mockito.Mockito;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RemoteRoutingTableServiceTests extends OpenSearchTestCase {

private RemoteRoutingTableService remoteRoutingTableService;

private ClusterSettings clusterSettings;
private Supplier<RepositoriesService> repositoriesServiceSupplier;
private RepositoriesService repositoriesService;
private BlobStoreRepository blobStoreRepository;
private BlobStore blobStore;
private BlobContainer blobContainer;
private BlobPath basePath;

@Before
public void setup() {
Expand All @@ -43,9 +61,15 @@ public void setup() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.build();

clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

blobStoreRepository = mock(BlobStoreRepository.class);
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);

when(blobStoreRepository.blobStore()).thenReturn(blobStore);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

Expand Down Expand Up @@ -74,4 +98,27 @@ public void testFailStartWhenNotBlobRepository() {
assertThrows(AssertionError.class, () -> remoteRoutingTableService.start());
}

public void testDeleteStaleIndexRoutingPaths() throws IOException {
doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any());
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
remoteRoutingTableService.doStart();
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException {
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
// Simulate an IOException
doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList());

remoteRoutingTableService.doStart();
IOException thrown = assertThrows(IOException.class, () -> {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
});
assertEquals("test exception", thrown.getMessage());
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -39,6 +40,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -47,6 +49,7 @@

import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion;
Expand Down Expand Up @@ -76,6 +79,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -92,6 +96,9 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase {
private ClusterState clusterState;
private Metadata metadata;
private RemoteClusterStateService remoteClusterStateService;
private Optional<RemoteRoutingTableService> remoteRoutingTableService;

private RemoteRoutingTableService mockedRemoteRoutingTableService;
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());

@Before
Expand Down Expand Up @@ -139,7 +146,14 @@ public void setup() {
when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService);
mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class);
remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(
remoteClusterStateService,
clusterService,
remoteRoutingTableService
);
}

@After
Expand All @@ -155,11 +169,13 @@ public void testDeleteClusterMetadata() throws IOException {
List<BlobMetadata> inactiveBlobs = Arrays.asList(
new PlainBlobMetadata("manifest1.dat", 1L),
new PlainBlobMetadata("manifest2.dat", 1L),
new PlainBlobMetadata("manifest3.dat", 1L)
new PlainBlobMetadata("manifest3.dat", 1L),
new PlainBlobMetadata("manifest6.dat", 1L)
);
List<BlobMetadata> activeBlobs = Arrays.asList(
new PlainBlobMetadata("manifest4.dat", 1L),
new PlainBlobMetadata("manifest5.dat", 1L)
new PlainBlobMetadata("manifest5.dat", 1L),
new PlainBlobMetadata("manifest7.dat", 1L)
);
UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1");
UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2");
Expand Down Expand Up @@ -199,6 +215,45 @@ public void testDeleteClusterMetadata() throws IOException {
.settingMetadata(settingMetadataUpdated)
.build();

UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3");
UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4");
List<UploadedIndexMetadata> indicesRouting1 = List.of(index3Metadata, index4Metadata);
List<UploadedIndexMetadata> indicesRouting2 = List.of(index4Metadata);
ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder()
.indices(List.of(index1Metadata))
.coordinationMetadata(coordinationMetadataUpdated)
.templatesMetadata(templateMetadataUpdated)
.settingMetadata(settingMetadataUpdated)
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID(clusterUUID)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.routingTableVersion(0L)
.indicesRouting(indicesRouting1)
.build();
ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder()
.indices(List.of(index2Metadata))
.coordinationMetadata(coordinationMetadataUpdated)
.templatesMetadata(templateMetadataUpdated)
.settingMetadata(settingMetadataUpdated)
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID(clusterUUID)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.routingTableVersion(0L)
.indicesRouting(indicesRouting2)
.build();

// active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated
ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3)
.coordinationMetadata(coordinationMetadataUpdated)
Expand All @@ -208,10 +263,13 @@ public void testDeleteClusterMetadata() throws IOException {
when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn(
manifest4,
manifest5,
manifest7,
manifest1,
manifest2,
manifest3
manifest3,
manifest6
);

BlobContainer container = mock(BlobContainer.class);
when(blobStore.blobContainer(any())).thenReturn(container);
doNothing().when(container).deleteBlobsIgnoringIfNotExists(any());
Expand All @@ -231,9 +289,31 @@ public void testDeleteClusterMetadata() throws IOException {
+ ".dat"
)
);

verify(mockedRemoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename()));
Set<String> staleManifest = new HashSet<>();
inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));

// Test case when remoteRoutingTableService is null
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
when(clusterState.getClusterName()).thenReturn(new ClusterName("test"));
when(metadata.clusterUUID()).thenReturn("testUUID");
when(clusterState.metadata()).thenReturn(metadata);
when(clusterApplierService.state()).thenReturn(clusterState);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);

mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class);
remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService);

remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(
remoteClusterStateService,
clusterService,
Optional.empty()
);
remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs);
verify(mockedRemoteRoutingTableService, never()).deleteStaleIndexRoutingPaths(any());
}

public void testDeleteStaleClusterUUIDs() throws IOException {
Expand Down

0 comments on commit 5d5842b

Please sign in to comment.