Skip to content

Commit

Permalink
Enabling term version check on local state for all ClusterManager Rea…
Browse files Browse the repository at this point in the history
…d Transport Actions (#14273) (#14869)

* enabling term version check on local state for all admin read actions

Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
Signed-off-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
Co-authored-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
  • Loading branch information
rajiv-kv and dblock authored Jul 23, 2024
1 parent 788a7fe commit 8b10215
Show file tree
Hide file tree
Showing 23 changed files with 382 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest;
import org.opensearch.action.support.replication.TransportReplicationActionTests;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
Expand Down Expand Up @@ -195,6 +197,7 @@ public void cleanUp() {
}

public void testGetFieldMappings() {

String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]";
interceptTransportActions(getFieldMappingsShardAction);

Expand Down Expand Up @@ -545,13 +548,14 @@ public void testDeleteIndex() {
}

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);

interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);

assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class);
assertNoActionInvocation(GetMappingsAction.NAME);
}

public void testPutMapping() {
Expand All @@ -565,8 +569,8 @@ public void testPutMapping() {
}

public void testGetSettings() {
interceptTransportActions(GetSettingsAction.NAME);

interceptTransportActions(GetSettingsAction.NAME);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();

Expand Down Expand Up @@ -662,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
}
}

private static void assertActionInvocation(String action, Class<? extends TransportRequest> requestClass) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertFalse(requests.isEmpty());
for (TransportRequest internalRequest : requests) {
assertTrue(internalRequest.getClass() == requestClass);
}
}

private static void assertNoActionInvocation(String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertTrue(requests.isEmpty());
}
}

private static void assertIndicesSubset(List<String> indices, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
Expand Down Expand Up @@ -781,7 +800,6 @@ public List<TransportInterceptor> getTransportInterceptors(
}

private final Set<String> actions = new HashSet<>();

private final Map<String, List<TransportRequest>> requests = new HashMap<>();

@Override
Expand Down Expand Up @@ -831,6 +849,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
}
}
requestHandler.messageReceived(request, channel, task);

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.node.IoUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
Expand All @@ -29,9 +34,13 @@
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -62,6 +71,10 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

@Before
public void init() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Expand All @@ -79,15 +92,34 @@ public void init() {

// Enable admission control
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);

// Force always fetch from ClusterManager
ClusterService clusterService = internalCluster().clusterService();
GetTermVersionResponse oosTerm = new GetTermVersionResponse(
new ClusterStateTermVersion(
clusterService.state().getClusterName(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term() - 1,
clusterService.state().version() - 1
)
);
primaryService.addRequestHandlingBehavior(
GetTermVersionAction.NAME,
(handler, request, channel, task) -> channel.sendResponse(oosTerm)
);
}

public void testAdmissionControlEnforced() throws Exception {
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

// Read API on ClusterManager

GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction(
threadPool,
actionFilters,
GetDecommissionStateRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,9 @@ private ClusterHealthResponse clusterHealth(
pendingTaskTimeInQueue
);
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction(
threadPool,
actionFilters,
GetRepositoriesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction(
threadPool,
actionFilters,
ClusterSearchShardsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction(
threadPool,
actionFilters,
ClusterGetWeightedRoutingRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.weightedRoutingService = weightedRoutingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public TransportClusterStateAction(
ClusterStateRequest::new,
indexNameExpressionResolver
);
this.localExecuteSupported = true;
}

@Override
Expand Down Expand Up @@ -233,9 +234,4 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi

return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean localExecuteSupportedByAction() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction(
threadPool,
actionFilters,
GetStoredScriptRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.scriptService = scriptService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@ protected void clusterManagerOperation(
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public TransportGetAliasesAction(
threadPool,
actionFilters,
GetAliasesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.systemIndices = systemIndices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public TransportIndicesExistsAction(
threadPool,
actionFilters,
IndicesExistsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction(
threadPool,
actionFilters,
IndicesShardStoresRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.listShardStoresInfo = listShardStoresInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction(
threadPool,
actionFilters,
GetComponentTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction(
threadPool,
actionFilters,
GetComposableIndexTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction(
threadPool,
actionFilters,
GetIndexTemplatesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public GetPipelineTransportAction(
threadPool,
actionFilters,
GetPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction(
threadPool,
actionFilters,
GetSearchPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Loading

0 comments on commit 8b10215

Please sign in to comment.