diff --git a/.github/workflows/code-hygiene.yml b/.github/workflows/code-hygiene.yml index 1952630e5bdfa..a1adbb8a87507 100644 --- a/.github/workflows/code-hygiene.yml +++ b/.github/workflows/code-hygiene.yml @@ -1,6 +1,6 @@ name: Code Hygiene -on: [push, pull_request] +on: [pull_request] jobs: linelint: diff --git a/.github/workflows/gradle-check.yml b/.github/workflows/gradle-check.yml new file mode 100644 index 0000000000000..dec5ee15d0bea --- /dev/null +++ b/.github/workflows/gradle-check.yml @@ -0,0 +1,89 @@ +name: Gradle Check (Jenkins) +on: + push: + branches-ignore: + - 'backport/*' + - 'create-pull-request/*' + - 'dependabot/*' + pull_request_target: + types: [opened, synchronize, reopened] + +jobs: + gradle-check: + runs-on: ubuntu-latest + timeout-minutes: 130 + steps: + - name: Checkout OpenSearch repo + uses: actions/checkout@v2 + with: + ref: ${{ github.event.pull_request.head.sha }} + + - name: Setup environment variables (PR) + if: github.event_name == 'pull_request_target' + run: | + echo "pr_from_sha=$(jq --raw-output .pull_request.head.sha $GITHUB_EVENT_PATH)" >> $GITHUB_ENV + echo "pr_from_clone_url=$(jq --raw-output .pull_request.head.repo.clone_url $GITHUB_EVENT_PATH)" >> $GITHUB_ENV + echo "pr_to_clone_url=$(jq --raw-output .pull_request.base.repo.clone_url $GITHUB_EVENT_PATH)" >> $GITHUB_ENV + echo "pr_title=$(jq --raw-output .pull_request.title $GITHUB_EVENT_PATH)" >> $GITHUB_ENV + echo "pr_number=$(jq --raw-output .pull_request.number $GITHUB_EVENT_PATH)" >> $GITHUB_ENV + + - name: Setup environment variables (Push) + if: github.event_name == 'push' + run: | + repo_url="https://github.com/opensearch-project/OpenSearch" + ref_id=$(git rev-parse HEAD) + branch_name=$(git rev-parse --abbrev-ref HEAD) + echo "pr_from_sha=$ref_id" >> $GITHUB_ENV + echo "pr_from_clone_url=$repo_url" >> $GITHUB_ENV + echo "pr_to_clone_url=$repo_url" >> $GITHUB_ENV + echo "pr_title=Push trigger $branch_name $ref_id $repo_url" >> $GITHUB_ENV + echo "pr_number=Null" >> $GITHUB_ENV + + - name: Checkout opensearch-build repo + uses: actions/checkout@v2 + with: + repository: opensearch-project/opensearch-build + ref: main + path: opensearch-build + + - name: Trigger jenkins workflow to run gradle check + run: | + set -e + set -o pipefail + bash opensearch-build/scripts/gradle/gradle-check.sh ${{ secrets.JENKINS_GRADLE_CHECK_GENERIC_WEBHOOK_TOKEN }} | tee -a gradle-check.log + + - name: Setup Result Status + if: always() + run: | + WORKFLOW_URL=`cat gradle-check.log | grep 'WORKFLOW_URL' | awk '{print $2}'` + RESULT=`cat gradle-check.log | grep 'Result:' | awk '{print $2}'` + echo "workflow_url=$WORKFLOW_URL" >> $GITHUB_ENV + echo "result=$RESULT" >> $GITHUB_ENV + + - name: Upload Coverage Report + if: success() + uses: codecov/codecov-action@v2 + with: + files: ./codeCoverage.xml + + - name: Create Comment Success + if: ${{ github.event_name == 'pull_request_target' && success() }} + uses: peter-evans/create-or-update-comment@v2 + with: + issue-number: ${{ env.pr_number }} + body: | + ### Gradle Check (Jenkins) Run Completed with: + * **RESULT:** ${{ env.result }} :white_check_mark: + * **URL:** ${{ env.workflow_url }} + * **CommitID:** ${{ env.pr_from_sha }} + + - name: Create Comment Failure + if: ${{ github.event_name == 'pull_request_target' && failure() }} + uses: peter-evans/create-or-update-comment@v2 + with: + issue-number: ${{ env.pr_number }} + body: | + ### Gradle Check (Jenkins) Run Completed with: + * **RESULT:** ${{ env.result }} :x: + * **URL:** ${{ env.workflow_url }} + * **CommitID:** ${{ env.pr_from_sha }} diff --git a/.github/workflows/wrapper.yml b/.github/workflows/wrapper.yml index be5e7afb56ba0..c3e0aae98cde2 100644 --- a/.github/workflows/wrapper.yml +++ b/.github/workflows/wrapper.yml @@ -1,5 +1,5 @@ name: Validate Gradle Wrapper -on: [push, pull_request] +on: [pull_request] jobs: validate: diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index efcc13921c398..8b3ef713964f7 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception { "nodes.hot_threads", "nodes.usage", "nodes.reload_secure_settings", - "search_shards", }; + "search_shards", + "get_all_pits" }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); Set deprecatedMethods = new HashSet<>(); deprecatedMethods.add("indices.force_merge"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json index 1e49f99ab20c5..9ab9b8d08d1b5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json @@ -2,7 +2,7 @@ "create_pit":{ "documentation":{ "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", - "description":"Creates point in time context." + "description":"Creates point in time search context." }, "stability":"experimental", "url":{ diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json new file mode 100644 index 0000000000000..067c9394fc87e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json @@ -0,0 +1,19 @@ +{ + "get_all_pits":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Get all active point in time searches." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_search/point_in_time/all", + "methods":[ + "GET" + ] + } + ] + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml index 58f019e788968..75ffd90e2b778 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml @@ -27,9 +27,18 @@ keep_alive: 23h - set: {id: pit_id} + - set: {creation_time: create_time} - match: { _shards.total: 1} - match: { _shards.successful: 1} - match: { _shards.failed: 0} + + - do: + get_all_pits: {} + + - match: {pits.0.pitId: $pit_id} + - match: {pits.0.creationTime: $create_time} + - match: {pits.0.keepAlive: 82800000} + - do: search: rest_total_hits_as_int: true @@ -87,7 +96,7 @@ "pit_id": [$pit_id] - match: {pits.0.pitId: $pit_id} - - match: {pits.0.succeeded: true } + - match: {pits.0.successful: true } --- "Delete all": @@ -127,7 +136,7 @@ delete_all_pits: {} - match: {pits.0.pitId: $pit_id} - - match: {pits.0.succeeded: true } + - match: {pits.0.successful: true } - do: catch: missing diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 4ad1c5abf0ca4..0de277d89ef1b 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -234,12 +234,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; +import org.opensearch.action.search.TransportGetAllPitsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -403,6 +405,7 @@ import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestDeletePitAction; import org.opensearch.rest.action.search.RestExplainAction; +import org.opensearch.rest.action.search.RestGetAllPitsAction; import org.opensearch.rest.action.search.RestMultiSearchAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; @@ -664,6 +667,7 @@ public void reg actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); + actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -840,6 +844,7 @@ public void initRestHandlers(Supplier nodesInCluster) { // Point in time API registerHandler.accept(new RestCreatePitAction()); registerHandler.accept(new RestDeletePitAction()); + registerHandler.accept(new RestGetAllPitsAction()); for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java index 1af56a044205b..7ffa30a182458 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java @@ -15,7 +15,7 @@ */ public class CreatePitAction extends ActionType { public static final CreatePitAction INSTANCE = new CreatePitAction(); - public static final String NAME = "indices:data/read/point_in_time"; + public static final String NAME = "indices:data/read/point_in_time/create"; private CreatePitAction() { super(NAME, CreatePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java index f807a82301aba..717017de4a62c 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java @@ -17,7 +17,7 @@ public class DeletePitAction extends ActionType { public static final DeletePitAction INSTANCE = new DeletePitAction(); - public static final String NAME = "indices:admin/read/pit/delete"; + public static final String NAME = "indices:data/read/point_in_time/delete"; private DeletePitAction() { super(NAME, DeletePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java new file mode 100644 index 0000000000000..e576b98890508 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeRequest; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Inner node get all pits request + */ +public class GetAllPitNodeRequest extends BaseNodeRequest { + GetAllPitNodesRequest request; + + @Inject + public GetAllPitNodeRequest(GetAllPitNodesRequest request) { + this.request = request; + } + + public GetAllPitNodeRequest(StreamInput in) throws IOException { + super(in); + request = new GetAllPitNodesRequest(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java new file mode 100644 index 0000000000000..3675628399abe --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Inner node get all pits response + */ +public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment { + + private List pitsInfo; + + @Inject + public GetAllPitNodeResponse(StreamInput in, List pitsInfo) throws IOException { + super(in); + this.pitsInfo = pitsInfo; + } + + public GetAllPitNodeResponse(DiscoveryNode node, List pitsInfo) { + super(node); + this.pitsInfo = pitsInfo; + } + + public GetAllPitNodeResponse(StreamInput in) throws IOException { + super(in); + this.pitsInfo = Collections.unmodifiableList(in.readList(ListPitInfo::new)); + } + + public List getPitsInfo() { + return pitsInfo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(pitsInfo); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("node", this.getNode().getName()); + builder.startArray("pitsInfo"); + for (ListPitInfo pit : pitsInfo) { + pit.toXContent(builder, params); + } + + builder.endArray(); + builder.endObject(); + return builder; + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java new file mode 100644 index 0000000000000..db20cf1fea36d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request to get all active PIT IDs from all nodes of cluster + */ +public class GetAllPitNodesRequest extends BaseNodesRequest { + + boolean includeAll; + + @Inject + public GetAllPitNodesRequest(boolean includeAll, DiscoveryNode... concreteNodes) { + super(concreteNodes); + this.includeAll = includeAll; + + } + + public GetAllPitNodesRequest(StreamInput in) throws IOException { + super(in); + this.includeAll = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(includeAll); + } + + public boolean getIncludeAll() { return includeAll; } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java new file mode 100644 index 0000000000000..2c5e065434403 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class transforms active PIT objects from all nodes to unique PIT objects + */ +public class GetAllPitNodesResponse extends BaseNodesResponse implements ToXContentObject { + List pitsInfo = new ArrayList<>(); + + @Inject + public GetAllPitNodesResponse(StreamInput in) throws IOException { + super(in); + } + + public GetAllPitNodesResponse( + ClusterName clusterName, + List getAllPitNodeResponse, + List failures + ) { + super(clusterName, getAllPitNodeResponse, failures); + Set uniquePitIds = new HashSet<>(); + pitsInfo.addAll( + getAllPitNodeResponse.stream() + .flatMap(p -> p.getPitsInfo().stream().filter(t -> uniquePitIds.add(t.getPitId()))) + .collect(Collectors.toList()) + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("pitsInfo"); + for (ListPitInfo pit : pitsInfo) { + pit.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public List readNodesFrom(StreamInput in) throws IOException { + return in.readList(GetAllPitNodeResponse::new); + } + + @Override + public void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + public List getPITIDs() { + return new ArrayList<>(pitsInfo); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java new file mode 100644 index 0000000000000..16e65cb785a7d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Action type for listing all PIT reader contexts + */ +public class GetAllPitsAction extends ActionType { + public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); + public static final String NAME = "indices:data/read/point_in_time/readall"; + + private GetAllPitsAction() { + super(NAME, GetAllPitNodesResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java new file mode 100644 index 0000000000000..4499e7d6e8ef5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * This holds information about pit reader context such as pit id and creation time + */ +public class ListPitInfo implements ToXContentFragment, Writeable { + private final String pitId; + private final long creationTime; + private final long keepAlive; + + public ListPitInfo(String pitId, long creationTime, long keepAlive) { + this.pitId = pitId; + this.creationTime = creationTime; + this.keepAlive = keepAlive; + } + + public ListPitInfo(StreamInput in) throws IOException { + this.pitId = in.readString(); + this.creationTime = in.readLong(); + this.keepAlive = in.readLong(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("pitId", pitId); + builder.field("creationTime", creationTime); + builder.field("keepAlive", keepAlive); + builder.endObject(); + return builder; + } + + public String getPitId() { + return pitId; + } + + public long getCreationTime() { + return creationTime; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + out.writeLong(creationTime); + out.writeLong(keepAlive); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index 6fe5a44aa12e1..4399d276eeff1 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -8,6 +8,7 @@ package org.opensearch.action.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -18,11 +19,18 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,11 +46,13 @@ public class PitService { private final ClusterService clusterService; private final SearchTransportService searchTransportService; + private final TransportService transportService; @Inject - public PitService(ClusterService clusterService, SearchTransportService searchTransportService) { + public PitService(ClusterService clusterService, SearchTransportService searchTransportService, TransportService transportService) { this.clusterService = clusterService; this.searchTransportService = searchTransportService; + this.transportService = transportService; } /** @@ -129,4 +139,40 @@ public void onFailure(final Exception e) { } }, size); } + + public void getAllPits(ActionListener getAllPitsListener) { + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterService.state().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + transportService.sendRequest( + transportService.getLocalNode(), + GetAllPitsAction.NAME, + new GetAllPitNodesRequest(false, disNodesArr), + new TransportResponseHandler() { + @Override + public void handleResponse(GetAllPitNodesResponse response) { + getAllPitsListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + getAllPitsListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public GetAllPitNodesResponse read(StreamInput in) throws IOException { + return new GetAllPitNodesResponse(in); + } + } + ); + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 0bd21b8c7d4e8..ce5dcfec213c9 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -218,16 +218,6 @@ public void sendFreePITContexts( ); } - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { - transportService.sendRequest( - connection, - FREE_ALL_PIT_CONTEXTS_ACTION_NAME, - TransportRequest.Empty.INSTANCE, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) - ); - } - public void sendExecuteDfs( Transport.Connection connection, final ShardSearchRequest request, @@ -528,14 +518,6 @@ public static void registerRequestHandler(TransportService transportService, Sea ); TransportActionProxy.registerProxyAction(transportService, FREE_PIT_CONTEXT_ACTION_NAME, DeletePitResponse::new); - transportService.registerRequestHandler( - FREE_ALL_PIT_CONTEXTS_ACTION_NAME, - ThreadPool.Names.SAME, - TransportRequest.Empty::new, - (request, channel, task) -> { channel.sendResponse(searchService.freeAllPitContexts()); } - ); - TransportActionProxy.registerProxyAction(transportService, FREE_ALL_PIT_CONTEXTS_ACTION_NAME, DeletePitResponse::new); - transportService.registerRequestHandler( FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index d67979d1c87c5..327ec1f1500b7 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -11,18 +11,17 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.tasks.Task; -import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Transport action for deleting point in time searches - supports deleting list and all point in time searches @@ -86,16 +85,12 @@ private void deletePits(ActionListener listener, DeletePitReq * Delete all active PIT reader contexts */ private void deleteAllPits(ActionListener listener) { - // TODO: Use list all PITs to delete all PITs in case of remote cluster use case - int size = clusterService.state().getNodes().getSize(); - ActionListener groupedActionListener = pitService.getDeletePitGroupedListener(listener, size); - for (final DiscoveryNode node : clusterService.state().getNodes()) { - try { - Transport.Connection connection = searchTransportService.getConnection(null, node); - searchTransportService.sendFreeAllPitContexts(connection, groupedActionListener); - } catch (Exception e) { - groupedActionListener.onFailure(e); - } - } + // Get all PITs and execute delete for all the PITs + pitService.getAllPits(ActionListener.wrap(getAllPitNodesResponse -> { + DeletePitRequest deletePitRequest = new DeletePitRequest( + getAllPitNodesResponse.getPITIDs().stream().map(r -> r.getPitId()).collect(Collectors.toList()) + ); + deletePits(listener, deletePitRequest); + }, listener::onFailure)); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java new file mode 100644 index 0000000000000..b5593e772f428 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action to get all active PIT contexts across all nodes + */ +public class TransportGetAllPitsAction extends TransportNodesAction< + GetAllPitNodesRequest, + GetAllPitNodesResponse, + GetAllPitNodeRequest, + GetAllPitNodeResponse> { + private final SearchService searchService; + + @Inject + public TransportGetAllPitsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService + ) { + super( + GetAllPitsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPitNodesRequest::new, + GetAllPitNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPitNodeResponse.class + ); + this.searchService = searchService; + } + + @Override + protected GetAllPitNodesResponse newResponse( + GetAllPitNodesRequest request, + List getAllPitNodeRespons, + List failures + ) { + return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); + } + + @Override + protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { + return new GetAllPitNodeRequest(request); + } + + @Override + protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPitNodeResponse(in); + } + + /** + * This node specific operation retrieves all active PIT IDs in a node + */ + @Override + protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { + GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( + transportService.getLocalNode(), + searchService.getAllPITReaderContexts() + ); + return nodeResponse; + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java new file mode 100644 index 0000000000000..3e3d62610d392 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for retrieving all active PIT IDs across all nodes + */ +public class RestGetAllPitsAction extends BaseRestHandler { + @Override + public String getName() { + return "get_all_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout()) + ); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { + @Override + public void processResponse(final ClusterStateResponse clusterStateResponse) { + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest( + request.paramAsBoolean("include_all", false), disNodesArr); + client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest, new RestResponseListener(channel) { + @Override + public RestResponse buildResponse(final GetAllPitNodesResponse getAllPITNodesResponse) throws Exception { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + if(getAllPITNodesResponse.hasFailures()) { + builder.startArray("failures"); + for (int idx = 0; idx < getAllPITNodesResponse.failures().size(); idx++) { + builder.field(getAllPITNodesResponse.failures().get(idx).nodeId(), + getAllPITNodesResponse.failures().get(idx).getDetailedMessage()); + } + builder.endArray(); + } + builder.field("pits", getAllPITNodesResponse.getPITIDs()); + if(getAllPITNodesRequest.getIncludeAll()) { + builder.startArray("nodeResults"); + for (int idx = 0; idx < getAllPITNodesResponse.getNodes().size(); idx++) { + builder.value(getAllPITNodesResponse.getNodes().get(idx)); + } + builder.endArray(); + } + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + } + }); + } + }); + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList(new Route(GET, "/_search/point_in_time/all"))); + } + +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 7d822496514c1..07c12d9718c34 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -43,6 +43,7 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.DeletePitInfo; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.ListPitInfo; import org.opensearch.action.search.PitSearchContextIdForNode; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchShardTask; @@ -1054,22 +1055,6 @@ public DeletePitResponse freeReaderContextsIfFound(List deleteResults = new ArrayList<>(); - for (ReaderContext readerContext : activeReaders.values()) { - if (readerContext instanceof PitReaderContext) { - boolean result = freeReaderContext(readerContext.id()); - DeletePitInfo deletePitInfo = new DeletePitInfo(result, ((PitReaderContext) readerContext).getPitId()); - deleteResults.add(deletePitInfo); - } - } - return new DeletePitResponse(deleteResults); - } - /** * Update PIT reader with pit id, keep alive and created time etc */ @@ -1453,6 +1438,21 @@ public PitReaderContext getPitReaderContext(ShardSearchContextId id) { return null; } + /** + * This method returns all active PIT reader contexts + */ + public List getAllPITReaderContexts() { + final List pitContextsInfo = new ArrayList<>(); + for (ReaderContext ctx : activeReaders.values()) { + if (ctx instanceof PitReaderContext) { + final PitReaderContext context = (PitReaderContext) ctx; + ListPitInfo pitInfo = new ListPitInfo(context.getPitId(), context.getCreationTime(), context.getKeepAlive()); + pitContextsInfo.add(pitInfo); + } + } + return pitContextsInfo; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 43ca7e0ebd823..135d8b1d173b0 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -61,7 +61,7 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, } public long getCreationTime() { - return this.creationTime.get(); + return this.creationTime.get() == null ? 0 : this.creationTime.get(); } public void setCreationTime(final long creationTime) { diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 04791e05f603c..7db9032e8ff9c 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -117,6 +117,10 @@ protected AtomicLong getLastAccessTime() { return lastAccessTime; } + public long getKeepAlive() { + return keepAlive.get(); + } + @Override public final void close() { if (closed.compareAndSet(false, true)) { diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index ef150b8555774..70e587fc4b4e6 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -219,7 +219,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -311,7 +311,7 @@ public void sendFreePITContexts( CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -412,7 +412,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -503,7 +503,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); CreatePitController controller = new CreatePitController( request, searchTransportService, diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index ec83cb45697d9..2571131915300 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -8,7 +8,14 @@ package org.opensearch.action.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.junit.Assert; import org.opensearch.Version; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.query.IdsQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; @@ -21,8 +28,12 @@ import org.opensearch.search.internal.ShardSearchContextId; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import static org.opensearch.test.OpenSearchIntegTestCase.client; import static org.opensearch.test.OpenSearchTestCase.between; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; import static org.opensearch.test.OpenSearchTestCase.randomBoolean; @@ -81,4 +92,41 @@ public static String getPitId() { } return SearchContextId.encode(array.asList(), aliasFilters, version); } + + public static void assertUsingGetAllPits(Client client, String id, long creationTime) throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + Assert.assertTrue(getPitResponse.getPITIDs().get(0).getPitId().contains(id)); + Assert.assertEquals(getPitResponse.getPITIDs().get(0).getCreationTime(), creationTime); + } + + public static void assertGetAllPitsEmpty(Client client) throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + Assert.assertEquals(0, getPitResponse.getPITIDs().size()); + } } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index acddbf639b574..21927cc12f42b 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -7,28 +7,25 @@ */ package org.opensearch.action.search; -import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.IdsQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.search.SearchHit; -import org.opensearch.search.SearchHits; -import org.opensearch.search.aggregations.InternalAggregations; -import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.test.OpenSearchTestCase; @@ -91,6 +88,10 @@ public void setupData() { node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + ImmutableOpenMap.Builder discoveryNodeMap = ImmutableOpenMap.builder(); + discoveryNodeMap.put(node1.getId(), node1); + discoveryNodeMap.put(node2.getId(), node2); + discoveryNodeMap.put(node3.getId(), node3); pitId = getPitId(); namedWriteableRegistry = new NamedWriteableRegistry( Arrays.asList( @@ -109,15 +110,6 @@ public void setupData() { new TaskId(randomLong() + ":" + randomLong()), Collections.emptyMap() ); - InternalSearchResponse response = new InternalSearchResponse( - new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), - InternalAggregations.EMPTY, - null, - null, - false, - null, - 1 - ); clusterServiceMock = mock(ClusterService.class); ClusterState state = mock(ClusterState.class); @@ -178,7 +170,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -224,7 +216,11 @@ public void testDeleteAllPITSuccess() throws InterruptedException, ExecutionExce transportService.start(); transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextIds, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); DeletePitInfo deletePitInfo = new DeletePitInfo(true, "pitId"); List deletePitInfos = new ArrayList<>(); @@ -238,7 +234,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -307,7 +317,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -366,7 +376,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -434,7 +444,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -480,7 +490,11 @@ public void testDeleteAllPitWhenNodeIsDown() { transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextId, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); if (connection.getNode().getId() == "node_3") { Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); @@ -496,7 +510,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -543,7 +571,11 @@ public void testDeleteAllPitWhenAllNodesAreDown() { SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextId, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); t.start(); @@ -554,7 +586,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -600,7 +646,11 @@ public void testDeleteAllPitFailure() { transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContexts( + Transport.Connection connection, + List contextId, + final ActionListener listener + ) { deleteNodesInvoked.add(connection.getNode()); if (connection.getNode().getId() == "node_3") { Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 is down"))); @@ -616,7 +666,21 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + @Override + public void getAllPits(ActionListener getAllPitsListener) { + ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); + List list = new ArrayList<>(); + list.add(listPitInfo); + GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse( + cluster1Transport.getLocalDiscoNode(), + list + ); + List nodeList = new ArrayList(); + nodeList.add(getAllPitNodeResponse); + getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList())); + } + }; TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, diff --git a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java index a72204ef426fb..fbeb26dcb9383 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java @@ -8,20 +8,32 @@ package org.opensearch.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.PitTestsUtil; import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -50,6 +62,7 @@ public void testPit() throws Exception { request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) @@ -81,6 +94,7 @@ public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exce public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index") @@ -110,6 +124,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(1, searchResponse.getFailedShards()); assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); return super.onNodeStopped(nodeName); } }); @@ -205,7 +220,73 @@ public void testPitInvalidDefaultKeepAlive() { .setPersistentSettings(Settings.builder().putNull("*")) .setTransientSettings(Settings.builder().putNull("*")) ); + } + + public void testGetAllPits() throws Exception { + client().admin().indices().prepareCreate("index1").get(); + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index", "index1" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + CreatePitResponse pitResponse1 = client().execute(CreatePitAction.INSTANCE, request).get(); + CreatePitResponse pitResponse2 = client().execute(CreatePitAction.INSTANCE, request).get(); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + assertEquals(3, getPitResponse.getPITIDs().size()); + List resultPitIds = getPitResponse.getPITIDs().stream().map(p -> p.getPitId()).collect(Collectors.toList()); + // asserting that we get all unique PIT IDs + Assert.assertTrue(resultPitIds.contains(pitResponse.getId())); + Assert.assertTrue(resultPitIds.contains(pitResponse1.getId())); + Assert.assertTrue(resultPitIds.contains(pitResponse2.getId())); + client().admin().indices().prepareDelete("index1").get(); + } + + public void testGetAllPitsDuringNodeDrop() throws Exception { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(getDiscoveryNodes()); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ActionFuture execute1 = client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + // we still get a pit id from the data node which is up + assertEquals(1, getPitResponse.getPITIDs().size()); + // failure for node drop + assertEquals(1, getPitResponse.failures().size()); + assertTrue(getPitResponse.failures().get(0).getMessage().contains("Failed node")); + return super.onNodeStopped(nodeName); + } + }); + } + private DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + return disNodesArr; } } diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 5c3c43af9cb66..4ec6f468174c8 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -14,6 +14,7 @@ import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.PitTestsUtil; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.Priority; @@ -63,6 +64,9 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -86,6 +90,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); service.doClose(); @@ -99,6 +104,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") @@ -128,7 +134,7 @@ public void testCreatePITWithNonExistentIndex() { service.doClose(); } - public void testCreatePITOnCloseIndex() { + public void testCreatePITOnCloseIndex() throws ExecutionException, InterruptedException { createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -144,6 +150,7 @@ public void testCreatePITOnCloseIndex() { SearchService service = getInstanceFromNode(SearchService.class); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); service.doClose(); } @@ -165,6 +172,7 @@ public void testPitSearchOnDeletedIndex() throws ExecutionException, Interrupted }); assertTrue(ex.getMessage().contains("no such index [index]")); SearchService service = getInstanceFromNode(SearchService.class); + PitTestsUtil.assertGetAllPitsEmpty(client()); assertEquals(0, service.getActiveContexts()); service.doClose(); } @@ -190,6 +198,8 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); client().admin().indices().prepareClose("index").get(); @@ -201,6 +211,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx }); assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); // PIT reader contexts are lost after close, verifying it with open index api client().admin().indices().prepareOpen("index").get(); @@ -314,6 +325,8 @@ public void testPitAfterUpdateIndex() throws Exception { request.setIndices(new String[] { "test" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + SearchService service = getInstanceFromNode(SearchService.class); assertThat( @@ -456,6 +469,7 @@ public void testPitAfterUpdateIndex() throws Exception { } finally { service.doClose(); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); } } @@ -467,6 +481,8 @@ public void testConcurrentSearches() throws Exception { request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); @@ -497,5 +513,6 @@ public void testConcurrentSearches() throws Exception { assertEquals(2, service.getActiveContexts()); service.doClose(); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); } } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index cbfcb43c22a21..64d2da4160410 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1414,6 +1414,7 @@ public void testOpenReaderContext() { searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); future.actionGet(); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1431,6 +1432,7 @@ public void testDeletePitReaderContext() { contextIds.add(pitSearchContextIdForNode); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // assert true for reader context not found @@ -1440,19 +1442,6 @@ public void testDeletePitReaderContext() { assertFalse(searchService.freeReaderContext(future.actionGet())); } - public void testDeleteAllPitReaderContexts() { - createIndex("index"); - SearchService searchService = getInstanceFromNode(SearchService.class); - PlainActionFuture future = new PlainActionFuture<>(); - searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); - future.actionGet(); - searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); - future.actionGet(); - assertThat(searchService.getActiveContexts(), equalTo(2)); - searchService.freeAllPitContexts(); - assertThat(searchService.getActiveContexts(), equalTo(0)); - } - public void testPitContextMaxKeepAlive() { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); @@ -1473,6 +1462,7 @@ public void testPitContextMaxKeepAlive() { ex.getMessage() ); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } public void testUpdatePitId() { @@ -1495,6 +1485,7 @@ public void testUpdatePitId() { assertTrue(updateResponse.getKeepAlive() == updateRequest.getKeepAlive()); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1527,6 +1518,7 @@ public void testUpdatePitIdMaxKeepAlive() { ex.getMessage() ); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1547,6 +1539,7 @@ public void testUpdatePitIdWithInvalidReaderId() { assertEquals("No search context found for id [" + id.getId() + "]", ex.getMessage()); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } private ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) {