Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rest layer changes for List all PITs and PIT segments #4388

Merged
merged 18 commits into from
Sep 25, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ static Request deleteAllPits() {
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request getAllPits() {
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen
);
}

/**
* Get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<GetAllPitNodesResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Tests point in time API with rest high level client
Expand Down Expand Up @@ -52,21 +54,24 @@ public void indexDocuments() throws IOException {

public void testCreateAndDeletePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(createPitResponse.getId() != null);
assertEquals(1, createPitResponse.getTotalShards());
assertEquals(1, createPitResponse.getSuccessfulShards());
assertEquals(0, createPitResponse.getFailedShards());
assertEquals(0, createPitResponse.getSkippedShards());
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(createPitResponse.getId()));
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
pitIds.add(createPitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
public void testDeleteAllAndListAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
Expand All @@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException {
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);

List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse.getId()));
assertTrue(pits.contains(pitResponse1.getId()));
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
Expand All @@ -95,8 +105,27 @@ public void onFailure(Exception e) {
}
}
};
final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

ActionListener<GetAllPitNodesResponse> getPitsListener = new ActionListener<GetAllPitNodesResponse>() {
@Override
public void onResponse(GetAllPitNodesResponse response) {
List<String> pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse3.getId()));
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("List all PITs failed", e);
}
}
};
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
// validate no pits case
getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
assertTrue(getAllPitResponse.getPitInfos().size() == 0);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase {
"ping",
"info",
"delete_all_pits",
"get_all_pits",
// security
"security.get_ssl_certificates",
"security.authenticate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"get_all_pits":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Lists all active point in time searches."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time/_all",
"methods":[
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@
- match: {hits.total: 3 }
- length: {hits.hits: 1 }

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_pit:
body:
Expand Down Expand Up @@ -119,6 +125,12 @@
- set: {pit_id: pit_id}
- match: { _shards.failed: 0}

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_all_pits: {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@
import org.opensearch.rest.action.cat.RestClusterManagerAction;
import org.opensearch.rest.action.cat.RestNodeAttrsAction;
import org.opensearch.rest.action.cat.RestNodesAction;
import org.opensearch.rest.action.cat.RestPitSegmentsAction;
import org.opensearch.rest.action.cat.RestPluginsAction;
import org.opensearch.rest.action.cat.RestRepositoriesAction;
import org.opensearch.rest.action.cat.RestSegmentsAction;
Expand Down Expand Up @@ -413,6 +414,7 @@
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPitsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
Expand Down Expand Up @@ -857,7 +859,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestDeletePitAction(nodesInCluster));
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -84,4 +85,37 @@ public ActionRequestValidationException validate() {
}
return validationException;
}

public void fromXContent(XContentParser parser) throws IOException {
pitIds.clear();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed content, must start with an object");
} else {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("pit_id".equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
throw new IllegalArgumentException("pit_id array element should only contain PIT identifiers");

}
pitIds.add(parser.text());
}
} else {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id element should only contain pit_id");
Copy link
Collaborator

@reta reta Sep 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalArgumentException("pit_id element should only contain pit_id");
throw new IllegalArgumentException("pit_id element should only contain PIT identifier");

}
pitIds.add(parser.text());
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitService;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
Expand Down Expand Up @@ -46,7 +45,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.SearchContextId.decode;

Expand Down Expand Up @@ -95,11 +93,10 @@ public TransportPitSegmentsAction(
@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
List<String> pitIds = request.getPitIds();
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
pitService.getAllPits(ActionListener.wrap(response -> {
request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
super.doExecute(task, request, listener);
}, listener::onFailure));
// when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty
// and in this case return empty response
if (pitIds.isEmpty()) {
listener.onResponse(new IndicesSegmentResponse(new ShardSegments[] {}, 0, 0, 0, new ArrayList<>()));
} else {
super.doExecute(task, request, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void executeCreatePit(
task.getParentTaskId(),
Collections.emptyMap()
);
/**
* This is needed for cross cluster functionality to work with PITs and current ccsMinimizeRoundTrips is
* not supported for point in time
*/
searchRequest.setCcsMinimizeRoundtrips(false);
/**
* Phase 1 of create PIT
*/
Expand Down Expand Up @@ -193,6 +198,9 @@ void executeUpdatePitId(
);
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
if (node == null) {
node = this.clusterService.state().getNodes().get(entry.getValue().getNode());
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
}
try {
final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node);
searchTransportService.updatePitContext(
Expand All @@ -206,11 +214,12 @@ void executeUpdatePitId(
groupedActionListener
);
} catch (Exception e) {
String nodeName = node.getName();
logger.error(
() -> new ParameterizedMessage(
"Create pit update phase failed for PIT ID [{}] on node [{}]",
searchResponse.pointInTimeId(),
node
nodeName
),
e
);
Expand Down
Loading