diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 71c900beb5319..2f470d7603869 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -234,12 +234,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; +import org.opensearch.action.search.TransportGetAllPitsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -663,6 +665,7 @@ public void reg // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); + actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); return unmodifiableMap(actions.getRegistry()); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java new file mode 100644 index 0000000000000..c90f75e3c0aed --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java @@ -0,0 +1,35 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Inner node get all pits request + */ +public class GetAllPitNodeRequest extends BaseNodeRequest { + + public GetAllPitNodeRequest() { + super(); + } + + public GetAllPitNodeRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java new file mode 100644 index 0000000000000..ba308a1a6ea1e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java @@ -0,0 +1,69 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Inner node get all pits response + */ +public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment { + + /** + * List of active PITs in the associated node + */ + private final List pitInfos; + + public GetAllPitNodeResponse(DiscoveryNode node, List pitInfos) { + super(node); + if (pitInfos == null) { + throw new IllegalArgumentException("Pits info cannot be null"); + } + this.pitInfos = Collections.unmodifiableList(pitInfos); + } + + public GetAllPitNodeResponse(StreamInput in) throws IOException { + super(in); + this.pitInfos = Collections.unmodifiableList(in.readList(ListPitInfo::new)); + } + + public List getPitInfos() { + return pitInfos; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(pitInfos); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("node", this.getNode().getName()); + builder.startArray("pitInfos"); + for (ListPitInfo pit : pitInfos) { + pit.toXContent(builder, params); + } + + builder.endArray(); + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java new file mode 100644 index 0000000000000..b4ad2f6641087 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request to get all active PIT IDs from all nodes of cluster + */ +public class GetAllPitNodesRequest extends BaseNodesRequest { + + @Inject + public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) { + super(concreteNodes); + } + + public GetAllPitNodesRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java new file mode 100644 index 0000000000000..4a454e7145eff --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class transforms active PIT objects from all nodes to unique PIT objects + */ +public class GetAllPitNodesResponse extends BaseNodesResponse implements ToXContentObject { + + /** + * List of unique PITs across all nodes + */ + private final Set pitInfos = new HashSet<>(); + + public GetAllPitNodesResponse(StreamInput in) throws IOException { + super(in); + } + + public GetAllPitNodesResponse( + ClusterName clusterName, + List getAllPitNodeResponse, + List failures + ) { + super(clusterName, getAllPitNodeResponse, failures); + Set uniquePitIds = new HashSet<>(); + pitInfos.addAll( + getAllPitNodeResponse.stream() + .flatMap(p -> p.getPitInfos().stream().filter(t -> uniquePitIds.add(t.getPitId()))) + .collect(Collectors.toList()) + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("pitInfos"); + for (ListPitInfo pit : pitInfos) { + pit.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public List readNodesFrom(StreamInput in) throws IOException { + return in.readList(GetAllPitNodeResponse::new); + } + + @Override + public void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + public List getPitInfos() { + return Collections.unmodifiableList(new ArrayList<>(pitInfos)); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java new file mode 100644 index 0000000000000..16e65cb785a7d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Action type for listing all PIT reader contexts + */ +public class GetAllPitsAction extends ActionType { + public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); + public static final String NAME = "indices:data/read/point_in_time/readall"; + + private GetAllPitsAction() { + super(NAME, GetAllPitNodesResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java new file mode 100644 index 0000000000000..4499e7d6e8ef5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * This holds information about pit reader context such as pit id and creation time + */ +public class ListPitInfo implements ToXContentFragment, Writeable { + private final String pitId; + private final long creationTime; + private final long keepAlive; + + public ListPitInfo(String pitId, long creationTime, long keepAlive) { + this.pitId = pitId; + this.creationTime = creationTime; + this.keepAlive = keepAlive; + } + + public ListPitInfo(StreamInput in) throws IOException { + this.pitId = in.readString(); + this.creationTime = in.readLong(); + this.keepAlive = in.readLong(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("pitId", pitId); + builder.field("creationTime", creationTime); + builder.field("keepAlive", keepAlive); + builder.endObject(); + return builder; + } + + public String getPitId() { + return pitId; + } + + public long getCreationTime() { + return creationTime; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + out.writeLong(creationTime); + out.writeLong(keepAlive); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index 4c5f1a1c0fc4f..0b79b77fd6014 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -8,6 +8,7 @@ package org.opensearch.action.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -18,10 +19,17 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,11 +46,13 @@ public class PitService { private final ClusterService clusterService; private final SearchTransportService searchTransportService; + private final TransportService transportService; @Inject - public PitService(ClusterService clusterService, SearchTransportService searchTransportService) { + public PitService(ClusterService clusterService, SearchTransportService searchTransportService, TransportService transportService) { this.clusterService = clusterService; this.searchTransportService = searchTransportService; + this.transportService = transportService; } /** @@ -52,6 +62,9 @@ public void deletePitContexts( Map> nodeToContextsMap, ActionListener listener ) { + if (nodeToContextsMap.size() == 0) { + listener.onResponse(new DeletePitResponse(Collections.emptyList())); + } final Set clusters = nodeToContextsMap.values() .stream() .flatMap(Collection::stream) @@ -130,4 +143,43 @@ public void onFailure(final Exception e) { } }, size); } + + /** + * Get all active point in time contexts + */ + public void getAllPits(ActionListener getAllPitsListener) { + final List nodes = new ArrayList<>(); + for (ObjectCursor cursor : clusterService.state().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]); + transportService.sendRequest( + transportService.getLocalNode(), + GetAllPitsAction.NAME, + new GetAllPitNodesRequest(disNodesArr), + new TransportResponseHandler() { + + @Override + public void handleResponse(GetAllPitNodesResponse response) { + getAllPitsListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + getAllPitsListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public GetAllPitNodesResponse read(StreamInput in) throws IOException { + return new GetAllPitNodesResponse(in); + } + } + ); + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 23515dd28b329..241b3de72a258 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -218,16 +218,6 @@ public void sendFreePITContexts( ); } - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { - transportService.sendRequest( - connection, - FREE_ALL_PIT_CONTEXTS_ACTION_NAME, - TransportRequest.Empty.INSTANCE, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) - ); - } - public void sendExecuteDfs( Transport.Connection connection, final ShardSearchRequest request, @@ -528,14 +518,6 @@ public static void registerRequestHandler(TransportService transportService, Sea ); TransportActionProxy.registerProxyAction(transportService, FREE_PIT_CONTEXT_ACTION_NAME, DeletePitResponse::new); - transportService.registerRequestHandler( - FREE_ALL_PIT_CONTEXTS_ACTION_NAME, - ThreadPool.Names.SAME, - TransportRequest.Empty::new, - (request, channel, task) -> { channel.sendResponse(searchService.freeAllPitContexts()); } - ); - TransportActionProxy.registerProxyAction(transportService, FREE_ALL_PIT_CONTEXTS_ACTION_NAME, DeletePitResponse::new); - transportService.registerRequestHandler( FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index d67979d1c87c5..f9e36c479dd54 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -11,18 +11,17 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.tasks.Task; -import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Transport action for deleting point in time searches - supports deleting list and all point in time searches @@ -83,19 +82,22 @@ private void deletePits(ActionListener listener, DeletePitReq } /** - * Delete all active PIT reader contexts + * Delete all active PIT reader contexts leveraging list all PITs + * + * For Cross cluster PITs : + * - mixed cluster PITs ( PIT comprising local and remote ) will be fully deleted. Since there will atleast be + * one reader context with PIT ID present in local cluster, 'Get all PITs' will retrieve the PIT ID with which + * we can completely delete the PIT contexts in both local and remote cluster. + * - fully remote PITs will not be deleted as 'Get all PITs' operates on local cluster only and no PIT info can + * be retrieved when it's fully remote. */ private void deleteAllPits(ActionListener listener) { - // TODO: Use list all PITs to delete all PITs in case of remote cluster use case - int size = clusterService.state().getNodes().getSize(); - ActionListener groupedActionListener = pitService.getDeletePitGroupedListener(listener, size); - for (final DiscoveryNode node : clusterService.state().getNodes()) { - try { - Transport.Connection connection = searchTransportService.getConnection(null, node); - searchTransportService.sendFreeAllPitContexts(connection, groupedActionListener); - } catch (Exception e) { - groupedActionListener.onFailure(e); - } - } + // Get all PITs and execute delete operation for the PITs. + pitService.getAllPits(ActionListener.wrap(getAllPitNodesResponse -> { + DeletePitRequest deletePitRequest = new DeletePitRequest( + getAllPitNodesResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()) + ); + deletePits(listener, deletePitRequest); + }, listener::onFailure)); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java new file mode 100644 index 0000000000000..21a64e388fa7b --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action to get all active PIT contexts across all nodes + */ +public class TransportGetAllPitsAction extends TransportNodesAction< + GetAllPitNodesRequest, + GetAllPitNodesResponse, + GetAllPitNodeRequest, + GetAllPitNodeResponse> { + private final SearchService searchService; + + @Inject + public TransportGetAllPitsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService + ) { + super( + GetAllPitsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPitNodesRequest::new, + GetAllPitNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPitNodeResponse.class + ); + this.searchService = searchService; + } + + @Override + protected GetAllPitNodesResponse newResponse( + GetAllPitNodesRequest request, + List getAllPitNodeRespons, + List failures + ) { + return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); + } + + @Override + protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { + return new GetAllPitNodeRequest(); + } + + @Override + protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPitNodeResponse(in); + } + + /** + * This retrieves all active PITs in the node + */ + @Override + protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { + GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( + transportService.getLocalNode(), + searchService.getAllPITReaderContexts() + ); + return nodeResponse; + } +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 747b13b031824..4bd95da193668 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -42,7 +42,11 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.ListPitInfo; +import org.opensearch.action.search.PitSearchContextIdForNode; import org.opensearch.action.search.PitSearchContextIdForNode; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchShardTask; @@ -142,6 +146,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -946,6 +951,21 @@ public PitReaderContext getPitReaderContext(ShardSearchContextId id) { return null; } + /** + * This method returns all active PIT reader contexts + */ + public List getAllPITReaderContexts() { + final List pitContextsInfo = new ArrayList<>(); + for (ReaderContext ctx : activeReaders.values()) { + if (ctx instanceof PitReaderContext) { + final PitReaderContext context = (PitReaderContext) ctx; + ListPitInfo pitInfo = new ListPitInfo(context.getPitId(), context.getCreationTime(), context.getKeepAlive()); + pitContextsInfo.add(pitInfo); + } + } + return pitContextsInfo; + } + final SearchContext createContext( ReaderContext readerContext, ShardSearchRequest request, @@ -1102,22 +1122,6 @@ public DeletePitResponse freeReaderContextsIfFound(List deleteResults = new ArrayList<>(); - for (ReaderContext readerContext : activeReaders.values()) { - if (readerContext instanceof PitReaderContext) { - boolean result = freeReaderContext(readerContext.id()); - DeletePitInfo deletePitInfo = new DeletePitInfo(result, ((PitReaderContext) readerContext).getPitId()); - deleteResults.add(deletePitInfo); - } - } - return new DeletePitResponse(deleteResults); - } - private long getKeepAlive(ShardSearchRequest request) { if (request.scroll() != null) { return getScrollKeepAlive(request.scroll()); diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 1b2400bdadfa1..98e84136a8847 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -61,7 +61,7 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, } public long getCreationTime() { - return this.creationTime.get(); + return this.creationTime.get() == null ? 0 : this.creationTime.get(); } public void setCreationTime(final long creationTime) { diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index fe644e44f297f..898aa2e2c6745 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -117,6 +117,10 @@ protected long nowInMillis() { return indexShard.getThreadPool().relativeTimeInMillis(); } + public long getKeepAlive() { + return keepAlive.get(); + } + @Override public final void close() { if (closed.compareAndSet(false, true)) { diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index 2c65d6ffcc813..a5c6e1c12b79c 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -219,7 +219,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, @@ -308,7 +308,7 @@ public void sendFreePITContexts( CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, @@ -406,7 +406,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, @@ -494,7 +494,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index ec83cb45697d9..433cd9dfa3e89 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -8,7 +8,14 @@ package org.opensearch.action.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.junit.Assert; import org.opensearch.Version; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.query.IdsQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; @@ -21,7 +28,10 @@ import org.opensearch.search.internal.ShardSearchContextId; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import static org.opensearch.test.OpenSearchTestCase.between; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; @@ -81,4 +91,41 @@ public static String getPitId() { } return SearchContextId.encode(array.asList(), aliasFilters, version); } + + public static void assertUsingGetAllPits(Client client, String id, long creationTime) throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + Assert.assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id)); + Assert.assertEquals(getPitResponse.getPitInfos().get(0).getCreationTime(), creationTime); + } + + public static void assertGetAllPitsEmpty(Client client) throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + Assert.assertEquals(0, getPitResponse.getPitInfos().size()); + } } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index e2db4fdbc97ef..7a1d9a6fe963c 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -7,13 +7,13 @@ */ package org.opensearch.action.search; -import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -25,10 +25,6 @@ import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.search.SearchHit; -import org.opensearch.search.SearchHits; -import org.opensearch.search.aggregations.InternalAggregations; -import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.test.OpenSearchTestCase; @@ -109,15 +105,6 @@ public void setupData() { new TaskId(randomLong() + ":" + randomLong()), Collections.emptyMap() ); - InternalSearchResponse response = new InternalSearchResponse( - new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), - InternalAggregations.EMPTY, - null, - null, - false, - null, - 1 - ); clusterServiceMock = mock(ClusterService.class); ClusterState state = mock(ClusterState.class); @@ -178,7 +165,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -224,7 +211,11 @@ public void testDeleteAllPITSuccess() throws InterruptedException, ExecutionExce transportService.start(); transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); DeletePitInfo deletePitInfo = new DeletePitInfo(true, "pitId"); List deletePitInfos = new ArrayList<>(); @@ -238,7 +229,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -307,7 +312,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -366,7 +371,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -434,7 +439,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -480,7 +485,11 @@ public void testDeleteAllPitWhenNodeIsDown() { transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); if (connection.getNode().getId() == "node_3") { Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); @@ -496,7 +505,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -543,7 +566,11 @@ public void testDeleteAllPitWhenAllNodesAreDown() { SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); t.start(); @@ -554,7 +581,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -600,7 +641,11 @@ public void testDeleteAllPitFailure() { transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextId, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); if (connection.getNode().getId() == "node_3") { Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 is down"))); @@ -616,7 +661,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 5c3c43af9cb66..b28423c3a8657 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -14,6 +14,7 @@ import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.PitTestsUtil; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.Priority; @@ -63,6 +64,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -86,6 +88,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); service.doClose(); @@ -99,7 +102,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -128,7 +131,7 @@ public void testCreatePITWithNonExistentIndex() { service.doClose(); } - public void testCreatePITOnCloseIndex() { + public void testCreatePITOnCloseIndex() throws ExecutionException, InterruptedException { createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -144,6 +147,7 @@ public void testCreatePITOnCloseIndex() { SearchService service = getInstanceFromNode(SearchService.class); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); service.doClose(); } @@ -165,6 +169,7 @@ public void testPitSearchOnDeletedIndex() throws ExecutionException, Interrupted }); assertTrue(ex.getMessage().contains("no such index [index]")); SearchService service = getInstanceFromNode(SearchService.class); + PitTestsUtil.assertGetAllPitsEmpty(client()); assertEquals(0, service.getActiveContexts()); service.doClose(); } @@ -190,6 +195,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); client().admin().indices().prepareClose("index").get(); @@ -201,6 +207,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx }); assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); // PIT reader contexts are lost after close, verifying it with open index api client().admin().indices().prepareOpen("index").get(); @@ -314,6 +321,7 @@ public void testPitAfterUpdateIndex() throws Exception { request.setIndices(new String[] { "test" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); SearchService service = getInstanceFromNode(SearchService.class); assertThat( @@ -456,6 +464,7 @@ public void testPitAfterUpdateIndex() throws Exception { } finally { service.doClose(); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); } } @@ -467,6 +476,7 @@ public void testConcurrentSearches() throws Exception { request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); @@ -497,5 +507,6 @@ public void testConcurrentSearches() throws Exception { assertEquals(2, service.getActiveContexts()); service.doClose(); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); } } diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 89607b9201cd9..70efcf88cd581 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -181,7 +181,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertFalse(deletePitInfo.isSuccessful()); } } catch (Exception e) { throw new AssertionError(e); @@ -205,9 +204,9 @@ public Settings onNodeStopped(String nodeName) throws Exception { } public void testDeleteAllPitsWhileNodeDrop() throws Exception { - createPitOnIndex("index"); createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + createPitOnIndex("index1"); ensureGreen(); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { @@ -218,7 +217,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); - assertFalse(deletePitInfo.isSuccessful()); } } catch (Exception e) { assertTrue(e.getMessage().contains("Node not connected")); @@ -226,18 +224,14 @@ public Settings onNodeStopped(String nodeName) throws Exception { return super.onNodeStopped(nodeName); } }); - ensureGreen(); /** - * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context - * not found exceptions don't result in failures ( as deletion in one node is successful ) + * When we invoke delete again, returns success as all readers are cleared. (Delete all on node which is Up and + * once the node restarts, all active contexts are cleared in the node ) */ ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); DeletePitResponse deletePITResponse = execute.get(); - for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { - assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); - assertTrue(deletePitInfo.isSuccessful()); - } + assertEquals(0, deletePITResponse.getDeletePitResults().size()); client().admin().indices().prepareDelete("index1").get(); } diff --git a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java similarity index 63% rename from server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java rename to server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 27d8f27add898..f93a43a027da7 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -8,18 +8,27 @@ package org.opensearch.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.opensearch.action.ActionFuture; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.PitTestsUtil; import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.PointInTimeBuilder; @@ -30,12 +39,14 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -45,7 +56,7 @@ * Multi node integration tests for PIT creation and search operation with PIT ID. */ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) -public class CreatePitMultiNodeTests extends OpenSearchIntegTestCase { +public class PitMultiNodeTests extends OpenSearchIntegTestCase { @Before public void setupIndex() throws ExecutionException, InterruptedException { @@ -70,6 +81,7 @@ public void testPit() throws Exception { .get(); assertEquals(2, searchResponse.getSuccessfulShards()); assertEquals(2, searchResponse.getTotalShards()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); } public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { @@ -95,6 +107,7 @@ public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exce public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index") @@ -124,6 +137,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(1, searchResponse.getFailedShards()); assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); return super.onNodeStopped(nodeName); } }); @@ -264,7 +278,7 @@ public void testConcurrentCreatesWithDeletes() throws InterruptedException, Exec AtomicInteger numSuccess = new AtomicInteger(); TestThreadPool testThreadPool = null; try { - testThreadPool = new TestThreadPool(CreatePitMultiNodeTests.class.getName()); + testThreadPool = new TestThreadPool(PitMultiNodeTests.class.getName()); int concurrentRuns = randomIntBetween(20, 50); List operationThreads = new ArrayList<>(); @@ -312,4 +326,132 @@ public void onFailure(Exception e) {} ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); } } + + public void testGetAllPits() throws Exception { + client().admin().indices().prepareCreate("index1").get(); + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index", "index1" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + CreatePitResponse pitResponse1 = client().execute(CreatePitAction.INSTANCE, request).get(); + CreatePitResponse pitResponse2 = client().execute(CreatePitAction.INSTANCE, request).get(); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + assertEquals(3, getPitResponse.getPitInfos().size()); + List resultPitIds = getPitResponse.getPitInfos().stream().map(p -> p.getPitId()).collect(Collectors.toList()); + // asserting that we get all unique PIT IDs + Assert.assertTrue(resultPitIds.contains(pitResponse.getId())); + Assert.assertTrue(resultPitIds.contains(pitResponse1.getId())); + Assert.assertTrue(resultPitIds.contains(pitResponse2.getId())); + client().admin().indices().prepareDelete("index1").get(); + } + + public void testGetAllPitsDuringNodeDrop() throws Exception { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(getDiscoveryNodes()); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ActionFuture execute1 = client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + // we still get a pit id from the data node which is up + assertEquals(1, getPitResponse.getPitInfos().size()); + // failure for node drop + assertEquals(1, getPitResponse.failures().size()); + assertTrue(getPitResponse.failures().get(0).getMessage().contains("Failed node")); + return super.onNodeStopped(nodeName); + } + }); + } + + private DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + return disNodesArr; + } + + public void testConcurrentGetWithDeletes() throws InterruptedException, ExecutionException { + CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true); + createPitRequest.setIndices(new String[] { "index" }); + List pitIds = new ArrayList<>(); + String id = client().execute(CreatePitAction.INSTANCE, createPitRequest).get().getId(); + pitIds.add(id); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(getDiscoveryNodes()); + AtomicInteger numSuccess = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(PitMultiNodeTests.class.getName()); + int concurrentRuns = randomIntBetween(20, 50); + + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + long randomDeleteThread = randomLongBetween(0, concurrentRuns - 1); + for (int i = 0; i < concurrentRuns; i++) { + int currentThreadIteration = i; + Runnable thread = () -> { + if (currentThreadIteration == randomDeleteThread) { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(GetAllPitNodesResponse getAllPitNodesResponse) { + if (getAllPitNodesResponse.failures().isEmpty()) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest, listener); + } else { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + } + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numSuccess.get()); + + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index ecc28470b0eb2..9c140cda6ccc8 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1414,6 +1414,7 @@ public void testOpenReaderContext() { searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); future.actionGet(); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1442,6 +1443,7 @@ public void testDeletePitReaderContext() { contextIds.add(pitSearchContextIdForNode); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // assert true for reader context not found @@ -1451,19 +1453,6 @@ public void testDeletePitReaderContext() { assertFalse(searchService.freeReaderContext(future.actionGet())); } - public void testDeleteAllPitReaderContexts() { - createIndex("index"); - SearchService searchService = getInstanceFromNode(SearchService.class); - PlainActionFuture future = new PlainActionFuture<>(); - searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); - future.actionGet(); - searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); - future.actionGet(); - assertThat(searchService.getActiveContexts(), equalTo(2)); - searchService.freeAllPitContexts(); - assertThat(searchService.getActiveContexts(), equalTo(0)); - } - public void testPitContextMaxKeepAlive() { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); @@ -1484,6 +1473,7 @@ public void testPitContextMaxKeepAlive() { ex.getMessage() ); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } public void testUpdatePitId() { @@ -1506,6 +1496,7 @@ public void testUpdatePitId() { assertTrue(updateResponse.getKeepAlive() == updateRequest.getKeepAlive()); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1538,6 +1529,7 @@ public void testUpdatePitIdMaxKeepAlive() { ex.getMessage() ); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1558,5 +1550,6 @@ public void testUpdatePitIdWithInvalidReaderId() { assertEquals("No search context found for id [" + id.getId() + "]", ex.getMessage()); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } }