Skip to content

Commit

Permalink
Delete stale index routing table files
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <singhlhs@amazon.com>
  • Loading branch information
Shailendra Singh committed Jun 11, 2024
1 parent 1084ba9 commit c8e2947
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,15 @@ protected void doStart() {
@Override
protected void doStop() {}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
try {
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);
blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
} catch (IOException e) {
logger.error("Error while deleting stale index routing files", e);
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ protected void doStop() {
protected void doClose() throws IOException {
// noop
}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting
List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded,
List<String> indicesRoutingToDelete
);
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand All @@ -31,6 +33,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -74,8 +77,13 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private long lastCleanupAttemptStateVersion;
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private final RemoteRoutingTableService remoteRoutingTableService;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
public RemoteClusterStateCleanupManager(
RemoteClusterStateService remoteClusterStateService,
ClusterService clusterService,
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
Expand All @@ -85,6 +93,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
this.lastCleanupAttemptStateVersion = 0;
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
this.remoteRoutingTableService = remoteRoutingTableService;
}

void start() {
Expand Down Expand Up @@ -171,6 +180,7 @@ void deleteClusterMetadata(
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -189,6 +199,10 @@ void deleteClusterMetadata(
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
Expand Down Expand Up @@ -221,6 +235,14 @@ void deleteClusterMetadata(
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
);
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename());
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
Expand All @@ -240,6 +262,11 @@ void deleteClusterMetadata(
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
try {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IOException e){
throw new RuntimeException(e);
}
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -56,6 +59,12 @@

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX;
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

import org.mockito.Mockito;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand All @@ -68,12 +77,19 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RemoteRoutingTableServiceTests extends OpenSearchTestCase {

private InternalRemoteRoutingTableService remoteRoutingTableService;

//private RemoteRoutingTableService remoteRoutingTableService;

private ClusterSettings clusterSettings;
private Supplier<RepositoriesService> repositoriesServiceSupplier;
private RepositoriesService repositoriesService;
private BlobStoreRepository blobStoreRepository;
Expand All @@ -92,13 +108,16 @@ public void setup() {
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();

clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

blobStoreRepository = mock(BlobStoreRepository.class);
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);

when(blobStoreRepository.blobStore()).thenReturn(blobStore);
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

Expand Down Expand Up @@ -552,4 +571,27 @@ private BlobPath getPath() {
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
);
}
public void testDeleteStaleIndexRoutingPaths() throws IOException {
doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any());
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
remoteRoutingTableService.doStart();
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException {
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
// Simulate an IOException
doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList());

remoteRoutingTableService.doStart();
IOException thrown = assertThrows(IOException.class, () -> {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
});
assertEquals("test exception", thrown.getMessage());
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.NoopRemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -47,6 +51,7 @@

import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion;
Expand All @@ -69,13 +74,15 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -92,6 +99,8 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase {
private ClusterState clusterState;
private Metadata metadata;
private RemoteClusterStateService remoteClusterStateService;
private RemoteRoutingTableService remoteRoutingTableService;

private final ThreadPool threadPool = new TestThreadPool(getClass().getName());

@Before
Expand All @@ -113,6 +122,7 @@ public void setup() {

Settings settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository")
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
Expand All @@ -139,7 +149,13 @@ public void setup() {
when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService);
remoteRoutingTableService = mock(InternalRemoteRoutingTableService.class);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(
remoteClusterStateService,
clusterService,
remoteRoutingTableService
);
}

@After
Expand All @@ -155,11 +171,13 @@ public void testDeleteClusterMetadata() throws IOException {
List<BlobMetadata> inactiveBlobs = Arrays.asList(
new PlainBlobMetadata("manifest1.dat", 1L),
new PlainBlobMetadata("manifest2.dat", 1L),
new PlainBlobMetadata("manifest3.dat", 1L)
new PlainBlobMetadata("manifest3.dat", 1L),
new PlainBlobMetadata("manifest6.dat", 1L)
);
List<BlobMetadata> activeBlobs = Arrays.asList(
new PlainBlobMetadata("manifest4.dat", 1L),
new PlainBlobMetadata("manifest5.dat", 1L)
new PlainBlobMetadata("manifest5.dat", 1L),
new PlainBlobMetadata("manifest7.dat", 1L)
);
UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1");
UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2");
Expand Down Expand Up @@ -199,6 +217,45 @@ public void testDeleteClusterMetadata() throws IOException {
.settingMetadata(settingMetadataUpdated)
.build();

UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3");
UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4");
List<UploadedIndexMetadata> indicesRouting1 = List.of(index3Metadata, index4Metadata);
List<UploadedIndexMetadata> indicesRouting2 = List.of(index4Metadata);
ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder()
.indices(List.of(index1Metadata))
.coordinationMetadata(coordinationMetadataUpdated)
.templatesMetadata(templateMetadataUpdated)
.settingMetadata(settingMetadataUpdated)
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID(clusterUUID)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.routingTableVersion(0L)
.indicesRouting(indicesRouting1)
.build();
ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder()
.indices(List.of(index2Metadata))
.coordinationMetadata(coordinationMetadataUpdated)
.templatesMetadata(templateMetadataUpdated)
.settingMetadata(settingMetadataUpdated)
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID(clusterUUID)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.routingTableVersion(0L)
.indicesRouting(indicesRouting2)
.build();

// active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated
ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3)
.coordinationMetadata(coordinationMetadataUpdated)
Expand All @@ -208,10 +265,13 @@ public void testDeleteClusterMetadata() throws IOException {
when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn(
manifest4,
manifest5,
manifest7,
manifest1,
manifest2,
manifest3
manifest3,
manifest6
);

BlobContainer container = mock(BlobContainer.class);
when(blobStore.blobContainer(any())).thenReturn(container);
doNothing().when(container).deleteBlobsIgnoringIfNotExists(any());
Expand All @@ -231,9 +291,16 @@ public void testDeleteClusterMetadata() throws IOException {
+ ".dat"
)
);

verify(remoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename()));
Set<String> staleManifest = new HashSet<>();
inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));

// Test case when remoteRoutingTableService is not enabled
remoteRoutingTableService = mock(NoopRemoteRoutingTableService.class);
remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs);
verify(remoteRoutingTableService, never()).deleteStaleIndexRoutingPaths(any());
}

public void testDeleteStaleClusterUUIDs() throws IOException {
Expand Down

0 comments on commit c8e2947

Please sign in to comment.