Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling term version check on local state for all ClusterManager Read Transport Actions #14273

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest;
import org.opensearch.action.support.replication.TransportReplicationActionTests;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
Expand Down Expand Up @@ -195,6 +197,7 @@ public void cleanUp() {
}

public void testGetFieldMappings() {

String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]";
interceptTransportActions(getFieldMappingsShardAction);

Expand Down Expand Up @@ -545,13 +548,14 @@ public void testDeleteIndex() {
}

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);

interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);

assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class);
assertNoActionInvocation(GetMappingsAction.NAME);
}

public void testPutMapping() {
Expand All @@ -565,8 +569,8 @@ public void testPutMapping() {
}

public void testGetSettings() {
interceptTransportActions(GetSettingsAction.NAME);

interceptTransportActions(GetSettingsAction.NAME);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();

Expand Down Expand Up @@ -662,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
}
}

private static void assertActionInvocation(String action, Class<? extends TransportRequest> requestClass) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertFalse(requests.isEmpty());
for (TransportRequest internalRequest : requests) {
assertTrue(internalRequest.getClass() == requestClass);
}
}

private static void assertNoActionInvocation(String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertTrue(requests.isEmpty());
}
}

private static void assertIndicesSubset(List<String> indices, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
Expand Down Expand Up @@ -781,7 +800,6 @@ public List<TransportInterceptor> getTransportInterceptors(
}

private final Set<String> actions = new HashSet<>();

private final Map<String, List<TransportRequest>> requests = new HashMap<>();

@Override
Expand Down Expand Up @@ -831,6 +849,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
}
}
requestHandler.messageReceived(request, channel, task);

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.node.IoUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
Expand All @@ -29,9 +34,13 @@
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -62,6 +71,10 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

@Before
public void init() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Expand All @@ -79,15 +92,34 @@ public void init() {

// Enable admission control
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);

// Force always fetch from ClusterManager
ClusterService clusterService = internalCluster().clusterService();
GetTermVersionResponse oosTerm = new GetTermVersionResponse(
new ClusterStateTermVersion(
clusterService.state().getClusterName(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term() - 1,
clusterService.state().version() - 1
)
);
primaryService.addRequestHandlingBehavior(
GetTermVersionAction.NAME,
(handler, request, channel, task) -> channel.sendResponse(oosTerm)
);
}

public void testAdmissionControlEnforced() throws Exception {
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

// Read API on ClusterManager

GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction(
threadPool,
actionFilters,
GetDecommissionStateRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,9 @@ private ClusterHealthResponse clusterHealth(
pendingTaskTimeInQueue
);
}

@Override
protected boolean localExecuteSupportedByAction() {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction(
threadPool,
actionFilters,
GetRepositoriesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction(
threadPool,
actionFilters,
ClusterSearchShardsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction(
threadPool,
actionFilters,
ClusterGetWeightedRoutingRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.weightedRoutingService = weightedRoutingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public TransportClusterStateAction(
ClusterStateRequest::new,
indexNameExpressionResolver
);
this.localExecuteSupported = true;
}

@Override
Expand Down Expand Up @@ -233,9 +234,4 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi

return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean localExecuteSupportedByAction() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction(
threadPool,
actionFilters,
GetStoredScriptRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.scriptService = scriptService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;

Check warning on line 116 in server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java#L116

Added line #L116 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public TransportGetAliasesAction(
threadPool,
actionFilters,
GetAliasesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.systemIndices = systemIndices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public TransportIndicesExistsAction(
threadPool,
actionFilters,
IndicesExistsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction(
threadPool,
actionFilters,
IndicesShardStoresRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.listShardStoresInfo = listShardStoresInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction(
threadPool,
actionFilters,
GetComponentTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction(
threadPool,
actionFilters,
GetComposableIndexTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction(
threadPool,
actionFilters,
GetIndexTemplatesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public GetPipelineTransportAction(
threadPool,
actionFilters,
GetPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction(
threadPool,
actionFilters,
GetSearchPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Loading
Loading