Skip to content

Commit

Permalink
Adding RestIndicesListAction along with RestListAction changes
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Aug 29, 2024
1 parent 0cfb0e9 commit d665722
Show file tree
Hide file tree
Showing 12 changed files with 1,210 additions and 940 deletions.
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestIndicesListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
Expand Down Expand Up @@ -793,9 +796,12 @@ private ActionFilters setupActionFilters(List<ActionPlugin> actionPlugins) {

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
List<AbstractListAction> listActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = handler -> {
if (handler instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) handler);
} else if (handler instanceof AbstractListAction) {
listActions.add((AbstractListAction) handler);
}
restController.registerHandler(handler);
};
Expand Down Expand Up @@ -968,6 +974,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestSnapshotAction());
registerHandler.accept(new RestTemplatesAction());

// LIST API
registerHandler.accept(new RestIndicesListAction());

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
Expand Down Expand Up @@ -999,6 +1008,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
}
registerHandler.accept(new RestCatAction(catActions));
registerHandler.accept(new RestListAction(listActions));
registerHandler.accept(new RestDecommissionAction());
registerHandler.accept(new RestGetDecommissionStateAction());
registerHandler.accept(new RestRemoteStoreStatsAction());
Expand Down
19 changes: 2 additions & 17 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null, null);
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null);
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

Expand Down Expand Up @@ -253,10 +253,6 @@ public String getNextToken() {
return paginationMetadata.nextToken;
}

public String getPreviousToken() {
return paginationMetadata.previousToken;
}

/**
* Cell in a table
*
Expand Down Expand Up @@ -304,22 +300,11 @@ public static class PaginationMetadata {
*/
public final String nextToken;

/**
* String denoting the previous_token of paginated response, which will be used to fetch previous page (if any).
*/
public final String previousToken;

public PaginationMetadata(
@NonNull boolean isResponsePaginated,
@Nullable String paginatedElement,
@Nullable String nextToken,
@Nullable String previousToken
) {
public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) {
this.isResponsePaginated = isResponsePaginated;
assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated";
this.paginatedElement = paginatedElement;
this.nextToken = nextToken;
this.previousToken = previousToken;
}
}
}
1,545 changes: 736 additions & 809 deletions server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
if (table.isPaginated()) {
assert table.getPaginatedElement() != null : "Paginated element is required in-case nextToken is not null";
builder.startObject();
builder.field("previous_token", table.getPreviousToken());
builder.field("next_token", table.getNextToken());
builder.startArray(table.getPaginatedElement());
} else {
Expand Down Expand Up @@ -147,10 +146,8 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
}
out.append("\n");
}
// Adding a nextToken row, post an empty line, in the response if the table is paginated.
// Adding a new row for next_token, in the response if the table is paginated.
if (table.isPaginated()) {
out.append("previous_token" + " " + table.getPreviousToken());
out.append("\n");
out.append("next_token" + " " + table.getNextToken());
out.append("\n");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rest.action.list;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.Table;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.UTF8StreamWriter;
import org.opensearch.core.common.io.stream.BytesStream;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.opensearch.rest.action.cat.RestTable.buildHelpWidths;
import static org.opensearch.rest.action.cat.RestTable.pad;

/**
* Base Transport action class for _list APIs
*
* @opensearch.api
*/
public abstract class AbstractListAction extends BaseRestHandler {
protected abstract RestChannelConsumer doListRequest(RestRequest request, NodeClient client);

protected abstract void documentation(StringBuilder sb);

protected abstract Table getTableWithHeader(RestRequest request);

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
boolean helpWanted = request.paramAsBoolean("help", false);
if (helpWanted) {
return channel -> {
Table table = getTableWithHeader(request);
int[] width = buildHelpWidths(table, request);
BytesStream bytesOutput = Streams.flushOnCloseStream(channel.bytesOutput());
UTF8StreamWriter out = new UTF8StreamWriter().setOutput(bytesOutput);
for (Table.Cell cell : table.getHeaders()) {
// need to do left-align always, so create new cells
pad(new Table.Cell(cell.value), width[0], request, out);
out.append(" | ");
pad(new Table.Cell(cell.attr.containsKey("alias") ? cell.attr.get("alias") : ""), width[1], request, out);
out.append(" | ");
pad(new Table.Cell(cell.attr.containsKey("desc") ? cell.attr.get("desc") : "not available"), width[2], request, out);
out.append("\n");
}
out.close();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOutput.bytes()));
};
} else {
return doListRequest(request, client);
}
}

