Skip to content

Commit

Permalink
[Remote Routing Table] Initial commit for RemoteRoutingTableService s…
Browse files Browse the repository at this point in the history
…etup (#13304)

* Initial commit for RemoteRoutingTableService setup

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
himshikha authored Jun 5, 2024
1 parent 10ae4cb commit 5121409
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,27 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
assert existingNodes.isEmpty() == false;

CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
if (STRICT.equals(remoteStoreCompatibilityMode)) {

DiscoveryNode existingNode = existingNodes.get(0);
List<String> reposToSkip = new ArrayList<>(1);
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(
node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
// This ensures a new node with remote routing table repo is able to join the cluster.
if (remoteRoutingTableNode.isEmpty()) {
String joiningNodeRepoName = joiningNode.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (joiningNodeRepoName != null) {
reposToSkip.add(joiningNodeRepoName);
}
}

if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip);
} else {
if (existingNode.isRemoteStoreNode()) {
throw new IllegalStateException(
Expand All @@ -537,19 +553,25 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
throw new IllegalStateException(reason);
}
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent()
? remoteRoutingTableNode
: existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
private static void ensureRemoteStoreNodesCompatibility(
DiscoveryNode joiningNode,
DiscoveryNode existingNode,
List<String> reposToSkip
) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

Expand Down Expand Up @@ -164,6 +166,40 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) {
return true;
}

/**
* Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos.
* This will support
* @param other other repositories metadata
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
*/
public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
// Sort repos by name for ordered comparison
Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name());
currentRepositories.sort(compareByName);
otherRepositories.sort(compareByName);

for (int i = 0; i < currentRepositories.size(); i++) {
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return repositories.hashCode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class RemoteRoutingTableService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

@Override
protected void doStart() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

@Override
protected void doStop() {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Package containing class to perform operations on remote routing table */
package org.opensearch.cluster.routing.remote;
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected FeatureFlagSettings(
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.TIERED_REMOTE_INDEX_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
FeatureFlags.PLUGGABLE_CACHE_SETTING
FeatureFlags.PLUGGABLE_CACHE_SETTING,
FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class FeatureFlags {
*/
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled";

/**
* Gates the functionality of remote routing table.
*/
public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled";

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Expand All @@ -89,14 +94,21 @@ public class FeatureFlags {

public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_PUBLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_PUBLICATION_EXPERIMENTAL,
false,
Property.NodeScope
);

private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
EXTENSIONS_SETTING,
IDENTITY_SETTING,
TELEMETRY_SETTING,
DATETIME_FORMATTER_CACHING_SETTING,
TIERED_REMOTE_INDEX_SETTING,
PLUGGABLE_CACHE_SETTING
PLUGGABLE_CACHE_SETTING,
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
);
/**
* Should store the settings from opensearch.yml.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -68,6 +69,7 @@

import static java.util.Objects.requireNonNull;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -202,6 +204,7 @@ public class RemoteClusterStateService implements Closeable {
private final List<IndexMetadataUploadListener> indexMetadataUploadListeners;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private Optional<RemoteRoutingTableService> remoteRoutingTableService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
Expand Down Expand Up @@ -253,6 +256,9 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
: Optional.empty();
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -749,6 +755,9 @@ public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
if (this.remoteRoutingTableService.isPresent()) {
this.remoteRoutingTableService.get().close();
}
}

public void start() {
Expand All @@ -760,6 +769,7 @@ public void start() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start);
}

private ClusterMetadataManifest uploadManifest(
Expand Down Expand Up @@ -933,6 +943,11 @@ public TimeValue getMetadataManifestUploadTimeout() {
return this.metadataManifestUploadTimeout;
}

// Package private for unit test
Optional<RemoteRoutingTableService> getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
}

static String getManifestFileName(long term, long version, boolean committed, int codecVersion) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -28,6 +29,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;

/**
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
*
Expand All @@ -46,6 +49,8 @@ public class RemoteStoreNodeAttribute {
+ "."
+ CryptoMetadata.SETTINGS_KEY;
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository";

private final RepositoriesMetadata repositoriesMetadata;

public static List<String> SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of(
Expand Down Expand Up @@ -157,6 +162,10 @@ private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}
if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}

return repositoryNames;
}

Expand Down Expand Up @@ -187,6 +196,15 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
&& isRemoteClusterStateAttributePresent(settings);
}

private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteRoutingTableEnabled(Settings settings) {
return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}
Expand Down Expand Up @@ -231,6 +249,21 @@ public int hashCode() {
return hashCode;
}

/**
* Checks if 2 instances are equal, with option to skip check for a list of repos.
* *
* @param o other instance
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances are equal, not including the repositories in both instances if they are part of reposToSkip.
*/
public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o;
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading

0 comments on commit 5121409

Please sign in to comment.