Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Original changes against main #13

Open
wants to merge 153 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 104 commits
Commits
Show all changes
153 commits
Select commit Hold shift + click to select a range
6bf7bc9
Split the cluster state remote global metadata file to metadata attri…
shiv0408 Jan 24, 2024
5d6a0ad
Upload all the metadata attributes in parallel
shiv0408 Feb 8, 2024
279dbbe
Added javadocs
shiv0408 Feb 20, 2024
0542849
Address PR comment
shiv0408 Feb 29, 2024
c9fb25b
Add Integ test
shiv0408 Mar 12, 2024
fc270d1
Added a bwc test
shiv0408 Mar 13, 2024
adb4cf2
Address PR comments
shiv0408 Mar 14, 2024
0b38736
Modify Custom's fromXContent
shiv0408 Mar 15, 2024
c86c0f1
Remove ClusterMetadataManifest constructor
shiv0408 Mar 18, 2024
3ed92e5
Added tests
shiv0408 Mar 18, 2024
e1f517e
remove stale global metadata files
shiv0408 Mar 19, 2024
cd5c9a5
spotless apply
shiv0408 Mar 19, 2024
2bd97d7
Merge branch 'main' into cluster_state_split
shiv0408 Mar 19, 2024
8008ab2
Merge branch 'main' into remote_state_collab
shiv0408 Apr 3, 2024
33cfeb0
Refactored RemoteClusterStateService into smaller files
shiv0408 Apr 3, 2024
8244c6d
Merge branch 'main' into cluster_state_split
shiv0408 Apr 8, 2024
5f62745
Optimize stale file deletion
shiv0408 Apr 9, 2024
d5a17ed
Initial commit for RemoteRoutingTableService setup
himshikha Apr 19, 2024
9176015
Adds unit test for remote routing setup
Apr 22, 2024
a17cf06
Initial commit for index routing table manifest
Bukhtawar Apr 17, 2024
3dd93b4
Changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
933cdc2
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
f35f0dc
Changes for IndexRoutingTableInputStream
Bukhtawar Apr 20, 2024
425cf20
Merge branch 'main' into cluster_state_split
shiv0408 Apr 29, 2024
8efc1ed
fix build after merge
shiv0408 Apr 29, 2024
6c637c0
Merge branch 'main' into cluster_state_split
shiv0408 Apr 30, 2024
55ed245
Optimize remote state stale file deletion
shiv0408 Apr 9, 2024
bef9d2e
Added UT
shiv0408 Apr 11, 2024
6bed64b
Refactored into a clean up manager file
shiv0408 Apr 22, 2024
bb4b71f
Add UT
shiv0408 Apr 29, 2024
1febb68
Modify the Integ test
shiv0408 Apr 30, 2024
494aacc
Address PR comments
shiv0408 Apr 30, 2024
928b650
Address further PR comment
shiv0408 May 2, 2024
2ebfc6d
apply spotless
shiv0408 May 2, 2024
8f5d7d7
Address PR comment
shiv0408 May 2, 2024
4e3c9e9
Add changelog
shiv0408 May 2, 2024
5ea5dbb
apply spotless
shiv0408 May 2, 2024
fb0b6aa
removed unnecessary method
shiv0408 May 3, 2024
cae6632
Input stream fixes
May 6, 2024
40fec5f
Checksum calculation at end of file
May 6, 2024
d5fc147
Add toXContent and fromXContent for DiscoveryNode and DiscoveryNodes
shiv0408 May 7, 2024
b9887b4
fixing checksum
May 7, 2024
15d06bc
test fix
shiv0408 May 7, 2024
87625d3
Add to and from XContent to ClusterBlock and ClusterBlocks
shiv0408 May 9, 2024
032ced2
Address further PR comments
shiv0408 May 10, 2024
dd76f39
Merge branch 'main' into cluster_state_split
shiv0408 May 10, 2024
e81db21
Fix test failures
shiv0408 May 13, 2024
04140ff
Made minor requested changes
shiv0408 May 13, 2024
1a6940f
Merge branch 'main' into async_delete
shiv0408 May 13, 2024
b20537a
Merge branch 'main' into cluster_state_split
shiv0408 May 13, 2024
c20c6ac
Add read flow for IndexRoutingTable
Arpit-Bandejiya May 8, 2024
381630b
Moving routing table version from IndexRouting stream to manifest
May 13, 2024
eb1b7ec
Merge remote-tracking branch 'shiv0408/OpenSearch/cluster_state_split…
May 14, 2024
a10b062
Draft changes for Diff manifest upload
shiv0408 May 14, 2024
acdb97e
Download cluster state from remote
soosinha May 14, 2024
2025f70
Fix default time interval
shiv0408 May 14, 2024
289feb4
Merge remote-tracking branch 'shiv0408/OpenSearch/remote_state_collab…
May 15, 2024
b09bce7
Merging from remote cluster state
May 15, 2024
df8ddad
Download cluster state from remote
soosinha May 14, 2024
df3c088
Added parser for ClusterDiffManifest
shiv0408 May 15, 2024
3ff82b3
Merge branch 'main' into async_delete
shiv0408 May 14, 2024
96cf135
Merge remote-tracking branch 'shiv/async-write' into remote_state_collab
Arpit-Bandejiya May 15, 2024
774f0c8
Fixig build failures
May 15, 2024
bf54d72
Merge pull request #9 from himshikha/async-write
himshikha May 15, 2024
5febe86
spotless changes for ClusterBlocks
shiv0408 May 15, 2024
12632fc
Merge branch 'main' into async_delete
shiv0408 May 16, 2024
bd45e85
remove duplicate diff object from manifest
shiv0408 May 16, 2024
54f1f3d
Add remote-routing table diff
Arpit-Bandejiya May 16, 2024
3825951
Merge branch 'main' into async_delete
shiv0408 May 16, 2024
f222634
Add transport call to publish remote state
soosinha May 16, 2024
560ecb8
Fix EmptyMap warning in StreamReader
Arpit-Bandejiya May 16, 2024
4c33720
Add basic read-remote-routing flow
Arpit-Bandejiya May 16, 2024
3186372
Move diff manifest to new class and fix fromXContent
shiv0408 May 17, 2024
8912b0c
Fix cluster state publication
soosinha May 17, 2024
b604b09
Clean up global metadata attribute objects from remote
shiv0408 May 17, 2024
dc204c1
Refactor remote state
soosinha May 14, 2024
4056877
Add Code for reading from diff file and constructing the Routing Table
Arpit-Bandejiya May 18, 2024
70fae29
Add Remote Routing read in remote cluster state publication
Arpit-Bandejiya May 18, 2024
093d926
Add clusterUUID when downloading state
soosinha May 19, 2024
034f54b
Add fixes for remote routing table
Arpit-Bandejiya May 20, 2024
8201f2a
Merge branch 'async_delete' into remote_state_collab
shiv0408 May 20, 2024
ce63a5a
Fix build failure after merge
shiv0408 May 20, 2024
eeff23f
Added support for reading cluster state files in parallel
shiv0408 May 20, 2024
7242dad
Read diff files asynchronously
shiv0408 May 21, 2024
44e1d25
add missed semicolon
shiv0408 May 21, 2024
9982d79
Fix tests after merge
shiv0408 May 21, 2024
6d33900
Enable Routing Table parallel read
Arpit-Bandejiya May 21, 2024
99687b0
Create new remote state interfaces and add implementations
soosinha May 22, 2024
3e1f31f
Remove RemoteObject Store interface
soosinha May 22, 2024
043a170
Reverting dev testing configuration
soosinha May 23, 2024
3e3dfc8
Refactor discoverynodes and clusterblocks
soosinha May 23, 2024
89a10ac
Async write flow for routing table
May 23, 2024
6ae16f5
Merge pull request #11 from himshikha/async-write
himshikha May 23, 2024
8f0902c
Add metadata version and change codec version to V3
shiv0408 May 23, 2024
7d630e2
Minor Refactoring for RemoteReadResults
Arpit-Bandejiya May 23, 2024
c1cd20c
Set metadata version in cluster state
soosinha May 23, 2024
bb8c208
remove excessive logging
Arpit-Bandejiya May 23, 2024
5dc642e
Merge remote-tracking branch 'refs/remotes/shiv/remote_state_collab' …
Arpit-Bandejiya May 23, 2024
a9ddead
Fix build issue
Arpit-Bandejiya May 23, 2024
f7fef97
Fixing build
soosinha May 23, 2024
ae2ed6b
Add custom metadata diff
shiv0408 May 23, 2024
a96304d
Add RemoteIndexMetadata unit tests
soosinha May 23, 2024
6d1990c
Enable remote routing by default with remote state
Arpit-Bandejiya May 24, 2024
e022d78
Correct the diff creation logic
soosinha May 24, 2024
ca2640b
Delete stale index routing files
May 24, 2024
9aa707e
Fix stale file blob path for indices routing
May 24, 2024
3aaa6a8
Use isRemoteStateNode condition
soosinha May 25, 2024
5d70c59
Add metadata version
soosinha May 25, 2024
6a4c721
Correct blob path for download
soosinha May 25, 2024
fda3031
Custom toXContent implementations
shiv0408 May 25, 2024
1ea5b01
Add indexMetadataListener count to latch
shiv0408 May 25, 2024
9d687fa
Change RemoteObject interface signature
soosinha May 26, 2024
677b632
refactor manifest manager
soosinha May 26, 2024
e12131d
Refactor cleanup manager
soosinha May 26, 2024
f4f773f
Prevent loading ephemeral objects in restore flow
soosinha May 27, 2024
14a279d
Add transient settings to manifest
shiv0408 May 27, 2024
523889c
Create RemoteObjectStore class and corresponding implementations
soosinha May 28, 2024
9b1cab6
Add remote state implementation unit tests
soosinha May 29, 2024
9aeea21
Update interface to use entity U
soosinha May 29, 2024
1952bac
Remove duplicate tests
shiv0408 May 30, 2024
fc9160c
Add parsing for DiffableStringMap
shiv0408 May 30, 2024
35970d9
Add status to cluster block serialization
shiv0408 May 30, 2024
e541318
fix cluster block tests
shiv0408 May 30, 2024
e8fec03
Minor refactoring for read flow
Arpit-Bandejiya May 30, 2024
00f0249
Upload consistent settings and customs object to remote
shiv0408 May 31, 2024
bb335f6
Address remote state interface comments
soosinha Jun 2, 2024
6776c03
Implement diffUtils in RemoteRoutingTable
Arpit-Bandejiya Jun 3, 2024
65a04a2
refactoring in remote routing table service
Arpit-Bandejiya Jun 3, 2024
2afe186
Fix snapshot de/ser
shiv0408 Jun 4, 2024
633b183
Fix snapshot de/ser
shiv0408 Jun 4, 2024
1c0bbf9
Add UTs for RemoteRoutingTableService
Arpit-Bandejiya Jun 4, 2024
f15da96
Merge remote-tracking branch 'refs/remotes/shiv/remote_state_collab' …
Arpit-Bandejiya Jun 4, 2024
0333312
Make arraylist synchronized
soosinha Jun 5, 2024
148eca5
Update remote publication to skip download on master node
soosinha Jun 5, 2024
23f49e5
Add unit tests for uploadBlobAsync
soosinha Jun 5, 2024
7f2391c
Address interface comments
soosinha Jun 5, 2024
caf61a6
read cluster state custom from diff
shiv0408 Jun 4, 2024
e1eddbe
Remove get/set method from interface
soosinha Jun 5, 2024
944f8b2
Merge branch 'main' into remote_state_collab
soosinha Jun 6, 2024
8b71d2c
disable checksum for debugging
soosinha Jun 6, 2024
87ade4c
Use byte buffer
soosinha Jun 6, 2024
5c3e28b
Use InputStreams rather than XContent for serialization
shiv0408 Jun 7, 2024
3147aa3
Add Remote Entity implementation unit tests
soosinha Jun 7, 2024
0742523
Add unit tests for Remote Entity implementation
soosinha Jun 7, 2024
26a12cf
Add publication flag and remote routing table check
Jun 6, 2024
5b83599
Add manifest file name
soosinha Jun 7, 2024
e47d285
Renaming test files
soosinha Jun 7, 2024
79eb089
Send manifest file name
soosinha Jun 7, 2024
fa24988
Send manifest file name in remote publish request
soosinha Jun 7, 2024
5c6f418
Create serde utility for Writable classes
soosinha Jun 8, 2024
a813834
ChecksumWritableBlobStoreFormat Unit test
soosinha Jun 8, 2024
e8c8413
Fixing build
soosinha Jun 8, 2024
4def30f
Handle read ClusterState for V1 manifest
shiv0408 Jun 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
- [Remote State] Add async remote state deletion task running on a interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,14 @@ public String toString() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(toString());
}

