diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index cf79c97e4ca64..71c900beb5319 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -233,11 +233,13 @@ import org.opensearch.action.main.TransportMainAction; import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; +import org.opensearch.action.search.TransportDeletePitAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -661,6 +663,7 @@ public void reg // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); + actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); return unmodifiableMap(actions.getRegistry()); } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index 40ee810186eb2..f64dd3d7efae6 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -29,9 +29,12 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.Transport; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; @@ -51,6 +54,7 @@ public class CreatePitController { private final ClusterService clusterService; private final TransportSearchAction transportSearchAction; private final NamedWriteableRegistry namedWriteableRegistry; + private final PitService pitService; private static final Logger logger = LogManager.getLogger(CreatePitController.class); public static final Setting PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting( "point_in_time.init.keep_alive", @@ -63,12 +67,14 @@ public CreatePitController( SearchTransportService searchTransportService, ClusterService clusterService, TransportSearchAction transportSearchAction, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + PitService pitService ) { this.searchTransportService = searchTransportService; this.clusterService = clusterService; this.transportSearchAction = transportSearchAction; this.namedWriteableRegistry = namedWriteableRegistry; + this.pitService = pitService; } /** @@ -248,8 +254,47 @@ public void onResponse(final Collection responses) { @Override public void onFailure(final Exception e) { + cleanupContexts(contexts, createPITResponse.getId()); updatePitIdListener.onFailure(e); } }, size); } + + /** + * Cleanup all created PIT contexts in case of failure + */ + private void cleanupContexts(Collection contexts, String pitId) { + ActionListener deleteListener = new ActionListener<>() { + @Override + public void onResponse(DeletePitResponse response) { + // this is invoke and forget call + final StringBuilder failedPitsStringBuilder = new StringBuilder(); + response.getDeletePitResults() + .stream() + .filter(r -> !r.isSuccessful()) + .forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(",")); + logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString())); + if (logger.isDebugEnabled()) { + final StringBuilder successfulPitsStringBuilder = new StringBuilder(); + response.getDeletePitResults() + .stream() + .filter(r -> r.isSuccessful()) + .forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(",")); + logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString())); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("Cleaning up PIT contexts failed ", e); + } + }; + Map> nodeToContextsMap = new HashMap<>(); + for (SearchContextIdForNode context : contexts) { + List contextIdsForNode = nodeToContextsMap.getOrDefault(context.getNode(), new ArrayList<>()); + contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context)); + nodeToContextsMap.put(context.getNode(), contextIdsForNode); + } + pitService.deletePitContexts(nodeToContextsMap, deleteListener); + } } diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java new file mode 100644 index 0000000000000..aa305ecfe73ab --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Action type for deleting point in time searches + */ +public class DeletePitAction extends ActionType { + + public static final DeletePitAction INSTANCE = new DeletePitAction(); + public static final String NAME = "indices:data/read/point_in_time/delete"; + + private DeletePitAction() { + super(NAME, DeletePitResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java new file mode 100644 index 0000000000000..943199812771a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.ParseField; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; + +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; + +/** + * This class captures if deletion of pit is successful along with pit id + */ +public class DeletePitInfo extends TransportResponse implements Writeable, ToXContent { + /** + * This will be true if PIT reader contexts are deleted ond also if contexts are not found. + */ + private final boolean successful; + + private final String pitId; + + public DeletePitInfo(boolean successful, String pitId) { + this.successful = successful; + this.pitId = pitId; + } + + public DeletePitInfo(StreamInput in) throws IOException { + successful = in.readBoolean(); + pitId = in.readString(); + + } + + public boolean isSuccessful() { + return successful; + } + + public String getPitId() { + return pitId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(successful); + out.writeString(pitId); + } + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "delete_pit_info", + true, + args -> new DeletePitInfo((boolean) args[0], (String) args[1]) + ); + + static { + PARSER.declareBoolean(constructorArg(), new ParseField("successful")); + PARSER.declareString(constructorArg(), new ParseField("pitId")); + } + + private static final ParseField SUCCESSFUL = new ParseField("successful"); + private static final ParseField PIT_ID = new ParseField("pitId"); + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SUCCESSFUL.getPreferredName(), successful); + builder.field(PIT_ID.getPreferredName(), pitId); + builder.endObject(); + return builder; + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java new file mode 100644 index 0000000000000..945fcfd17eb6c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java @@ -0,0 +1,121 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Request to delete one or more PIT search contexts based on IDs. + */ +public class DeletePitRequest extends ActionRequest implements ToXContentObject { + + /** + * List of PIT IDs to be deleted , and use "_all" to delete all PIT reader contexts + */ + private final List pitIds = new ArrayList<>(); + + public DeletePitRequest(StreamInput in) throws IOException { + super(in); + pitIds.addAll(Arrays.asList(in.readStringArray())); + } + + public DeletePitRequest(String... pitIds) { + this.pitIds.addAll(Arrays.asList(pitIds)); + } + + public DeletePitRequest(List pitIds) { + this.pitIds.addAll(pitIds); + } + + public DeletePitRequest() {} + + public List getPitIds() { + return pitIds; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (pitIds == null || pitIds.isEmpty()) { + validationException = addValidationError("no pit ids specified", validationException); + } + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (pitIds == null) { + out.writeVInt(0); + } else { + out.writeStringArray(pitIds.toArray(new String[pitIds.size()])); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.startArray("pit_id"); + for (String pitId : pitIds) { + builder.value(pitId); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + public void fromXContent(XContentParser parser) throws IOException { + pitIds.clear(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Malformed content, must start with an object"); + } else { + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if ("pit_id".equals(currentFieldName)) { + if (token == XContentParser.Token.START_ARRAY) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token.isValue() == false) { + throw new IllegalArgumentException("pit_id array element should only contain pit_id"); + } + pitIds.add(parser.text()); + } + } else { + if (token.isValue() == false) { + throw new IllegalArgumentException("pit_id element should only contain pit_id"); + } + pitIds.add(parser.text()); + } + } else { + throw new IllegalArgumentException( + "Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] " + ); + } + } + } + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java b/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java new file mode 100644 index 0000000000000..cdbeb3dc2b749 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.ParseField; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.StatusToXContentObject; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.opensearch.rest.RestStatus.NOT_FOUND; +import static org.opensearch.rest.RestStatus.OK; + +/** + * Response class for delete pits flow which clears the point in time search contexts + */ +public class DeletePitResponse extends ActionResponse implements StatusToXContentObject { + + private final List deletePitResults; + + public DeletePitResponse(List deletePitResults) { + this.deletePitResults = deletePitResults; + } + + public DeletePitResponse(StreamInput in) throws IOException { + super(in); + int size = in.readVInt(); + deletePitResults = new ArrayList<>(); + for (int i = 0; i < size; i++) { + deletePitResults.add(new DeletePitInfo(in)); + } + + } + + public List getDeletePitResults() { + return deletePitResults; + } + + /** + * @return Whether the attempt to delete PIT was successful. + */ + @Override + public RestStatus status() { + if (deletePitResults.isEmpty()) return NOT_FOUND; + return OK; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(deletePitResults.size()); + for (DeletePitInfo deletePitResult : deletePitResults) { + deletePitResult.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.startArray("pits"); + for (DeletePitInfo response : deletePitResults) { + response.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "delete_pit_response", + true, + (Object[] parsedObjects) -> { + @SuppressWarnings("unchecked") + List deletePitInfoList = (List) parsedObjects[0]; + return new DeletePitResponse(deletePitInfoList); + } + ); + static { + PARSER.declareObjectArray(constructorArg(), DeletePitInfo.PARSER, new ParseField("pits")); + } + + public static DeletePitResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java new file mode 100644 index 0000000000000..4c5f1a1c0fc4f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.inject.Inject; +import org.opensearch.transport.Transport; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Service class for PIT reusable functions + */ +public class PitService { + + private static final Logger logger = LogManager.getLogger(PitService.class); + + private final ClusterService clusterService; + private final SearchTransportService searchTransportService; + + @Inject + public PitService(ClusterService clusterService, SearchTransportService searchTransportService) { + this.clusterService = clusterService; + this.searchTransportService = searchTransportService; + } + + /** + * Delete list of pit contexts. Returns the details of success of operation per PIT ID. + */ + public void deletePitContexts( + Map> nodeToContextsMap, + ActionListener listener + ) { + final Set clusters = nodeToContextsMap.values() + .stream() + .flatMap(Collection::stream) + .filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false) + .map(c -> c.getSearchContextIdForNode().getClusterAlias()) + .collect(Collectors.toSet()); + StepListener> lookupListener = (StepListener< + BiFunction>) SearchUtils.getConnectionLookupListener( + searchTransportService.getRemoteClusterService(), + clusterService.state(), + clusters + ); + lookupListener.whenComplete(nodeLookup -> { + final GroupedActionListener groupedListener = getDeletePitGroupedListener( + listener, + nodeToContextsMap.size() + ); + + for (Map.Entry> entry : nodeToContextsMap.entrySet()) { + String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias(); + final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + if (node == null) { + logger.error( + () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) + ); + List deletePitInfos = new ArrayList<>(); + for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { + deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); + } + groupedListener.onResponse(new DeletePitResponse(deletePitInfos)); + } else { + try { + final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node); + searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e); + List deletePitInfos = new ArrayList<>(); + for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { + deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); + } + groupedListener.onResponse(new DeletePitResponse(deletePitInfos)); + } + } + } + }, listener::onFailure); + } + + public GroupedActionListener getDeletePitGroupedListener(ActionListener listener, int size) { + return new GroupedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(final Collection responses) { + Map pitIdToSucceededMap = new HashMap<>(); + for (DeletePitResponse response : responses) { + for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) { + if (!pitIdToSucceededMap.containsKey(deletePitInfo.getPitId())) { + pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSuccessful()); + } + if (!deletePitInfo.isSuccessful()) { + logger.debug(() -> new ParameterizedMessage("Deleting PIT with ID {} failed ", deletePitInfo.getPitId())); + pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSuccessful()); + } + } + } + List deletePitResults = new ArrayList<>(); + for (Map.Entry entry : pitIdToSucceededMap.entrySet()) { + deletePitResults.add(new DeletePitInfo(entry.getValue(), entry.getKey())); + } + DeletePitResponse deletePitResponse = new DeletePitResponse(deletePitResults); + listener.onResponse(deletePitResponse); + } + + @Override + public void onFailure(final Exception e) { + logger.error("Delete PITs failed", e); + listener.onFailure(e); + } + }, size); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/opensearch/action/search/SearchContextIdForNode.java index df655c39f845d..7f218a3b1a17e 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/opensearch/action/search/SearchContextIdForNode.java @@ -45,12 +45,12 @@ * * @opensearch.internal */ -final class SearchContextIdForNode implements Writeable { +public final class SearchContextIdForNode implements Writeable { private final String node; private final ShardSearchContextId searchContextId; private final String clusterAlias; - SearchContextIdForNode(@Nullable String clusterAlias, String node, ShardSearchContextId searchContextId) { + public SearchContextIdForNode(@Nullable String clusterAlias, String node, ShardSearchContextId searchContextId) { this.node = node; this.clusterAlias = clusterAlias; this.searchContextId = searchContextId; diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index e667fdff53312..23515dd28b329 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -71,7 +71,9 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.BiFunction; @@ -87,6 +89,8 @@ public class SearchTransportService { public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]"; + public static final String FREE_PIT_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context/pit]"; + public static final String FREE_ALL_PIT_CONTEXTS_ACTION_NAME = "indices:data/read/search[free_pit_contexts]"; public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]"; public static final String QUERY_ACTION_NAME = "indices:data/read/search[phase/query]"; public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]"; @@ -200,6 +204,30 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac ); } + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + transportService.sendRequest( + connection, + FREE_PIT_CONTEXT_ACTION_NAME, + new PitFreeContextsRequest(contextIds), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) + ); + } + + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + transportService.sendRequest( + connection, + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, + TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) + ); + } + public void sendExecuteDfs( Transport.Connection connection, final ShardSearchRequest request, @@ -370,6 +398,43 @@ public ShardSearchContextId id() { } + /** + * Request to free the PIT context based on id + */ + static class PitFreeContextsRequest extends TransportRequest { + private List contextIds; + + PitFreeContextsRequest(List contextIds) { + this.contextIds = new ArrayList<>(); + this.contextIds.addAll(contextIds); + } + + PitFreeContextsRequest(StreamInput in) throws IOException { + super(in); + int size = in.readVInt(); + if (size > 0) { + this.contextIds = new ArrayList<>(); + for (int i = 0; i < size; i++) { + PitSearchContextIdForNode contextId = new PitSearchContextIdForNode(in); + contextIds.add(contextId); + } + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(contextIds.size()); + for (PitSearchContextIdForNode contextId : contextIds) { + contextId.writeTo(out); + } + } + + public List getContextIds() { + return this.contextIds; + } + } + /** * A search free context request * @@ -454,6 +519,23 @@ public static void registerRequestHandler(TransportService transportService, Sea } ); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); + + transportService.registerRequestHandler( + FREE_PIT_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + PitFreeContextsRequest::new, + (request, channel, task) -> { channel.sendResponse(searchService.freeReaderContextsIfFound(request.getContextIds())); } + ); + TransportActionProxy.registerProxyAction(transportService, FREE_PIT_CONTEXT_ACTION_NAME, DeletePitResponse::new); + + transportService.registerRequestHandler( + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, + ThreadPool.Names.SAME, + TransportRequest.Empty::new, + (request, channel, task) -> { channel.sendResponse(searchService.freeAllPitContexts()); } + ); + TransportActionProxy.registerProxyAction(transportService, FREE_ALL_PIT_CONTEXTS_ACTION_NAME, DeletePitResponse::new); + transportService.registerRequestHandler( FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java new file mode 100644 index 0000000000000..d67979d1c87c5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.tasks.Task; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Transport action for deleting point in time searches - supports deleting list and all point in time searches + */ +public class TransportDeletePitAction extends HandledTransportAction { + private final NamedWriteableRegistry namedWriteableRegistry; + private TransportSearchAction transportSearchAction; + private final ClusterService clusterService; + private final SearchTransportService searchTransportService; + private final PitService pitService; + + @Inject + public TransportDeletePitAction( + TransportService transportService, + ActionFilters actionFilters, + NamedWriteableRegistry namedWriteableRegistry, + TransportSearchAction transportSearchAction, + ClusterService clusterService, + SearchTransportService searchTransportService, + PitService pitService + ) { + super(DeletePitAction.NAME, transportService, actionFilters, DeletePitRequest::new); + this.namedWriteableRegistry = namedWriteableRegistry; + this.transportSearchAction = transportSearchAction; + this.clusterService = clusterService; + this.searchTransportService = searchTransportService; + this.pitService = pitService; + } + + /** + * Invoke 'delete all pits' or 'delete list of pits' workflow based on request + */ + @Override + protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { + List pitIds = request.getPitIds(); + if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + deleteAllPits(listener); + } else { + deletePits(listener, request); + } + } + + /** + * Deletes one or more point in time search contexts. + */ + private void deletePits(ActionListener listener, DeletePitRequest request) { + Map> nodeToContextsMap = new HashMap<>(); + for (String pitId : request.getPitIds()) { + SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, pitId); + for (SearchContextIdForNode contextIdForNode : contextId.shards().values()) { + PitSearchContextIdForNode pitSearchContext = new PitSearchContextIdForNode(pitId, contextIdForNode); + List contexts = nodeToContextsMap.getOrDefault(contextIdForNode.getNode(), new ArrayList<>()); + contexts.add(pitSearchContext); + nodeToContextsMap.put(contextIdForNode.getNode(), contexts); + } + } + pitService.deletePitContexts(nodeToContextsMap, listener); + } + + /** + * Delete all active PIT reader contexts + */ + private void deleteAllPits(ActionListener listener) { + // TODO: Use list all PITs to delete all PITs in case of remote cluster use case + int size = clusterService.state().getNodes().getSize(); + ActionListener groupedActionListener = pitService.getDeletePitGroupedListener(listener, size); + for (final DiscoveryNode node : clusterService.state().getNodes()) { + try { + Transport.Connection connection = searchTransportService.getConnection(null, node); + searchTransportService.sendFreeAllPitContexts(connection, groupedActionListener); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index a73f8200ab277..1d3bbfcba43f9 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -60,6 +60,8 @@ import org.opensearch.action.search.ClearScrollResponse; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; @@ -332,6 +334,11 @@ public interface Client extends OpenSearchClient, Releasable { */ void createPit(CreatePitRequest createPITRequest, ActionListener listener); + /** + * Delete one or more point in time contexts + */ + void deletePits(DeletePitRequest deletePITRequest, ActionListener listener); + /** * Performs multiple search requests. */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 6cc0827310bd1..f99454a8a8913 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -327,6 +327,9 @@ import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; @@ -582,6 +585,11 @@ public void createPit(final CreatePitRequest createPITRequest, final ActionListe execute(CreatePitAction.INSTANCE, createPITRequest, listener); } + @Override + public void deletePits(final DeletePitRequest deletePITRequest, final ActionListener listener) { + execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + } + @Override public ActionFuture multiSearch(MultiSearchRequest request) { return execute(MultiSearchAction.INSTANCE, request); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 74527f26ead9e..747b13b031824 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -41,6 +41,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.PitSearchContextIdForNode; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; @@ -138,6 +141,7 @@ import org.opensearch.transport.TransportRequest; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1063,6 +1067,57 @@ public void freeAllScrollContexts() { } } + /** + * Free reader contexts if found + * @return response with list of PIT IDs deleted and if operation is successful + */ + public DeletePitResponse freeReaderContextsIfFound(List contextIds) { + List deleteResults = new ArrayList<>(); + for (PitSearchContextIdForNode contextId : contextIds) { + try { + if (getReaderContext(contextId.getSearchContextIdForNode().getSearchContextId()) != null) { + try (ReaderContext context = removeReaderContext(contextId.getSearchContextIdForNode().getSearchContextId().getId())) { + PitReaderContext pitReaderContext = (PitReaderContext) context; + if (context == null) { + DeletePitInfo deletePitInfo = new DeletePitInfo(true, contextId.getPitId()); + deleteResults.add(deletePitInfo); + continue; + } + String pitId = pitReaderContext.getPitId(); + boolean success = context != null; + DeletePitInfo deletePitInfo = new DeletePitInfo(success, pitId); + deleteResults.add(deletePitInfo); + } + } else { + // For search context missing cases, mark the operation as succeeded + DeletePitInfo deletePitInfo = new DeletePitInfo(true, contextId.getPitId()); + deleteResults.add(deletePitInfo); + } + } catch (SearchContextMissingException e) { + // For search context missing cases, mark the operation as succeeded + DeletePitInfo deletePitInfo = new DeletePitInfo(true, contextId.getPitId()); + deleteResults.add(deletePitInfo); + } + } + return new DeletePitResponse(deleteResults); + } + + /** + * Free all active pit contexts + * @return response with list of PIT IDs deleted and if operation is successful + */ + public DeletePitResponse freeAllPitContexts() { + List deleteResults = new ArrayList<>(); + for (ReaderContext readerContext : activeReaders.values()) { + if (readerContext instanceof PitReaderContext) { + boolean result = freeReaderContext(readerContext.id()); + DeletePitInfo deletePitInfo = new DeletePitInfo(result, ((PitReaderContext) readerContext).getPitId()); + deleteResults.add(deletePitInfo); + } + } + return new DeletePitResponse(deleteResults); + } + private long getKeepAlive(ShardSearchRequest request) { if (request.scroll() != null) { return getScrollKeepAlive(request.scroll()); diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index 09c5bd834f7d5..2c65d6ffcc813 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -39,6 +39,7 @@ import org.opensearch.transport.RemoteClusterConnectionTests; import org.opensearch.transport.Transport; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -161,6 +162,7 @@ public void onFailure(Exception e) { */ public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); @@ -192,6 +194,20 @@ public void updatePitContext( t.start(); } + /** + * Test if cleanup request is called + */ + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -203,11 +219,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, transportSearchAction, - namedWriteableRegistry + namedWriteableRegistry, + pitService ); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -227,6 +245,7 @@ public void onFailure(Exception e) { createListener.onResponse(searchResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + assertEquals(0, deleteNodesInvoked.size()); } } } @@ -236,6 +255,7 @@ public void onFailure(Exception e) { */ public void testUpdatePitAfterCreatePitFailure() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); @@ -271,17 +291,30 @@ public void updatePitContext( public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); } + + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } }; CountDownLatch latch = new CountDownLatch(1); CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, transportSearchAction, - namedWriteableRegistry + namedWriteableRegistry, + pitService ); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -302,6 +335,10 @@ public void onFailure(Exception e) { createListener.onFailure(new Exception("Exception occurred in phase 1")); latch.await(); assertEquals(0, updateNodesInvoked.size()); + /** + * cleanup is not called on create pit phase one failure + */ + assertEquals(0, deleteNodesInvoked.size()); } } } @@ -311,6 +348,7 @@ public void onFailure(Exception e) { */ public void testUpdatePitFailureForNodeDrop() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); @@ -349,6 +387,17 @@ public void updatePitContext( } } + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -357,11 +406,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, transportSearchAction, - namedWriteableRegistry + namedWriteableRegistry, + pitService ); CountDownLatch latch = new CountDownLatch(1); @@ -383,12 +434,17 @@ public void onFailure(Exception e) { createListener.onResponse(searchResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); } } } public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); @@ -420,6 +476,17 @@ public void updatePitContext( t.start(); } + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -427,11 +494,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, transportSearchAction, - namedWriteableRegistry + namedWriteableRegistry, + pitService ); CountDownLatch latch = new CountDownLatch(1); @@ -453,8 +522,11 @@ public void onFailure(Exception e) { createListener.onResponse(searchResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); } } } - } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java new file mode 100644 index 0000000000000..e2db4fdbc97ef --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -0,0 +1,638 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.action.search; + +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilter; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.IdsQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteClusterConnectionTests; +import org.opensearch.transport.Transport; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.action.search.PitTestsUtil.getPitId; +import static org.opensearch.action.support.PlainActionFuture.newFuture; + +/** + * Functional tests for transport delete pit action + */ +public class TransportDeletePitActionTests extends OpenSearchTestCase { + DiscoveryNode node1 = null; + DiscoveryNode node2 = null; + DiscoveryNode node3 = null; + String pitId = null; + TransportSearchAction transportSearchAction = null; + Task task = null; + DiscoveryNodes nodes = null; + NamedWriteableRegistry namedWriteableRegistry = null; + ClusterService clusterServiceMock = null; + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + private ThreadPool threadPool = new ThreadPool(settings); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes, Version version) { + return startTransport(id, knownNodes, version, Settings.EMPTY); + } + + private MockTransportService startTransport( + final String id, + final List knownNodes, + final Version version, + final Settings settings + ) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); + } + + @Before + public void setupData() { + node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + pitId = getPitId(); + namedWriteableRegistry = new NamedWriteableRegistry( + Arrays.asList( + new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) + ) + ); + nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + transportSearchAction = mock(TransportSearchAction.class); + task = new Task( + randomLong(), + "transport", + SearchAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + InternalSearchResponse response = new InternalSearchResponse( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 + ); + + clusterServiceMock = mock(ClusterService.class); + ClusterState state = mock(ClusterState.class); + + final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_INIT_KEEP_ALIVE.getKey(), 30000).build(); + when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); + + when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); + when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); + when(clusterServiceMock.state()).thenReturn(state); + when(state.getNodes()).thenReturn(nodes); + } + + /** + * Test if transport call for update pit is made to all nodes present as part of PIT ID returned from phase one of create pit + */ + public void testDeletePitSuccess() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + DeletePitInfo deletePitInfo = new DeletePitInfo(true, "pitId"); + List deletePitInfos = new ArrayList<>(); + deletePitInfos.add(deletePitInfo); + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(deletePitInfos))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertTrue(dr.getDeletePitResults().get(0).getPitId().equals("pitId")); + assertTrue(dr.getDeletePitResults().get(0).isSuccessful()); + assertEquals(3, deleteNodesInvoked.size()); + + } + } + } + + public void testDeleteAllPITSuccess() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + DeletePitInfo deletePitInfo = new DeletePitInfo(true, "pitId"); + List deletePitInfos = new ArrayList<>(); + deletePitInfos.add(deletePitInfo); + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(deletePitInfos))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertTrue(dr.getDeletePitResults().get(0).getPitId().equals("pitId")); + assertTrue(dr.getDeletePitResults().get(0).isSuccessful()); + assertEquals(3, deleteNodesInvoked.size()); + + } + } + } + + public void testDeletePitWhenNodeIsDown() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + Exception e = assertThrows(ExecutionException.class, () -> future.get()); + assertThat(e.getMessage(), containsString("node 3 down")); + assertEquals(3, deleteNodesInvoked.size()); + } + } + } + + public void testDeletePitWhenAllNodesAreDown() { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + Exception e = assertThrows(ExecutionException.class, () -> future.get()); + assertThat(e.getMessage(), containsString("node 3 down")); + assertEquals(3, deleteNodesInvoked.size()); + } + } + } + + public void testDeletePitFailure() { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreePITContexts( + Transport.Connection connection, + List contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + Exception e = assertThrows(ExecutionException.class, () -> future.get()); + assertThat(e.getMessage(), containsString("node down")); + assertEquals(3, deleteNodesInvoked.size()); + } + } + } + + public void testDeleteAllPitWhenNodeIsDown() { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + Exception e = assertThrows(ExecutionException.class, () -> future.get()); + assertThat(e.getMessage(), containsString("node 3 down")); + assertEquals(3, deleteNodesInvoked.size()); + } + } + } + + public void testDeleteAllPitWhenAllNodesAreDown() { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + Exception e = assertThrows(ExecutionException.class, () -> future.get()); + assertThat(e.getMessage(), containsString("node down")); + assertEquals(3, deleteNodesInvoked.size()); + } + } + } + + public void testDeleteAllPitFailure() { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 is down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + Exception e = assertThrows(ExecutionException.class, () -> future.get()); + assertThat(e.getMessage(), containsString("java.lang.Exception: node 3 is down")); + assertEquals(3, deleteNodesInvoked.size()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java index 7eb7a62348c5f..27d8f27add898 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java @@ -11,17 +11,31 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.containsString; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -199,4 +213,103 @@ public void testPitInvalidDefaultKeepAlive() { .setTransientSettings(Settings.builder().putNull("*")) ); } + + public void testConcurrentCreates() throws InterruptedException { + CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true); + createPitRequest.setIndices(new String[] { "index" }); + + int concurrentRuns = randomIntBetween(20, 50); + AtomicInteger numSuccess = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(DeletePitMultiNodeTests.class.getName()); + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + Set createSet = new HashSet<>(); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering pit create --->"); + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + if (createSet.add(createPitResponse.getId())) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(CreatePitAction.INSTANCE, createPitRequest, listener); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numSuccess.get()); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + + public void testConcurrentCreatesWithDeletes() throws InterruptedException, ExecutionException { + CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true); + createPitRequest.setIndices(new String[] { "index" }); + List pitIds = new ArrayList<>(); + String id = client().execute(CreatePitAction.INSTANCE, createPitRequest).get().getId(); + pitIds.add(id); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + Set createSet = new HashSet<>(); + AtomicInteger numSuccess = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(CreatePitMultiNodeTests.class.getName()); + int concurrentRuns = randomIntBetween(20, 50); + + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + long randomDeleteThread = randomLongBetween(0, concurrentRuns - 1); + for (int i = 0; i < concurrentRuns; i++) { + int currentThreadIteration = i; + Runnable thread = () -> { + if (currentThreadIteration == randomDeleteThread) { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + if (createSet.add(createPitResponse.getId())) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(CreatePitAction.INSTANCE, createPitRequest, listener); + } else { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + } + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numSuccess.get()); + + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } } diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java new file mode 100644 index 0000000000000..89607b9201cd9 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -0,0 +1,333 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.blankOrNullString; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; + +/** + * Multi node integration tests for delete PIT use cases + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class DeletePitMultiNodeTests extends OpenSearchIntegTestCase { + + @Before + public void setupIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + } + + @After + public void clearIndex() { + client().admin().indices().prepareDelete("index").get(); + } + + private CreatePitResponse createPitOnIndex(String index) throws ExecutionException, InterruptedException { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { index }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + return execute.get(); + } + + public void testDeletePit() throws Exception { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + execute = client().execute(CreatePitAction.INSTANCE, request); + pitResponse = execute.get(); + pitIds.add(pitResponse.getId()); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + ActionFuture deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = deleteExecute.get(); + assertEquals(2, deletePITResponse.getDeletePitResults().size()); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertTrue(deletePitInfo.isSuccessful()); + } + /** + * Checking deleting the same PIT id again results in succeeded + */ + deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + deletePITResponse = deleteExecute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertTrue(deletePitInfo.isSuccessful()); + } + } + + public void testDeletePitWithValidAndDeletedIds() throws Exception { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + + /** + * Delete Pit #1 + */ + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + ActionFuture deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = deleteExecute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertTrue(deletePitInfo.isSuccessful()); + } + execute = client().execute(CreatePitAction.INSTANCE, request); + pitResponse = execute.get(); + pitIds.add(pitResponse.getId()); + /** + * Delete PIT with both Ids #1 (which is deleted) and #2 (which is present) + */ + deletePITRequest = new DeletePitRequest(pitIds); + deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + deletePITResponse = deleteExecute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertTrue(deletePitInfo.isSuccessful()); + } + } + + public void testDeletePitWithValidAndInvalidIds() throws Exception { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + pitIds.add("nondecodableid"); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + ActionFuture deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + Exception e = assertThrows(ExecutionException.class, () -> deleteExecute.get()); + assertThat(e.getMessage(), containsString("invalid id")); + } + + public void testDeleteAllPits() throws Exception { + createPitOnIndex("index"); + createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + createPitOnIndex("index1"); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = execute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); + assertTrue(deletePitInfo.isSuccessful()); + } + client().admin().indices().prepareDelete("index1").get(); + } + + public void testDeletePitWhileNodeDrop() throws Exception { + CreatePitResponse pitResponse = createPitOnIndex("index"); + createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + CreatePitResponse pitResponse1 = createPitOnIndex("index1"); + pitIds.add(pitResponse1.getId()); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + try { + DeletePitResponse deletePITResponse = execute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertFalse(deletePitInfo.isSuccessful()); + } + } catch (Exception e) { + throw new AssertionError(e); + } + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(); + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = execute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertTrue(deletePitInfo.isSuccessful()); + } + client().admin().indices().prepareDelete("index1").get(); + } + + public void testDeleteAllPitsWhileNodeDrop() throws Exception { + createPitOnIndex("index"); + createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + try { + DeletePitResponse deletePITResponse = execute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); + assertFalse(deletePitInfo.isSuccessful()); + } + } catch (Exception e) { + assertTrue(e.getMessage().contains("Node not connected")); + } + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(); + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = execute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); + assertTrue(deletePitInfo.isSuccessful()); + } + client().admin().indices().prepareDelete("index1").get(); + } + + public void testDeleteWhileSearch() throws Exception { + CreatePitResponse pitResponse = createPitOnIndex("index"); + ensureGreen(); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + Thread[] threads = new Thread[5]; + CountDownLatch latch = new CountDownLatch(threads.length); + final AtomicBoolean deleted = new AtomicBoolean(false); + + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + latch.countDown(); + try { + latch.await(); + for (int j = 0; j < 30; j++) { + client().prepareSearch() + .setSize(2) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) + .execute() + .get(); + } + } catch (Exception e) { + /** + * assert for exception once delete pit goes through. throw error in case of any exeption before that. + */ + if (deleted.get() == true) { + if (!e.getMessage().contains("all shards failed")) throw new AssertionError(e); + return; + } + throw new AssertionError(e); + } + }); + threads[i].setName("opensearch[node_s_0][search]"); + threads[i].start(); + } + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = execute.get(); + deleted.set(true); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertTrue(pitIds.contains(deletePitInfo.getPitId())); + assertTrue(deletePitInfo.isSuccessful()); + } + + for (Thread thread : threads) { + thread.join(); + } + } + + public void testtConcurrentDeletes() throws InterruptedException, ExecutionException { + CreatePitResponse pitResponse = createPitOnIndex("index"); + ensureGreen(); + int concurrentRuns = randomIntBetween(20, 50); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + AtomicInteger numDeleteAcknowledged = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(DeletePitMultiNodeTests.class.getName()); + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering pit delete --->"); + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { + numDeleteAcknowledged.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numDeleteAcknowledged.get()); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + +} diff --git a/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java b/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java new file mode 100644 index 0000000000000..5944e2a35b14a --- /dev/null +++ b/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertToXContentEquivalent; + +public class DeletePitResponseTests extends OpenSearchTestCase { + + public void testDeletePitResponseToXContent() throws IOException { + DeletePitInfo deletePitInfo = new DeletePitInfo(true, "pitId"); + List deletePitInfoList = new ArrayList<>(); + deletePitInfoList.add(deletePitInfo); + DeletePitResponse deletePitResponse = new DeletePitResponse(deletePitInfoList); + + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + deletePitResponse.toXContent(builder, ToXContent.EMPTY_PARAMS); + } + assertEquals(true, deletePitResponse.getDeletePitResults().get(0).getPitId().equals("pitId")); + assertEquals(true, deletePitResponse.getDeletePitResults().get(0).isSuccessful()); + } + + public void testDeletePitResponseToAndFromXContent() throws IOException { + XContentType xContentType = randomFrom(XContentType.values()); + DeletePitResponse originalResponse = createDeletePitResponseTestItem(); + ; + BytesReference originalBytes = toShuffledXContent(originalResponse, xContentType, ToXContent.EMPTY_PARAMS, randomBoolean()); + DeletePitResponse parsedResponse; + try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) { + parsedResponse = DeletePitResponse.fromXContent(parser); + } + assertEquals( + originalResponse.getDeletePitResults().get(0).isSuccessful(), + parsedResponse.getDeletePitResults().get(0).isSuccessful() + ); + assertEquals(originalResponse.getDeletePitResults().get(0).getPitId(), parsedResponse.getDeletePitResults().get(0).getPitId()); + BytesReference parsedBytes = XContentHelper.toXContent(parsedResponse, xContentType, randomBoolean()); + assertToXContentEquivalent(originalBytes, parsedBytes, xContentType); + } + + private static DeletePitResponse createDeletePitResponseTestItem() { + DeletePitInfo deletePitInfo = new DeletePitInfo(randomBoolean(), "pitId"); + List deletePitInfoList = new ArrayList<>(); + deletePitInfoList.add(deletePitInfo); + return new DeletePitResponse(deletePitInfoList); + } +} diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 08a2ee155ccd4..ecc28470b0eb2 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -41,6 +41,9 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.PitSearchContextIdForNode; +import org.opensearch.action.search.SearchContextIdForNode; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -1425,6 +1428,42 @@ private ReaderContext createReaderContext(IndexService indexService, IndexShard ); } + public void testDeletePitReaderContext() { + createIndex("index"); + SearchService searchService = getInstanceFromNode(SearchService.class); + PlainActionFuture future = new PlainActionFuture<>(); + searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); + List contextIds = new ArrayList<>(); + ShardSearchContextId shardSearchContextId = future.actionGet(); + PitSearchContextIdForNode pitSearchContextIdForNode = new PitSearchContextIdForNode( + "1", + new SearchContextIdForNode(null, "node1", shardSearchContextId) + ); + contextIds.add(pitSearchContextIdForNode); + + assertThat(searchService.getActiveContexts(), equalTo(1)); + DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); + // assert true for reader context not found + deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); + // adding this assert to showcase behavior difference + assertFalse(searchService.freeReaderContext(future.actionGet())); + } + + public void testDeleteAllPitReaderContexts() { + createIndex("index"); + SearchService searchService = getInstanceFromNode(SearchService.class); + PlainActionFuture future = new PlainActionFuture<>(); + searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); + future.actionGet(); + searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); + future.actionGet(); + assertThat(searchService.getActiveContexts(), equalTo(2)); + searchService.freeAllPitContexts(); + assertThat(searchService.getActiveContexts(), equalTo(0)); + } + public void testPitContextMaxKeepAlive() { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class);