Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset discovery nodes in most transport node actions request. (#15131) #15681

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
* threads for all nodes is used.
*/
public NodesHotThreadsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);

Check warning on line 73 in server/src/main/java/org/opensearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java#L73

Added line #L73 was not covered by tests
}

public int threads() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public NodesInfoRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesInfoRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
all();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
private final Set<String> requestedMetrics = new HashSet<>();

public NodesStatsRequest() {
super((String[]) null);
super(false, (String[]) null);
}

public NodesStatsRequest(StreamInput in) throws IOException {
Expand Down Expand Up @@ -90,7 +90,7 @@ public NodesStatsRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesStatsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
* passed, usage for all nodes will be returned.
*/
public NodesUsageRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);

Check warning on line 67 in server/src/main/java/org/opensearch/action/admin/cluster/node/usage/NodesUsageRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/usage/NodesUsageRequest.java#L67

Added line #L67 was not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
}

public Request(String[] nodesIds) {
super(nodesIds);
super(false, nodesIds);

Check warning on line 164 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java#L164

Added line #L164 was not covered by tests
}

public Request snapshots(Snapshot[] snapshots) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ClusterStatsRequest(StreamInput in) throws IOException {
* based on all nodes will be returned.
*/
public ClusterStatsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public boolean useAggregatedNodeLevelResponses() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
}

public FindDanglingIndexRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);

Check warning on line 56 in server/src/main/java/org/opensearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java#L56

Added line #L56 was not covered by tests
this.indexUUID = indexUUID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@
}

public ListDanglingIndicesRequest() {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);

Check warning on line 61 in server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java#L61

Added line #L61 was not covered by tests
this.indexUUID = null;
}

public ListDanglingIndicesRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);

Check warning on line 66 in server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java#L66

Added line #L66 was not covered by tests
this.indexUUID = indexUUID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesReques

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
super(false, concreteNodes);
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;

private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand All @@ -88,11 +89,22 @@ protected BaseNodesRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}

protected BaseNodesRequest(boolean includeDiscoveryNodes, String... nodesIds) {
this.nodesIds = nodesIds;
this.includeDiscoveryNodes = includeDiscoveryNodes;
}

protected BaseNodesRequest(DiscoveryNode... concreteNodes) {
this.nodesIds = null;
this.concreteNodes = concreteNodes;
}

protected BaseNodesRequest(boolean includeDiscoveryNodes, DiscoveryNode... concreteNodes) {
this.nodesIds = null;
this.concreteNodes = concreteNodes;
this.includeDiscoveryNodes = includeDiscoveryNodes;
}

public final String[] nodesIds() {
return nodesIds;
}
Expand Down Expand Up @@ -127,10 +139,6 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,16 @@ class AsyncAction {
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class,
// we remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
}

void start() {
final DiscoveryNode[] nodes = this.concreteNodes;
if (nodes.length == 0) {
if (this.concreteNodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
return;
Expand All @@ -260,9 +258,9 @@ void start() {
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
for (int i = 0; i < nodes.length; i++) {
for (int i = 0; i < this.concreteNodes.length; i++) {
final int idx = i;
final DiscoveryNode node = nodes[i];
final DiscoveryNode node = this.concreteNodes[i];
final String nodeId = node.getId();
try {
TransportRequest nodeRequest = newNodeRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
}

public Request(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);

Check warning on line 136 in server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayMetaState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayMetaState.java#L136

Added line #L136 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
}

public Request(DiscoveryNode[] nodes, Map<ShardId, ShardAttributes> shardAttributes) {
super(nodes);
super(false, nodes);

Check warning on line 185 in server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java#L185

Added line #L185 was not covered by tests
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
}

public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);

Check warning on line 179 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java#L179

Added line #L179 was not covered by tests
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
}

public Request(Map<ShardId, ShardAttributes> shardAttributes, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);

Check warning on line 191 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L191

Added line #L191 was not covered by tests
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);
nodesInfoRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.setIncludeDiscoveryNodes(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -138,7 +137,6 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,12 @@

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand All @@ -88,7 +52,6 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Loading
Loading