public static TransportAddress fromString(String address) throws UnknownHostException {
String[] addressSplit = address.split(":");
if (addressSplit.length != 2) {
throw new IllegalArgumentException("address must be of the form [hostname/ip]:[port]");
}
String hostname = addressSplit[0];
int port = Integer.parseInt(addressSplit[1]);
return new TransportAddress(InetAddress.getByName(hostname), port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.core.xcontent.XContentParser.Token;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

Expand Down Expand Up @@ -178,4 +180,14 @@ public static <T> void parseTypedKeysObject(XContentParser parser, String delimi
throw new ParsingException(parser.getTokenLocation(), "Failed to parse object: empty key");
}
}

public static List<String> parseStringList(XContentParser parser) throws IOException {
List<String> valueList = new ArrayList<>();
ensureExpectedToken(Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != Token.END_ARRAY) {
ensureExpectedToken(Token.VALUE_STRING, parser.currentToken(), parser);
valueList.add(parser.text());
}
return valueList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "test-index";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
ensureGreen(INDEX_NAME);
return indexStats;
}

public void testRemoteCleanupTaskUpdated() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateCleanupManager.class
);

assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
}

public void testRemoteCleanupDeleteStale() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);

// set cleanup interval to 100 ms to make the test faster
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
.get();

