Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rest layer changes for List all PITs and PIT segments #4388

Merged
merged 18 commits into from
Sep 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ static Request deleteAllPits() {
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request getAllPits() {
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen
);
}

/**
* Get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<GetAllPitNodesResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Tests point in time API with rest high level client
Expand Down Expand Up @@ -52,21 +54,24 @@ public void indexDocuments() throws IOException {

public void testCreateAndDeletePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(createPitResponse.getId() != null);
assertEquals(1, createPitResponse.getTotalShards());
assertEquals(1, createPitResponse.getSuccessfulShards());
assertEquals(0, createPitResponse.getFailedShards());
assertEquals(0, createPitResponse.getSkippedShards());
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(createPitResponse.getId()));
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
pitIds.add(createPitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
public void testDeleteAllAndListAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
Expand All @@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException {
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);

List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse.getId()));
assertTrue(pits.contains(pitResponse1.getId()));
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
Expand All @@ -95,8 +105,26 @@ public void onFailure(Exception e) {
}
}
};
final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

ActionListener<GetAllPitNodesResponse> getPitsListener = new ActionListener<GetAllPitNodesResponse>() {
@Override
public void onResponse(GetAllPitNodesResponse response) {
List<String> pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse3.getId()));
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("List all PITs failed", e);
}
}
};
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
// validate no pits case
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"get_all_pits":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Lists all active point in time searches."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time/_all",
"methods":[
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@
- match: {hits.total: 3 }
- length: {hits.hits: 1 }

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_pit:
body:
Expand Down Expand Up @@ -119,6 +125,12 @@
- set: {pit_id: pit_id}
- match: { _shards.failed: 0}

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_all_pits: {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@
import org.opensearch.rest.action.cat.RestClusterManagerAction;
import org.opensearch.rest.action.cat.RestNodeAttrsAction;
import org.opensearch.rest.action.cat.RestNodesAction;
import org.opensearch.rest.action.cat.RestPitSegmentsAction;
import org.opensearch.rest.action.cat.RestPluginsAction;
import org.opensearch.rest.action.cat.RestRepositoriesAction;
import org.opensearch.rest.action.cat.RestSegmentsAction;
Expand Down Expand Up @@ -413,6 +414,7 @@
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPitsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
Expand Down Expand Up @@ -858,6 +860,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction());
registerHandler.accept(new RestPitSegmentsAction());

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -84,4 +85,37 @@ public ActionRequestValidationException validate() {
}
return validationException;
}

public void fromXContent(XContentParser parser) throws IOException {
pitIds.clear();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed content, must start with an object");
} else {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("pit_id".equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
throw new IllegalArgumentException("pit_id array element should only contain PIT identifiers");

}
pitIds.add(parser.text());
}
} else {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id element should only contain pit_id");
Copy link
Collaborator

@reta reta Sep 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalArgumentException("pit_id element should only contain pit_id");
throw new IllegalArgumentException("pit_id element should only contain PIT identifier");

}
pitIds.add(parser.text());
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ public TransportPitSegmentsAction(
@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
List<String> pitIds = request.getPitIds();
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
// when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty
// and in this case return empty response
if (pitIds.isEmpty()) {
listener.onResponse(new IndicesSegmentResponse(new ShardSegments[] {}, 0, 0, 0, new ArrayList<>()));
} else if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
pitService.getAllPits(ActionListener.wrap(response -> {
request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
super.doExecute(task, request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -24,6 +27,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg;

/**
* This class transforms active PIT objects from all nodes to unique PIT objects
*/
Expand Down Expand Up @@ -60,14 +65,30 @@ public GetAllPitNodesResponse(List<ListPitInfo> listPitInfos, GetAllPitNodesResp
pitInfos.addAll(listPitInfos);
}

public GetAllPitNodesResponse(
List<ListPitInfo> listPitInfos,
ClusterName clusterName,
List<GetAllPitNodeResponse> getAllPitNodeResponse,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getAllPitNodeResponse. -> responses or responseList

List<FailedNodeException> failures
) {
super(clusterName, getAllPitNodeResponse, failures);
pitInfos.addAll(listPitInfos);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("pitInfos");
builder.startArray("pits");
for (ListPitInfo pit : pitInfos) {
pit.toXContent(builder, params);
}
builder.endArray();
if (!failures().isEmpty()) {
builder.startArray("failures");
for (FailedNodeException e : failures()) {
e.toXContent(builder, params);
}
}
builder.endObject();
return builder;
}
Expand All @@ -85,4 +106,28 @@ public void writeNodesTo(StreamOutput out, List<GetAllPitNodeResponse> nodes) th
public List<ListPitInfo> getPitInfos() {
return Collections.unmodifiableList(new ArrayList<>(pitInfos));
}

private static final ConstructingObjectParser<GetAllPitNodesResponse, Void> PARSER = new ConstructingObjectParser<>(
"get_all_pits_response",
true,
(Object[] parsedObjects) -> {
@SuppressWarnings("unchecked")
List<ListPitInfo> listPitInfos = (List<ListPitInfo>) parsedObjects[0];
List<FailedNodeException> failures = null;
if (parsedObjects.length > 1) {
failures = (List<FailedNodeException>) parsedObjects[1];
}
if (failures == null) {
failures = new ArrayList<>();
}
return new GetAllPitNodesResponse(listPitInfos, new ClusterName(""), new ArrayList<>(), failures);
}
);
static {
PARSER.declareObjectArray(constructorArg(), ListPitInfo.PARSER, new ParseField("pits"));
}

public static GetAllPitNodesResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Loading