Skip to content

Commit

Permalink
enable term check selectively for transport actions based on code audit
Browse files Browse the repository at this point in the history
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
  • Loading branch information
rajiv-kv committed Jul 22, 2024
1 parent c4f95de commit 92463f4
Show file tree
Hide file tree
Showing 20 changed files with 315 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest;
import org.opensearch.action.support.replication.TransportReplicationActionTests;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
Expand All @@ -95,8 +95,6 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -128,7 +126,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -551,14 +548,14 @@ public void testDeleteIndex() {
}

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);
stubClusterTermResponse(internalCluster().getClusterManagerName());

interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);

assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class);
assertNoActionInvocation(GetMappingsAction.NAME);
}

public void testPutMapping() {
Expand All @@ -574,7 +571,6 @@ public void testPutMapping() {
public void testGetSettings() {

interceptTransportActions(GetSettingsAction.NAME);
stubClusterTermResponse(internalCluster().getClusterManagerName());
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();

Expand Down Expand Up @@ -670,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
}
}

private static void assertActionInvocation(String action, Class<? extends TransportRequest> requestClass) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertFalse(requests.isEmpty());
for (TransportRequest internalRequest : requests) {
assertTrue(internalRequest.getClass() == requestClass);
}
}

private static void assertNoActionInvocation(String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertTrue(requests.isEmpty());
}
}

private static void assertIndicesSubset(List<String> indices, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
Expand Down Expand Up @@ -789,8 +800,6 @@ public List<TransportInterceptor> getTransportInterceptors(
}

private final Set<String> actions = new HashSet<>();
private final Map<String, TransportRequestHandler> stubHandlers = new ConcurrentHashMap<>();

private final Map<String, List<TransportRequest>> requests = new HashMap<>();

@Override
Expand All @@ -813,11 +822,6 @@ synchronized void interceptTransportActions(String... actions) {

synchronized void clearInterceptedActions() {
actions.clear();
stubHandlers.clear();
}

synchronized void stub(String action, TransportRequestHandler handler) {
stubHandlers.put(action, handler);
}

private class InterceptingRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
Expand All @@ -844,25 +848,9 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
}
}
}
if (!stubHandlers.containsKey(action)) {
requestHandler.messageReceived(request, channel, task);
} else {
stubHandlers.get(action).messageReceived(request, channel, task);
}
requestHandler.messageReceived(request, channel, task);

}
}
}

private void stubClusterTermResponse(String master) {
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, master);
pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance.stub(
GetTermVersionAction.NAME,
(request, channel, task) -> channel.sendResponse(
new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))
)
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
Expand Down Expand Up @@ -92,15 +92,32 @@ public void init() {

// Enable admission control
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);

// Force always fetch from ClusterManager
ClusterService clusterService = internalCluster().clusterService();
GetTermVersionResponse oosTerm = new GetTermVersionResponse(
new ClusterStateTermVersion(
clusterService.state().getClusterName(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term() - 1,
clusterService.state().version() - 1
)
);
primaryService.addRequestHandlingBehavior(
GetTermVersionAction.NAME,
(handler, request, channel, task) -> channel.sendResponse(oosTerm)
);
}

public void testAdmissionControlEnforced() throws Exception {
stubClusterTermResponse(internalCluster().getClusterManagerName());
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));
stubClusterTermResponse(internalCluster().getClusterManagerName());
// Read API on ClusterManager

GetAliasesRequest aliasesRequest = new GetAliasesRequest();
Expand Down Expand Up @@ -171,7 +188,6 @@ public void admissionControlDisabledOnBreach(Settings admission) throws Interrup
}

public void testAdmissionControlResponseStatus() throws Exception {
stubClusterTermResponse(internalCluster().getClusterManagerName());
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
Expand Down Expand Up @@ -211,12 +227,4 @@ Map<String, AdmissionControllerStats> getAdmissionControlStats(AdmissionControlS
}
return acStats;
}

private void stubClusterTermResponse(String master) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)));
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction(
threadPool,
actionFilters,
GetDecommissionStateRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction(
threadPool,
actionFilters,
GetRepositoriesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction(
threadPool,
actionFilters,
ClusterSearchShardsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction(
threadPool,
actionFilters,
ClusterGetWeightedRoutingRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.weightedRoutingService = weightedRoutingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public TransportClusterStateAction(
ClusterStateRequest::new,
indexNameExpressionResolver
);
this.localExecuteSupported = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction(
threadPool,
actionFilters,
GetStoredScriptRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.scriptService = scriptService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public TransportGetAliasesAction(
threadPool,
actionFilters,
GetAliasesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.systemIndices = systemIndices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public TransportIndicesExistsAction(
threadPool,
actionFilters,
IndicesExistsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction(
threadPool,
actionFilters,
IndicesShardStoresRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.listShardStoresInfo = listShardStoresInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction(
threadPool,
actionFilters,
GetComponentTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction(
threadPool,
actionFilters,
GetComposableIndexTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction(
threadPool,
actionFilters,
GetIndexTemplatesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public GetPipelineTransportAction(
threadPool,
actionFilters,
GetPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction(
threadPool,
actionFilters,
GetSearchPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ public abstract class TransportClusterManagerNodeReadAction<
Request extends ClusterManagerNodeReadRequest<Request>,
Response extends ActionResponse> extends TransportClusterManagerNodeAction<Request, Response> {

protected boolean localExecuteSupported = false;

protected TransportClusterManagerNodeReadAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
boolean localExecuteSupported
) {
this(
actionName,
Expand All @@ -71,6 +74,19 @@ protected TransportClusterManagerNodeReadAction(
request,
indexNameExpressionResolver
);
this.localExecuteSupported = localExecuteSupported;
}

protected TransportClusterManagerNodeReadAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver, false);
}

protected TransportClusterManagerNodeReadAction(
Expand Down Expand Up @@ -126,7 +142,7 @@ protected final boolean localExecute(Request request) {
}

protected boolean localExecuteSupportedByAction() {
return true;
return localExecuteSupported;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public TransportClusterInfoAction(
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
this.localExecuteSupported = true;
}

@Override
Expand Down
Loading

0 comments on commit 92463f4

Please sign in to comment.