Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing pagination for _cat/shards #14641

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -51,9 +50,9 @@ public void testCatShardsWithSuccessResponse() throws InterruptedException {
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
for (ShardRouting shard : shardRoutings) {
assertEquals("test", shard.getIndexName());
assertNotNull(indicesStatsResponse.asMap().get(shard));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestIndicesListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.list.RestShardsListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
Expand Down Expand Up @@ -993,6 +994,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// LIST API
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));
registerHandler.accept(new RestShardsListAction());

// Point in time API
registerHandler.accept(new RestCreatePitAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

Expand All @@ -27,13 +30,39 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private PageParams pageParams = null;
private boolean requestLimitCheckSupported;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
super(in);
this.requestLimitCheckSupported = false;
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
indices = in.readStringArray();
cancelAfterTimeInterval = in.readOptionalTimeValue();
if (in.readBoolean()) {
pageParams = new PageParams(in);
}
requestLimitCheckSupported = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (indices == null) {
out.writeVInt(0);
} else {
out.writeStringArray(indices);
}
out.writeOptionalTimeValue(cancelAfterTimeInterval);
out.writeBoolean(pageParams != null);
if (pageParams != null) {
pageParams.writeTo(out);
}
out.writeBoolean(requestLimitCheckSupported);
}
}

@Override
Expand All @@ -57,6 +86,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}

public PageParams getPageParams() {
return pageParams;
}

public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
this.requestLimitCheckSupported = requestLimitCheckSupported;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageToken;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A response of a cat shards request.
Expand All @@ -23,28 +28,44 @@
*/
public class CatShardsResponse extends ActionResponse {

private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;
private IndicesStatsResponse indicesStatsResponse;
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
private List<ShardRouting> responseShards = new ArrayList<>();
private PageToken pageToken;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
indicesStatsResponse = new IndicesStatsResponse(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
nodes = DiscoveryNodes.readFrom(in, null);
responseShards = in.readList(ShardRouting::new);
if (in.readBoolean()) {
pageToken = new PageToken(in);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
nodes.writeToWithAttribute(out);
out.writeList(responseShards);
out.writeBoolean(pageToken != null);
if (pageToken != null) {
pageToken.writeTo(out);
}
}
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
this.clusterStateResponse = clusterStateResponse;
public void setNodes(DiscoveryNodes nodes) {
this.nodes = nodes;
}

public ClusterStateResponse getClusterStateResponse() {
return this.clusterStateResponse;
public DiscoveryNodes getNodes() {
return this.nodes;
}

public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
Expand All @@ -54,4 +75,20 @@
public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}

public void setResponseShards(List<ShardRouting> responseShards) {
this.responseShards = responseShards;
}

Check warning on line 81 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java#L80-L81

Added lines #L80 - L81 were not covered by tests

public List<ShardRouting> getResponseShards() {
return this.responseShards;
}

public void setPageToken(PageToken pageToken) {
this.pageToken = pageToken;
}

public PageToken getPageToken() {
return this.pageToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.pagination.ShardPaginationStrategy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
Expand Down Expand Up @@ -57,7 +59,11 @@
clusterStateRequest.setShouldCancelOnTimeout(true);
clusterStateRequest.local(shardsRequest.local());
clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout());
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
if (Objects.isNull(shardsRequest.getPageParams())) {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());

Check warning on line 63 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L63

Added line #L63 was not covered by tests
} else {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true);

Check warning on line 65 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L65

Added line #L65 was not covered by tests
}
assert parentTask instanceof CancellableTask;
clusterStateRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());

Expand Down Expand Up @@ -87,13 +93,26 @@
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(shardsRequest.getIndices());
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
try {
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(
shardsRequest.getPageParams(),

Check warning on line 98 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L97-L98

Added lines #L97 - L98 were not covered by tests
clusterStateResponse
);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setNodes(clusterStateResponse.getState().getNodes());
catShardsResponse.setResponseShards(

Check warning on line 105 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L102-L105

Added lines #L102 - L105 were not covered by tests
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()

Check warning on line 108 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L107-L108

Added lines #L107 - L108 were not covered by tests
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());

Check warning on line 115 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L111-L115

Added lines #L111 - L115 were not covered by tests
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Expand Down Expand Up @@ -122,6 +141,10 @@

}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
}

private void validateRequestLimit(
final CatShardsRequest shardsRequest,
final ClusterStateResponse clusterStateResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class IndicesStatsResponse extends BroadcastResponse {

private Map<ShardRouting, ShardStats> shardStatsMap;

IndicesStatsResponse(StreamInput in) throws IOException {
public IndicesStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]);
}
Expand Down
Loading
Loading