Skip to content

Commit

Permalink
using remote cluster-state as fallback
Browse files Browse the repository at this point in the history
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
  • Loading branch information
rajiv-kv committed Sep 12, 2024
1 parent 2e98df3 commit 4e94c5f
Show file tree
Hide file tree
Showing 20 changed files with 853 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateTermVersionIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
List<BlobPath> indexRoutingPaths;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(),
RemoteStoreEnums.PathType.HASHED_PREFIX.toString()
)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
.build();
}

public void testRemoteClusterStateFallback() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);

String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNodes[0]);

String cm = internalCluster().getClusterManagerName();
primaryService.addRequestHandlingBehavior(
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME,
(handler, request, channel, task) -> {
// not committing the state
logger.info("ignoring the commit from cluster-manager {}", request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
);

String index = "index_1";
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()
);
logger.info("created index {}", index);
Map<String, AtomicInteger> callCounters = Map.ofEntries(
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
);

addCallCountInterceptor(cm, callCounters);

ClusterStateResponse stateResponseM = client(cm).admin().cluster().state(new ClusterStateRequest()).actionGet();

ClusterStateResponse stateResponseD = client(dataNodes[0]).admin().cluster().state(new ClusterStateRequest()).actionGet();
assertEquals(stateResponseM, stateResponseD);
assertThat(callCounters.get(ClusterStateAction.NAME).get(), is(0));
assertThat(callCounters.get(GetTermVersionAction.NAME).get(), is(1));

}

public void testNoRemoteClusterStateFound() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);

String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNodes[0]);
primaryService.addRequestHandlingBehavior(
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME,
(handler, request, channel, task) -> {
// not committing the state
logger.info("ignoring the commit from cluster-manager {}", request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
);

ClusterState state = internalCluster().clusterService().state();
String cm = internalCluster().getClusterManagerName();
MockTransportService cmservice = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
cmservice.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
channel.sendResponse(
new GetTermVersionResponse(new ClusterStateTermVersion(state.getClusterName(), state.stateUUID(), -1, -1), true)
);
});

Map<String, AtomicInteger> callCounters = Map.ofEntries(
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
);

addCallCountInterceptor(cm, callCounters);

ClusterStateResponse stateResponseM = client(cm).admin().cluster().state(new ClusterStateRequest()).actionGet();
ClusterStateResponse stateResponseD = client(dataNodes[0]).admin().cluster().state(new ClusterStateRequest()).actionGet();
assertEquals(stateResponseM, stateResponseD);
assertThat(callCounters.get(ClusterStateAction.NAME).get(), is(1));
assertThat(callCounters.get(GetTermVersionAction.NAME).get(), is(1));

}

private void addCallCountInterceptor(String nodeName, Map<String, AtomicInteger> callCounters) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName);
for (var ctrEnty : callCounters.entrySet()) {
primaryService.addRequestHandlingBehavior(ctrEnty.getKey(), (handler, request, channel, task) -> {
ctrEnty.getValue().incrementAndGet();
logger.info("--> {} response redirect", ctrEnty.getKey());
handler.messageReceived(request, channel, task);
});
}
}

private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception {
clusterSettingsSuppliedByTest = true;
Path segmentRepoPath = randomRepoPath();
Path translogRepoPath = randomRepoPath();
Path remoteRoutingTableRepoPath = randomRepoPath();
Settings settings = buildRemoteStoreNodeAttributes(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath,
REMOTE_ROUTING_TABLE_REPO,
remoteRoutingTableRepoPath,
false
);
prepareCluster(1, 3, INDEX_NAME, 1, 5, settings);
ensureGreen(INDEX_NAME);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REMOTE_ROUTING_TABLE_REPO);

return repository;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.opensearch.cluster.metadata.Metadata.Custom;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -80,7 +82,8 @@ public TransportClusterStateAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
Expand All @@ -93,6 +96,7 @@ public TransportClusterStateAction(
indexNameExpressionResolver
);
this.localExecuteSupported = true;
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -63,6 +64,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
Expand All @@ -74,6 +77,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand All @@ -95,6 +99,8 @@ public abstract class TransportClusterManagerNodeAction<Request extends ClusterM
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;

protected RemoteClusterStateService remoteClusterStateService;

private final String executor;

protected TransportClusterManagerNodeAction(
Expand Down Expand Up @@ -378,9 +384,12 @@ public void handleResponse(GetTermVersionResponse response) {
response.getClusterStateTermVersion(),
isLatestClusterStatePresentOnLocalNode
);
if (isLatestClusterStatePresentOnLocalNode) {
onLatestLocalState.accept(clusterState);

ClusterState stateFromNode = getStateFromLocalNode(response);
if (stateFromNode != null) {
onLatestLocalState.accept(stateFromNode);
} else {
// fallback to clusterManager
onStaleLocalState.accept(clusterManagerNode, clusterState);
}
}
Expand All @@ -405,6 +414,52 @@ public GetTermVersionResponse read(StreamInput in) throws IOException {
};
}

private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionResponse) {
ClusterStateTermVersion termVersion = termVersionResponse.getClusterStateTermVersion();
ClusterState appliedState = clusterService.state();
if (termVersion.equals(new ClusterStateTermVersion(appliedState))) {
logger.trace("Using the applied State from local, ClusterStateTermVersion {}", termVersion);
return appliedState;
}

ClusterState preCommitState = clusterService.preCommitState();
if (preCommitState != null && termVersion.equals(new ClusterStateTermVersion(preCommitState))) {
logger.trace("Using the published state from local, ClusterStateTermVersion {}", termVersion);
return preCommitState;
}

if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) {
try {
ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion();
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByTermVersion(
clusterStateTermVersion.getClusterName().value(),
clusterStateTermVersion.getClusterUUID(),
clusterStateTermVersion.getTerm(),
clusterStateTermVersion.getVersion()
);
if (clusterMetadataManifest.isEmpty()) {
logger.trace("could not find manifest in remote-store for ClusterStateTermVersion {}", termVersion);
return null;
}
ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),
clusterMetadataManifest.get(),
appliedState.nodes().getLocalNode().getId(),
true
);

if (clusterStateFromRemote != null) {
logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion);
return clusterStateFromRemote;
}
} catch (Exception e) {
logger.trace("Error while fetching from remote cluster state", e);
}
}
return null;
}

private boolean checkForBlock(Request request, ClusterState localClusterState) {
final ClusterBlockException blockException = checkBlock(request, localClusterState);
if (blockException != null) {
Expand Down
Loading

0 comments on commit 4e94c5f

Please sign in to comment.