From 37de45b51e2c4857b10d0903161bb4bc3de642a4 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 18 Jul 2022 12:18:44 +0530 Subject: [PATCH 1/4] pit segments api Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 25 +- .../indices/segments/PitSegmentsAction.java | 21 ++ .../indices/segments/PitSegmentsRequest.java | 76 ++++++ .../segments/TransportPitSegmentsAction.java | 233 ++++++++++++++++++ .../java/org/opensearch/client/Client.java | 7 + .../client/support/AbstractClient.java | 10 +- .../cluster/routing/ShardRouting.java | 4 +- .../action/cat/RestPitSegmentsAction.java | 145 +++++++++++ .../search/internal/PitReaderContext.java | 24 ++ .../search/PitSegmentSingleNodeTests.java | 58 +++++ .../search/PitSegmentsMultiNodeTests.java | 60 +++++ 11 files changed, 638 insertions(+), 25 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java create mode 100644 server/src/test/java/org/opensearch/search/PitSegmentSingleNodeTests.java create mode 100644 server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 0de277d89ef1b..b5d5d46b16284 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -163,7 +163,9 @@ import org.opensearch.action.admin.indices.rollover.RolloverAction; import org.opensearch.action.admin.indices.rollover.TransportRolloverAction; import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.TransportPitSegmentsAction; import org.opensearch.action.admin.indices.settings.get.GetSettingsAction; import org.opensearch.action.admin.indices.settings.get.TransportGetSettingsAction; import org.opensearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; @@ -366,25 +368,7 @@ import org.opensearch.rest.action.admin.indices.RestUpgradeAction; import org.opensearch.rest.action.admin.indices.RestUpgradeStatusAction; import org.opensearch.rest.action.admin.indices.RestValidateQueryAction; -import org.opensearch.rest.action.cat.AbstractCatAction; -import org.opensearch.rest.action.cat.RestAliasAction; -import org.opensearch.rest.action.cat.RestAllocationAction; -import org.opensearch.rest.action.cat.RestCatAction; -import org.opensearch.rest.action.cat.RestCatRecoveryAction; -import org.opensearch.rest.action.cat.RestFielddataAction; -import org.opensearch.rest.action.cat.RestHealthAction; -import org.opensearch.rest.action.cat.RestIndicesAction; -import org.opensearch.rest.action.cat.RestMasterAction; -import org.opensearch.rest.action.cat.RestNodeAttrsAction; -import org.opensearch.rest.action.cat.RestNodesAction; -import org.opensearch.rest.action.cat.RestPluginsAction; -import org.opensearch.rest.action.cat.RestRepositoriesAction; -import org.opensearch.rest.action.cat.RestSegmentsAction; -import org.opensearch.rest.action.cat.RestShardsAction; -import org.opensearch.rest.action.cat.RestSnapshotAction; -import org.opensearch.rest.action.cat.RestTasksAction; -import org.opensearch.rest.action.cat.RestTemplatesAction; -import org.opensearch.rest.action.cat.RestThreadPoolAction; +import org.opensearch.rest.action.cat.*; import org.opensearch.rest.action.document.RestBulkAction; import org.opensearch.rest.action.document.RestDeleteAction; import org.opensearch.rest.action.document.RestGetAction; @@ -622,6 +606,8 @@ public void reg actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class); actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class); + actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class); + // Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class); @@ -835,6 +821,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestAliasAction()); registerHandler.accept(new RestThreadPoolAction()); registerHandler.accept(new RestPluginsAction()); + registerHandler.accept(new RestPitSegmentsAction()); registerHandler.accept(new RestFielddataAction()); registerHandler.accept(new RestNodeAttrsAction()); registerHandler.accept(new RestRepositoriesAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java new file mode 100644 index 0000000000000..79e15ce5c4016 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.segments; + +import org.opensearch.action.ActionType; + +public class PitSegmentsAction extends ActionType { + + public static final PitSegmentsAction INSTANCE = new PitSegmentsAction(); + public static final String NAME = "indices:monitor/point_in_time/segments"; + + private PitSegmentsAction() { + super(NAME, IndicesSegmentResponse::new); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java new file mode 100644 index 0000000000000..e970e1a173e76 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.segments; + +import org.opensearch.action.support.broadcast.BroadcastRequest; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public class PitSegmentsRequest extends BroadcastRequest { + + protected boolean verbose = false; + private Collection pitIds; + + public PitSegmentsRequest() { + this(Strings.EMPTY_ARRAY); + } + + public PitSegmentsRequest(StreamInput in) throws IOException { + super(in); + pitIds = Arrays.asList(in.readStringArray()); + verbose = in.readBoolean(); + } + + public PitSegmentsRequest(String... indices) { + super(indices); + pitIds = Collections.emptyList(); + } + + /** + * true if detailed information about each segment should be returned, + * false otherwise. + */ + public boolean verbose() { + return verbose; + } + + /** + * Sets the verbose option. + * @see #verbose() + */ + public void verbose(boolean v) { + verbose = v; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (pitIds == null) { + out.writeVInt(0); + } else { + out.writeStringArray(pitIds.toArray(new String[pitIds.size()])); + } + out.writeBoolean(verbose); + + } + + public Collection getPitIds() { + return pitIds; + } + + public void setPitIds(Collection pitIds) { + this.pitIds = pitIds; + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java new file mode 100644 index 0000000000000..0014d93635bf2 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -0,0 +1,233 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.segments; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.ListPitInfo; +import org.opensearch.action.search.SearchContextId; +import org.opensearch.action.search.SearchContextIdForNode; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.DefaultShardOperationFailedException; +import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.PlainShardsIterator; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.ShardsIterator; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.search.SearchService; +import org.opensearch.search.internal.PitReaderContext; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.opensearch.action.search.SearchContextId.decode; + +public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction { + + private final ClusterService clusterService; + private final IndicesService indicesService; + private final SearchService searchService; + private final NamedWriteableRegistry namedWriteableRegistry; + private final TransportService transportService; + + + @Inject + public TransportPitSegmentsAction(ClusterService clusterService, TransportService transportService, + IndicesService indicesService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + SearchService searchService, + NamedWriteableRegistry namedWriteableRegistry) { + super(PitSegmentsAction.NAME, clusterService, transportService, actionFilters, indexNameExpressionResolver, + PitSegmentsRequest::new, ThreadPool.Names.MANAGEMENT); + this.clusterService = clusterService; + this.indicesService = indicesService; + this.searchService = searchService; + this.namedWriteableRegistry = namedWriteableRegistry; + this.transportService = transportService; + + } + + @Override + protected void doExecute(Task task, PitSegmentsRequest request, ActionListener listener) { + //TODO : Refactor to use a util method to "get all PITs" once such a method is implemented. + if (request.getPitIds().isEmpty()) { + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterService.state().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + transportService.sendRequest(transportService.getLocalNode(), GetAllPitsAction.NAME, + new GetAllPitNodesRequest(disNodesArr), new TransportResponseHandler() { + @Override + public void handleResponse(GetAllPitNodesResponse response) { + request.setPitIds(response.getPITIDs().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); + getDoExecute(task, request, listener); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public GetAllPitNodesResponse read(StreamInput in) throws IOException { + return new GetAllPitNodesResponse(in); + } + }); + } else { + getDoExecute(task, request, listener); + } + } + + private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener listener) { + super.doExecute(task, request, listener); + } + + @Override + protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) { + final ArrayList iterators = new ArrayList<>(); + for (String pitId : request.getPitIds()) { + SearchContextId searchContext = decode(namedWriteableRegistry, pitId); + for (Map.Entry entry : searchContext.shards().entrySet()) { + final SearchContextIdForNode perNode = entry.getValue(); + if (Strings.isEmpty(perNode.getClusterAlias())) { + final ShardId shardId = entry.getKey(); + iterators.add(new PitAwareShardRouting(pitId, shardId, perNode.getNode(), null, true, ShardRoutingState.STARTED, + null, null, null, -1L)); + } + } + } + return new PlainShardsIterator(iterators); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); + } + +// @Override +// protected Writeable.Reader.NodeRequest> getNodeRequestWriteable() { +// return NodeRequest::new; +// } + + @Override + protected ShardSegments readShardResult(StreamInput in) throws IOException { + return new ShardSegments(in); + } + + @Override + protected IndicesSegmentResponse newResponse(PitSegmentsRequest request, int totalShards, int successfulShards, int failedShards, + List results, List shardFailures, + ClusterState clusterState) { + return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, + shardFailures); + } + + @Override + protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException { + return new PitSegmentsRequest(in); + } + + @Override + protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { + PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; + SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, + pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId()); + PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); + return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + } + + public class PitAwareShardRouting extends ShardRouting { + + private final String pitId; + + public PitAwareShardRouting(StreamInput in) throws IOException { + super(in); + this.pitId = in.readString(); + } + + public PitAwareShardRouting( + String pitId, + ShardId shardId, + String currentNodeId, + String relocatingNodeId, + boolean primary, + ShardRoutingState state, + RecoverySource recoverySource, + UnassignedInfo unassignedInfo, + AllocationId allocationId, + long expectedShardSize + ) { + super(shardId, currentNodeId, relocatingNodeId, primary, state, recoverySource, unassignedInfo, + allocationId, expectedShardSize); + this.pitId = pitId; + } + + public String getPitId() { + return pitId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(pitId); + } + +// @Override +// public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { +// super.toXContent(builder, params); +// builder.field("pitId", pitId); +// return builder.endObject(); +// } + } + + +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index 1d3bbfcba43f9..1f953e4c2c1d7 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -34,6 +34,8 @@ import org.opensearch.action.ActionFuture; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.bulk.BulkResponse; @@ -339,6 +341,11 @@ public interface Client extends OpenSearchClient, Releasable { */ void deletePits(DeletePitRequest deletePITRequest, ActionListener listener); + /** + * The segments of one or more Point In Time's + */ + void pitSegments(PitSegmentsRequest pitSegmentsRequest, ActionListener listener); + /** * Performs multiple search requests. */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index f99454a8a8913..b7092a1a538d9 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -233,10 +233,7 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest; import org.opensearch.action.admin.indices.rollover.RolloverRequestBuilder; import org.opensearch.action.admin.indices.rollover.RolloverResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder; +import org.opensearch.action.admin.indices.segments.*; import org.opensearch.action.admin.indices.settings.get.GetSettingsAction; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequestBuilder; @@ -590,6 +587,11 @@ public void deletePits(final DeletePitRequest deletePITRequest, final ActionList execute(DeletePitAction.INSTANCE, deletePITRequest, listener); } + @Override + public void pitSegments(final PitSegmentsRequest request, final ActionListener listener) { + execute(PitSegmentsAction.INSTANCE, request, listener); + } + @Override public ActionFuture multiSearch(MultiSearchRequest request) { return execute(MultiSearchAction.INSTANCE, request); diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index 7dec8f9c84a89..7a3ebf8d160a9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -54,7 +54,7 @@ * * @opensearch.internal */ -public final class ShardRouting implements Writeable, ToXContentObject { +public class ShardRouting implements Writeable, ToXContentObject { /** * Used if shard size is not available @@ -78,7 +78,7 @@ public final class ShardRouting implements Writeable, ToXContentObject { * A constructor to internally create shard routing instances, note, the internal flag should only be set to true * by either this class or tests. Visible for testing. */ - ShardRouting( + public ShardRouting( ShardId shardId, String currentNodeId, String relocatingNodeId, diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java new file mode 100644 index 0000000000000..6fa5969662026 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.cat; + +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.segments.IndexSegments; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.Strings; +import org.opensearch.common.Table; +import org.opensearch.index.engine.Segment; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +public class RestPitSegmentsAction extends AbstractCatAction { + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_cat/pit_segments"), new Route(GET, "/_cat/pit_segments/{pit_id}"))); + } + + @Override + public String getName() { + return "cat_pit_segments_action"; + } + + @Override + public boolean allowSystemIndexAccessByDefault() { + return true; + } + + @Override + protected BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { + final String[] pitIds = Strings.splitStringByCommaToArray(request.param("pit_id")); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + + return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) { + @Override + public void processResponse(final ClusterStateResponse clusterStateResponse) { + final PitSegmentsRequest pitSegmentsRequest = new PitSegmentsRequest(); + pitSegmentsRequest.setPitIds(new HashSet(Arrays.asList(pitIds))); + client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest, new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentResponse) throws Exception { + final Map indicesSegments = indicesSegmentResponse.getIndices(); + Table tab = buildTable(request, clusterStateResponse, indicesSegments); + return RestTable.buildResponse(tab, channel); + } + }); + } + }); + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_cat/pit_segments\n"); + sb.append("/_cat/pit_segments/{pit_id}\n"); + } + + @Override + protected Table getTableWithHeader(RestRequest request) { + Table table = new Table(); + table.startHeaders(); + table.addCell("index", "default:true;alias:i,idx;desc:index name"); + table.addCell("shard", "default:true;alias:s,sh;desc:shard name"); + table.addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica"); + table.addCell("ip", "default:true;desc:ip of node where it lives"); + table.addCell("id", "default:false;desc:unique id of node where it lives"); + table.addCell("segment", "default:true;alias:seg;desc:segment name"); + table.addCell("generation", "default:true;alias:g,gen;text-align:right;desc:segment generation"); + table.addCell("docs.count", "default:true;alias:dc,docsCount;text-align:right;desc:number of docs in segment"); + table.addCell("docs.deleted", "default:true;alias:dd,docsDeleted;text-align:right;desc:number of deleted docs in segment"); + table.addCell("size", "default:true;alias:si;text-align:right;desc:segment size in bytes"); + table.addCell("size.memory", "default:true;alias:sm,sizeMemory;text-align:right;desc:segment memory in bytes"); + table.addCell("committed", "default:true;alias:ic,isCommitted;desc:is segment committed"); + table.addCell("searchable", "default:true;alias:is,isSearchable;desc:is segment searched"); + table.addCell("version", "default:true;alias:v,ver;desc:version"); + table.addCell("compound", "default:true;alias:ico,isCompound;desc:is segment compound"); + table.endHeaders(); + return table; + } + + private Table buildTable(final RestRequest request, ClusterStateResponse state, Map indicesSegments) { + Table table = getTableWithHeader(request); + + DiscoveryNodes nodes = state.getState().nodes(); + + for (IndexSegments indexSegments : indicesSegments.values()) { + Map shards = indexSegments.getShards(); + for (IndexShardSegments indexShardSegments : shards.values()) { + ShardSegments[] shardSegments = indexShardSegments.getShards(); + for (ShardSegments shardSegment : shardSegments) { + List segments = shardSegment.getSegments(); + for (Segment segment : segments) { + table.startRow(); + table.addCell(shardSegment.getShardRouting().getIndexName()); + table.addCell(shardSegment.getShardRouting().getId()); + table.addCell(shardSegment.getShardRouting().primary() ? "p" : "r"); + table.addCell(nodes.get(shardSegment.getShardRouting().currentNodeId()).getHostAddress()); + table.addCell(shardSegment.getShardRouting().currentNodeId()); + table.addCell(segment.getName()); + table.addCell(segment.getGeneration()); + table.addCell(segment.getNumDocs()); + table.addCell(segment.getDeletedDocs()); + table.addCell(segment.getSize()); + table.addCell(0L); + table.addCell(segment.isCommitted()); + table.addCell(segment.isSearch()); + table.addCell(segment.getVersion()); + table.addCell(segment.isCompound()); + table.endRow(); + } + } + } + } + return table; + } +} diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 135d8b1d173b0..86771908e2144 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -9,12 +9,16 @@ package org.opensearch.search.internal; import org.apache.lucene.util.SetOnce; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; +import java.util.List; + /** * PIT reader context containing PIT specific information such as pit id, create time etc. */ @@ -24,6 +28,15 @@ public class PitReaderContext extends ReaderContext { private final SetOnce pitId = new SetOnce<>(); // Creation time of PIT contexts which helps users to differentiate between multiple PIT reader contexts private final SetOnce creationTime = new SetOnce<>(); + /** + * Shard routing at the time of creation of PIT Reader Context + */ + private final ShardRouting shardRouting; + + /** + * Encapsulates segments constituting the shard at the time of creation of PIT Reader Context. + */ + private final List segments; public PitReaderContext( ShardSearchContextId id, @@ -34,6 +47,8 @@ public PitReaderContext( boolean singleSession ) { super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession); + shardRouting = indexShard.routingEntry(); + segments = indexShard.segments(true); } public String getPitId() { @@ -67,4 +82,13 @@ public long getCreationTime() { public void setCreationTime(final long creationTime) { this.creationTime.set(creationTime); } + + public ShardRouting getShardRouting() { + return shardRouting; + } + + public List getSegments() { + return segments; + } + } diff --git a/server/src/test/java/org/opensearch/search/PitSegmentSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSegmentSingleNodeTests.java new file mode 100644 index 0000000000000..722c3c0999c05 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/PitSegmentSingleNodeTests.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.concurrent.ExecutionException; + +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; + +public class PitSegmentSingleNodeTests extends OpenSearchSingleNodeTestCase { + @Before + public void setupIndex() throws ExecutionException, InterruptedException { + for (int i = 0; i < 4; i++) { + + + + String indexName = "index" + i; + createIndex(indexName, Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build()); + client().prepareIndex(indexName).setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + } + } + + @After + public void clearIndex() { + for (int i = 0; i < 4; i++) { + String indexName = "index" + i; + client().admin().indices().prepareDelete(indexName).get(); + } + } + + public void testPitSegments() throws Exception { + IndicesSegmentResponse indicesSegmentResponse; + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index1"}); + client().execute(CreatePitAction.INSTANCE, request).get(); + indicesSegmentResponse = client().execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).get(); + assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); + + + } +} diff --git a/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java new file mode 100644 index 0000000000000..34f9c6da8ac2e --- /dev/null +++ b/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.concurrent.ExecutionException; + +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; + +/** + * Multi node integration tests for PIT segments operation + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3) +public class PitSegmentsMultiNodeTests extends OpenSearchIntegTestCase { + + @Before + public void setupIndex() throws ExecutionException, InterruptedException { + for (int i = 0; i < 4; i++) { + String indexName = "index" + i; + createIndex(indexName, Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build()); + client().prepareIndex(indexName).setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + } + } + + @After + public void clearIndex() { + for (int i = 0; i < 4; i++) { + String indexName = "index" + i; + client().admin().indices().prepareDelete(indexName).get(); + } + } + + public void testPitSegments() throws Exception { + IndicesSegmentResponse indicesSegmentResponse; + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index1"}); + client().execute(CreatePitAction.INSTANCE, request).get(); + indicesSegmentResponse = client().execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).get(); + assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); + + + } +} \ No newline at end of file From 4ff1ef404504a282e33b584b14e7ff8f215348d4 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 19 Jul 2022 13:35:21 +0530 Subject: [PATCH 2/4] pit seg Signed-off-by: Bharathwaj G --- .../segments/TransportPitSegmentsAction.java | 59 ++++++++++--------- .../search/PitSegmentsMultiNodeTests.java | 1 + 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 0014d93635bf2..70d6f701ce2b5 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -25,13 +25,7 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.AllocationId; -import org.opensearch.cluster.routing.PlainShardsIterator; -import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.ShardsIterator; -import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.*; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; @@ -39,6 +33,8 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.search.SearchService; @@ -54,6 +50,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.opensearch.action.search.SearchContextId.decode; @@ -128,19 +125,20 @@ private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener< @Override protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) { - final ArrayList iterators = new ArrayList<>(); - for (String pitId : request.getPitIds()) { - SearchContextId searchContext = decode(namedWriteableRegistry, pitId); - for (Map.Entry entry : searchContext.shards().entrySet()) { - final SearchContextIdForNode perNode = entry.getValue(); - if (Strings.isEmpty(perNode.getClusterAlias())) { - final ShardId shardId = entry.getKey(); - iterators.add(new PitAwareShardRouting(pitId, shardId, perNode.getNode(), null, true, ShardRoutingState.STARTED, - null, null, null, -1L)); - } - } - } - return new PlainShardsIterator(iterators); +// final ArrayList iterators = new ArrayList<>(); +// for (String pitId : request.getPitIds()) { +// SearchContextId searchContext = decode(namedWriteableRegistry, pitId); +// for (Map.Entry entry : searchContext.shards().entrySet()) { +// final SearchContextIdForNode perNode = entry.getValue(); +// if (Strings.isEmpty(perNode.getClusterAlias())) { +// final ShardId shardId = entry.getKey(); +// iterators.add(new PitAwareShardRouting(shardId, perNode.getNode(), null, true, ShardRoutingState.STARTED, +// null, null, null, -1L, pitId)); +// } +// } +// } +// return new PlainShardsIterator(iterators); + return clusterState.routingTable().allShards(concreteIndices); } @Override @@ -183,6 +181,11 @@ protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId()); PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + + +// IndexService indexService = indicesService.indexServiceSafe(shardRouting.index()); +// IndexShard indexShard = indexService.getShard(shardRouting.id()); +// return new ShardSegments(indexShard.routingEntry(), indexShard.segments(request.verbose())); } public class PitAwareShardRouting extends ShardRouting { @@ -195,7 +198,6 @@ public PitAwareShardRouting(StreamInput in) throws IOException { } public PitAwareShardRouting( - String pitId, ShardId shardId, String currentNodeId, String relocatingNodeId, @@ -204,7 +206,8 @@ public PitAwareShardRouting( RecoverySource recoverySource, UnassignedInfo unassignedInfo, AllocationId allocationId, - long expectedShardSize + long expectedShardSize, + String pitId ) { super(shardId, currentNodeId, relocatingNodeId, primary, state, recoverySource, unassignedInfo, allocationId, expectedShardSize); @@ -221,12 +224,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(pitId); } -// @Override -// public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { -// super.toXContent(builder, params); -// builder.field("pitId", pitId); -// return builder.endObject(); -// } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + super.toXContent(builder, params); + builder.field("pitId", pitId); + return builder.endObject(); + } } diff --git a/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java index 34f9c6da8ac2e..f85067fadd5c0 100644 --- a/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSegmentsMultiNodeTests.java @@ -53,6 +53,7 @@ public void testPitSegments() throws Exception { request.setIndices(new String[]{"index1"}); client().execute(CreatePitAction.INSTANCE, request).get(); indicesSegmentResponse = client().execute(PitSegmentsAction.INSTANCE, new PitSegmentsRequest()).get(); + // assertTrue(indicesSegmentResponse.getShardFailures()[0].) assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); From 92b7d2453677485b251016f8377da382676c7893 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 19 Jul 2022 18:09:31 +0530 Subject: [PATCH 3/4] pit seg Signed-off-by: Bharathwaj G --- .../segments/TransportPitSegmentsAction.java | 68 +++++++++---------- .../cluster/routing/AllocationId.java | 2 +- 2 files changed, 32 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 70d6f701ce2b5..8e8102102836f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -46,10 +46,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -125,20 +122,23 @@ private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener< @Override protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) { -// final ArrayList iterators = new ArrayList<>(); -// for (String pitId : request.getPitIds()) { -// SearchContextId searchContext = decode(namedWriteableRegistry, pitId); -// for (Map.Entry entry : searchContext.shards().entrySet()) { -// final SearchContextIdForNode perNode = entry.getValue(); -// if (Strings.isEmpty(perNode.getClusterAlias())) { -// final ShardId shardId = entry.getKey(); -// iterators.add(new PitAwareShardRouting(shardId, perNode.getNode(), null, true, ShardRoutingState.STARTED, -// null, null, null, -1L, pitId)); -// } -// } -// } -// return new PlainShardsIterator(iterators); - return clusterState.routingTable().allShards(concreteIndices); + final ArrayList iterators = new ArrayList<>(); + ShardsIterator shardsIterator = clusterState.routingTable().allShards(concreteIndices); + for (String pitId : request.getPitIds()) { + SearchContextId searchContext = decode(namedWriteableRegistry, pitId); + for (Map.Entry entry : searchContext.shards().entrySet()) { + final SearchContextIdForNode perNode = entry.getValue(); + if (Strings.isEmpty(perNode.getClusterAlias())) { + final ShardId shardId = entry.getKey(); + Optional shardRouting = shardsIterator.getShardRoutings().stream().filter(r -> r.shardId().equals(shardId)).findFirst(); + ShardRouting sr = shardRouting.get(); + iterators.add(sr); + // iterators.add(new PitAwareShardRouting(sr, pitId)); + } + } + } + return new PlainShardsIterator(iterators); +// return clusterState.routingTable().allShards(concreteIndices); } @Override @@ -176,16 +176,16 @@ protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException @Override protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { - PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; - SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, - pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId()); - PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); - return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); +// PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; +// SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, +// pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId()); +// PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); +// return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); -// IndexService indexService = indicesService.indexServiceSafe(shardRouting.index()); -// IndexShard indexShard = indexService.getShard(shardRouting.id()); -// return new ShardSegments(indexShard.routingEntry(), indexShard.segments(request.verbose())); + IndexService indexService = indicesService.indexServiceSafe(shardRouting.index()); + IndexShard indexShard = indexService.getShard(shardRouting.id()); + return new ShardSegments(indexShard.routingEntry(), indexShard.segments(request.verbose())); } public class PitAwareShardRouting extends ShardRouting { @@ -198,19 +198,13 @@ public PitAwareShardRouting(StreamInput in) throws IOException { } public PitAwareShardRouting( - ShardId shardId, - String currentNodeId, - String relocatingNodeId, - boolean primary, - ShardRoutingState state, - RecoverySource recoverySource, - UnassignedInfo unassignedInfo, - AllocationId allocationId, - long expectedShardSize, + ShardRouting shardRouting, String pitId ) { - super(shardId, currentNodeId, relocatingNodeId, primary, state, recoverySource, unassignedInfo, - allocationId, expectedShardSize); + super(shardRouting.shardId(), shardRouting.currentNodeId(), shardRouting.relocatingNodeId(), + shardRouting.primary(), shardRouting.state(), shardRouting.recoverySource(), + shardRouting.unassignedInfo(), + shardRouting.allocationId(), shardRouting.getExpectedShardSize()); this.pitId = pitId; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/AllocationId.java b/server/src/main/java/org/opensearch/cluster/routing/AllocationId.java index fb149743df077..b1a871b279b3a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/AllocationId.java +++ b/server/src/main/java/org/opensearch/cluster/routing/AllocationId.java @@ -100,7 +100,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(this.relocationId); } - private AllocationId(String id, String relocationId) { + public AllocationId(String id, String relocationId) { Objects.requireNonNull(id, "Argument [id] must be non-null"); this.id = id; this.relocationId = relocationId; From 17f079d83765bb56c6b4368052c533b1c8bf78e0 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 19 Jul 2022 19:44:50 +0530 Subject: [PATCH 4/4] pit seg Signed-off-by: Bharathwaj G --- .../segments/TransportPitSegmentsAction.java | 33 ++++++++++++------- .../node/TransportBroadcastByNodeAction.java | 8 ++++- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 8e8102102836f..87cba1dc0ae88 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -132,8 +132,8 @@ protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest re final ShardId shardId = entry.getKey(); Optional shardRouting = shardsIterator.getShardRoutings().stream().filter(r -> r.shardId().equals(shardId)).findFirst(); ShardRouting sr = shardRouting.get(); - iterators.add(sr); - // iterators.add(new PitAwareShardRouting(sr, pitId)); + PitAwareShardRouting psr = new PitAwareShardRouting(sr, pitId); + iterators.add(psr); } } } @@ -175,17 +175,28 @@ protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException } @Override - protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { -// PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; -// SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, -// pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId()); -// PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); -// return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); + public List getShardsFromInputStream(StreamInput in) throws IOException { + System.out.println("getShardsFromInputStream from TBBNA = " + in); + return in.readList(PitAwareShardRouting::new); + } + @Override + protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { - IndexService indexService = indicesService.indexServiceSafe(shardRouting.index()); - IndexShard indexShard = indexService.getShard(shardRouting.id()); - return new ShardSegments(indexShard.routingEntry(), indexShard.segments(request.verbose())); + PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; + SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, + pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId()); + PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); + return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); +// for(String pitId : request.getPitIds()) { +// SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, +// pitId).shards().get(shardRouting.shardId()); +// if(searchContextIdForNode == null) continue; +// PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); +// return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); +// +// } +// return null; } public class PitAwareShardRouting extends ShardRouting { diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index f849be4db4e2b..93cadecb09420 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -532,6 +532,11 @@ private void onShardOperation( } } + public List getShardsFromInputStream(StreamInput in) throws IOException { + System.out.println("getShardsFromInputStream from TBBNA = " + in); + return in.readList(ShardRouting::new); + } + /** * A node request * @@ -547,7 +552,8 @@ public class NodeRequest extends TransportRequest implements IndicesRequest { public NodeRequest(StreamInput in) throws IOException { super(in); indicesLevelRequest = readRequestFrom(in); - shards = in.readList(ShardRouting::new); + //shards = in.readList(ShardRouting::new); + shards = getShardsFromInputStream(in); nodeId = in.readString(); }