Skip to content

Commit

Permalink
Merge branch 'main' into publication-setting-change
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 5, 2024
2 parents c142e1d + 2eb148c commit 3c6a7d7
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
Expand All @@ -26,18 +30,25 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
Expand Down Expand Up @@ -213,11 +224,121 @@ public void testRemotePublicationDownloadStats() {
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.addMetric(DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);
}

public void testRemotePublicationDisabledByRollingRestart() throws Exception {
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);

Set<String> clusterManagers = internalCluster().getClusterManagerNames();
Set<String> restartedMasters = new HashSet<>();

for (String clusterManager : clusterManagers) {
internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
restartedMasters.add(nodeName);
return Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build();
}

@Override
public void doAfterNodes(int n, Client client) {
String activeCM = internalCluster().getClusterManagerName();
Set<String> followingCMs = clusterManagers.stream()
.filter(node -> !Objects.equals(node, activeCM))
.collect(Collectors.toSet());
boolean activeCMRestarted = restartedMasters.contains(activeCM);
NodesStatsResponse response = client().admin()
.cluster()
.prepareNodesStats(followingCMs.toArray(new String[0]))
.clear()
.addMetric(DISCOVERY.metricName())
.get();
// after master is flipped to restarted master, publication should happen on Transport
response.getNodes().forEach(nodeStats -> {
if (activeCMRestarted) {
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
assertTrue(
stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0
);
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
} else {
DiscoveryStats stats = nodeStats.getDiscoveryStats();
assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount());
assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount());
assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount());
}
});

NodesInfoResponse nodesInfoResponse = client().admin()
.cluster()
.prepareNodesInfo(activeCM)
.clear()
.addMetric(SETTINGS.metricName())
.get();
// if masterRestarted is true Publication Setting should be false, and vice versa
assertTrue(
REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted
);

followingCMs.forEach(node -> {
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
CoordinationState.PersistedState remoteState = registry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
if (activeCMRestarted) {
assertNull(remoteState.getLastAcceptedState());
// assertNull(remoteState.getLastAcceptedManifest());
} else {
ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL)
.getLastAcceptedState();
ClusterState remotePersistedState = remoteState.getLastAcceptedState();
assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata()));
assertEquals(localState.nodes(), remotePersistedState.nodes());
assertEquals(localState.routingTable(), remotePersistedState.routingTable());
assertEquals(localState.customs(), remotePersistedState.customs());
}
});
}
});

}
ensureGreen(INDEX_NAME);
ensureStableCluster(5);

String activeCM = internalCluster().getClusterManagerName();
Set<String> followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet());
NodesStatsResponse response = client().admin()
.cluster()
.prepareNodesStats(followingCMs.toArray(new String[0]))
.clear()
.addMetric(DISCOVERY.metricName())
.get();
response.getNodes().forEach(nodeStats -> {
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0);
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
});
NodesInfoResponse nodesInfoResponse = client().admin()
.cluster()
.prepareNodesInfo(activeCM)
.clear()
.addMetric(SETTINGS.metricName())
.get();
// if masterRestarted is true Publication Setting should be false, and vice versa
assertFalse(REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()));

followingCMs.forEach(node -> {
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
assertNull(remoteState.getLastAcceptedState());
// assertNull(remoteState.getLastAcceptedManifest());
});
}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
Expand Down
18 changes: 17 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ public IndexSettings getIndexSettings() {
* {@link IndexReader}-specific optimizations, such as rewriting containing range queries.
*/
public QueryShardContext newQueryShardContext(int shardId, IndexSearcher searcher, LongSupplier nowInMillis, String clusterAlias) {
return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, false, false);
return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, false);
}

/**
Expand Down Expand Up @@ -913,6 +913,22 @@ public QueryShardContext newQueryShardContext(
);
}

/**
* Creates a new QueryShardContext.
* <p>
* Passing a {@code null} {@link IndexSearcher} will return a valid context, however it won't be able to make
* {@link IndexReader}-specific optimizations, such as rewriting containing range queries.
*/
public QueryShardContext newQueryShardContext(
int shardId,
IndexSearcher searcher,
LongSupplier nowInMillis,
String clusterAlias,
boolean validate
) {
return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, validate, false);
}

/**
* The {@link ThreadPool} to use for this index.
*/
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
Expand Down Expand Up @@ -1344,7 +1344,7 @@ protected Node(
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
taskResourceTrackingService,
searchModule.getConcurrentSearchDeciders()
searchModule.getConcurrentSearchRequestDeciderFactories()
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -2004,7 +2004,7 @@ protected SearchService newSearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) {
return new SearchService(
clusterService,
Expand All @@ -2018,7 +2018,7 @@ protected SearchService newSearchService(
circuitBreakerService,
indexSearcherExecutor,
taskResourceTrackingService,
concurrentSearchDecidersList
concurrentSearchDeciderFactories
);
}

Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -141,12 +141,12 @@ default Map<String, Highlighter> getHighlighters() {
}

/**
* Allows plugins to register custom decider for concurrent search
* @return A {@link ConcurrentSearchDecider}
* Allows plugins to register a factory to create custom decider for concurrent search
* @return A {@link ConcurrentSearchRequestDecider.Factory}
*/
@ExperimentalApi
default ConcurrentSearchDecider getConcurrentSearchDecider() {
return null;
default Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void processResponse(final ClusterStateResponse state) {
statsRequest.clear()
.addMetric(NodesStatsRequest.Metric.FS.metricName())
.indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store));
statsRequest.indices().setIncludeIndicesStatsByLevel(true);

client.admin().cluster().nodesStats(statsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -106,13 +106,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
Expand All @@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext {
private final ShardSearchRequest request;
private final SearchShardTarget shardTarget;
private final LongSupplier relativeTimeSupplier;
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
private SearchType searchType;
private final BigArrays bigArrays;
private final IndexShard indexShard;
Expand Down Expand Up @@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext {
boolean validate,
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand Down Expand Up @@ -267,7 +268,7 @@ final class DefaultSearchContext extends SearchContext {

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
this.concurrentSearchDeciders = concurrentSearchDeciders;
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
}

Expand Down Expand Up @@ -932,14 +933,21 @@ public boolean shouldUseConcurrentSearch() {

private boolean evaluateAutoMode() {

// filter out deciders that want to opt-out of decision-making
final Set<ConcurrentSearchDecider> filteredDeciders = concurrentSearchDeciders.stream()
.filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings()))
.collect(Collectors.toSet());
final Set<ConcurrentSearchRequestDecider> concurrentSearchRequestDeciders = new HashSet<>();

// create the ConcurrentSearchRequestDeciders using registered factories
for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) {
final Optional<ConcurrentSearchRequestDecider> concurrentSearchRequestDecider = deciderFactory.create(
indexService.getIndexSettings()
);
concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add);

}

// evaluate based on concurrent search query visitor
if (filteredDeciders.size() > 0) {
if (concurrentSearchRequestDeciders.size() > 0) {
ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor(
filteredDeciders,
concurrentSearchRequestDeciders,
indexService.getIndexSettings()
);
if (request().source() != null && request().source().query() != null) {
Expand All @@ -949,7 +957,7 @@ private boolean evaluateAutoMode() {
}

final List<ConcurrentSearchDecision> decisions = new ArrayList<>();
for (ConcurrentSearchDecider decider : filteredDeciders) {
for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) {
ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision();
if (decision != null) {
if (logger.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit 3c6a7d7

Please sign in to comment.