assertTrue(response.isAcknowledged());

// update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
// to repository, if manifest files are less than that it means clean up has run
updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertBusy(() -> {
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
logger.info("number of current manifest file: {}", manifestFiles);
// we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
// other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
// Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
assertTrue(
"Current number of manifest files: " + manifestFiles,
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
);
}, 500, TimeUnit.MILLISECONDS);
}

private void updateClusterStateNTimes(int n) {
int newReplicaCount = randomIntBetween(0, 3);
for (int i = n; i > 0; i--) {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
.get();
assertTrue(response.isAcknowledged());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -63,27 +66,59 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
return indexStats;
}

public void testFullClusterRestoreStaleDelete() throws Exception {
public void testRemoteCleanupTaskUpdated() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

assertEquals(5, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes());
assertTrue(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(-1, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMillis());
assertFalse(remoteClusterStateService.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(1, remoteClusterStateService.getStaleFileDeletionTask().getInterval().getMinutes());
}

public void testRemoteCleanupOnlyAfter10Updates() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

// set cleanup interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();

replicaCount = updateReplicaCountNTimes(9, replicaCount);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);

BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
Expand All @@ -95,14 +130,39 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

assertBusy(() -> {
assertEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);
}, 1, TimeUnit.MINUTES);

replicaCount = updateReplicaCountNTimes(8, replicaCount);

// wait for 1 min, to ensure that clean up task ran and didn't clean up stale files because it was less than 10
Thread.sleep(60000);
assertNotEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);