static Set<String> RESPONSE_PARAMS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList("format", "h", "v", "ts", "pri", "bytes", "size", "time", "s", "timeout"))
);

@Override
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rest.action.list;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.Table;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.rest.action.cat.RestIndicesAction;
import org.opensearch.rest.action.cat.RestTable;
import org.opensearch.rest.pagination.IndexBasedPaginationStrategy;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.rest.RestRequest.Method.GET;

/**
* _list API action to output indices in pages.
*
* @opensearch.api
*/
public class RestIndicesListAction extends AbstractListAction {
private static final String DEFAULT_LIST_INDICES_PAGE_SIZE_STRING = "1000";
private static final String PAGINATED_ELEMENT_KEY = "indices";

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_list/indices"), new Route(GET, "/_list/indices/{index}")));
}

@Override
public String getName() {
return "list_indices_action";
}

@Override
public boolean allowSystemIndexAccessByDefault() {
return true;
}

@Override
protected void documentation(StringBuilder sb) {
sb.append("/_list/indices\n");
sb.append("/_list/indices/{index}\n");
}

@Override
public RestChannelConsumer doListRequest(final RestRequest request, final NodeClient client) {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final boolean local = request.paramAsBoolean("local", false);
TimeValue clusterManagerTimeout = request.paramAsTime("cluster_manager_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);
final String requestedToken = request.param("next_token");
final int pageSize = Integer.parseInt(request.param("size", DEFAULT_LIST_INDICES_PAGE_SIZE_STRING));
final String requestedSortOrder = request.param("sort", "ascending");

return channel -> {
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
public RestResponse buildResponse(final Table table) throws Exception {
return RestTable.buildResponse(table, channel);
}
});

// Fetch all the indices from clusterStateRequest for a paginated query.
RestIndicesAction.RestIndicesActionCommonUtils.sendClusterStateRequest(
indices,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerTimeout,
client,
new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
try {
IndexBasedPaginationStrategy paginationStrategy = new IndexBasedPaginationStrategy(
requestedToken == null ? null : new IndexBasedPaginationStrategy.IndexStrategyPageToken(requestedToken),
pageSize,
requestedSortOrder,
clusterStateResponse.getState()
);

final GroupedActionListener<ActionResponse> groupedListener = RestIndicesAction.RestIndicesActionCommonUtils
.createGroupedListener(
request,
4,
listener,
new Table.PaginationMetadata(
true,
PAGINATED_ELEMENT_KEY,
paginationStrategy.getNextToken() == null
? null
: paginationStrategy.getNextToken().generateEncryptedToken()
)
);
groupedListener.onResponse(clusterStateResponse);

final String[] indicesToBeQueried = paginationStrategy.getElementsFromRequestedToken().toArray(new String[0]);
RestIndicesAction.RestIndicesActionCommonUtils.sendGetSettingsRequest(
indicesToBeQueried,
IndicesOptions.fromRequest(request, IndicesOptions.strictExpand()),
local,
clusterManagerTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
RestIndicesAction.RestIndicesActionCommonUtils.sendIndicesStatsRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
includeUnloadedSegments,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
RestIndicesAction.RestIndicesActionCommonUtils.sendClusterHealthRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}
);
};

}

private static final Set<String> RESPONSE_PARAMS;

static {
final Set<String> responseParams = new HashSet<>(asList("local", "health"));
responseParams.addAll(AbstractListAction.RESPONSE_PARAMS);
RESPONSE_PARAMS = Collections.unmodifiableSet(responseParams);
}

@Override
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

@Override
protected Table getTableWithHeader(final RestRequest request) {
return RestIndicesAction.RestIndicesActionCommonUtils.getTableWithHeader(request, null);
}

}
Loading

0 comments on commit d665722

Please sign in to comment.