Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Routing Table] Initial commit for RemoteRoutingTableService setup #13304

Merged
merged 19 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,27 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
assert existingNodes.isEmpty() == false;

CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
if (STRICT.equals(remoteStoreCompatibilityMode)) {

DiscoveryNode existingNode = existingNodes.get(0);
List<String> reposToSkip = new ArrayList<>(1);
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(
node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
// This ensures a new node with remote routing table repo is able to join the cluster.
if (remoteRoutingTableNode.isEmpty()) {
String joiningNodeRepoName = joiningNode.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (joiningNodeRepoName != null) {
reposToSkip.add(joiningNodeRepoName);
}
}

if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip);
himshikha marked this conversation as resolved.
Show resolved Hide resolved
} else {
if (existingNode.isRemoteStoreNode()) {
throw new IllegalStateException(
Expand All @@ -537,19 +553,25 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
throw new IllegalStateException(reason);
}
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent()
? remoteRoutingTableNode
: existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
private static void ensureRemoteStoreNodesCompatibility(
DiscoveryNode joiningNode,
DiscoveryNode existingNode,
List<String> reposToSkip
) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

Expand Down Expand Up @@ -164,6 +166,40 @@
return true;
}

/**
* Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos.
* This will support
* @param other other repositories metadata
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
*/
public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
if (other == null) {
return false;

Check warning on line 178 in server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java#L178

Added line #L178 was not covered by tests
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
// Sort repos by name for ordered comparison
Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name());
currentRepositories.sort(compareByName);
otherRepositories.sort(compareByName);

for (int i = 0; i < currentRepositories.size(); i++) {
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return repositories.hashCode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class RemoteRoutingTableService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);

Check warning on line 48 in server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java#L48

Added line #L48 was not covered by tests
}
}

@Override
protected void doStart() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

Check warning on line 62 in server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java#L61-L62

Added lines #L61 - L62 were not covered by tests

@Override
protected void doStop() {}

Check warning on line 65 in server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java#L65

Added line #L65 was not covered by tests

himshikha marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Package containing class to perform operations on remote routing table */
package org.opensearch.cluster.routing.remote;
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected FeatureFlagSettings(
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.TIERED_REMOTE_INDEX_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
FeatureFlags.PLUGGABLE_CACHE_SETTING
FeatureFlags.PLUGGABLE_CACHE_SETTING,
FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class FeatureFlags {
*/
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled";

/**
* Gates the functionality of remote routing table.
*/
public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled";

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Expand All @@ -89,14 +94,21 @@ public class FeatureFlags {

public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_PUBLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_PUBLICATION_EXPERIMENTAL,
false,
Property.NodeScope
);

private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
EXTENSIONS_SETTING,
IDENTITY_SETTING,
TELEMETRY_SETTING,
DATETIME_FORMATTER_CACHING_SETTING,
TIERED_REMOTE_INDEX_SETTING,
PLUGGABLE_CACHE_SETTING
PLUGGABLE_CACHE_SETTING,
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
);
/**
* Should store the settings from opensearch.yml.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -68,6 +69,7 @@

import static java.util.Objects.requireNonNull;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -202,6 +204,7 @@ public class RemoteClusterStateService implements Closeable {
private final List<IndexMetadataUploadListener> indexMetadataUploadListeners;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private Optional<RemoteRoutingTableService> remoteRoutingTableService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
Expand Down Expand Up @@ -253,6 +256,9 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
: Optional.empty();
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -749,6 +755,9 @@ public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
if (this.remoteRoutingTableService.isPresent()) {
this.remoteRoutingTableService.get().close();
}
}

public void start() {
Expand All @@ -760,6 +769,7 @@ public void start() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start);
}

private ClusterMetadataManifest uploadManifest(
Expand Down Expand Up @@ -933,6 +943,11 @@ public TimeValue getMetadataManifestUploadTimeout() {
return this.metadataManifestUploadTimeout;
}

// Package private for unit test
Optional<RemoteRoutingTableService> getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
}

static String getManifestFileName(long term, long version, boolean committed, int codecVersion) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -28,6 +29,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;

/**
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
*
Expand All @@ -46,6 +49,8 @@ public class RemoteStoreNodeAttribute {
+ "."
+ CryptoMetadata.SETTINGS_KEY;
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository";
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

private final RepositoriesMetadata repositoriesMetadata;

public static List<String> SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of(
Expand Down Expand Up @@ -157,6 +162,10 @@ private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}
if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
himshikha marked this conversation as resolved.
Show resolved Hide resolved
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
himshikha marked this conversation as resolved.
Show resolved Hide resolved
}

return repositoryNames;
}

Expand Down Expand Up @@ -187,6 +196,15 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
&& isRemoteClusterStateAttributePresent(settings);
}

private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteRoutingTableEnabled(Settings settings) {
return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}
Expand Down Expand Up @@ -231,6 +249,21 @@ public int hashCode() {
return hashCode;
}

/**
* Checks if 2 instances are equal, with option to skip check for a list of repos.
* *
* @param o other instance
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances are equal, not including the repositories in both instances if they are part of reposToSkip.
*/
public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o;
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading
Loading