// Do 2 more updates, now since the total successful state changes are more than 10, stale files will be cleaned up
replicaCount = updateReplicaCountNTimes(2, replicaCount);

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
assertBusy(() -> {
assertEquals(
RETAINED_MANIFESTS - 1,
repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size()
);
}, 1, TimeUnit.MINUTES);

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
).getMetadata().getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(replicaCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

Expand Down Expand Up @@ -243,4 +303,17 @@ private void setReplicaCount(int replicaCount) {
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
.get();
}

private int updateReplicaCountNTimes(int n, int initialCount) {
int newReplicaCount = randomIntBetween(0, 3);
;
for (int i = 0; i < n; i++) {
while (newReplicaCount == initialCount) {
newReplicaCount = randomIntBetween(0, 3);
}
setReplicaCount(newReplicaCount);
initialCount = newReplicaCount;
}
return newReplicaCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
PlainActionFuture.newFuture()
);
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -320,9 +321,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
),
segmentRepoPath.resolve(RemoteClusterStateUtils.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
);
} catch (IOException e) {
Expand All @@ -348,7 +347,7 @@ public void testRemoteStateFullRestart() throws Exception {
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
RemoteClusterStateUtils.encodeString(clusterService().state().getClusterName().value())
+ "/cluster-state/"
+ prevClusterUUID
+ "/manifest"
Expand Down
Loading