Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pitsegments1 #9

Open
wants to merge 4 commits into
base: listpits
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportPitSegmentsAction;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
import org.opensearch.action.admin.indices.settings.get.TransportGetSettingsAction;
import org.opensearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
Expand Down Expand Up @@ -366,25 +368,7 @@
import org.opensearch.rest.action.admin.indices.RestUpgradeAction;
import org.opensearch.rest.action.admin.indices.RestUpgradeStatusAction;
import org.opensearch.rest.action.admin.indices.RestValidateQueryAction;
import org.opensearch.rest.action.cat.AbstractCatAction;
import org.opensearch.rest.action.cat.RestAliasAction;
import org.opensearch.rest.action.cat.RestAllocationAction;
import org.opensearch.rest.action.cat.RestCatAction;
import org.opensearch.rest.action.cat.RestCatRecoveryAction;
import org.opensearch.rest.action.cat.RestFielddataAction;
import org.opensearch.rest.action.cat.RestHealthAction;
import org.opensearch.rest.action.cat.RestIndicesAction;
import org.opensearch.rest.action.cat.RestMasterAction;
import org.opensearch.rest.action.cat.RestNodeAttrsAction;
import org.opensearch.rest.action.cat.RestNodesAction;
import org.opensearch.rest.action.cat.RestPluginsAction;
import org.opensearch.rest.action.cat.RestRepositoriesAction;
import org.opensearch.rest.action.cat.RestSegmentsAction;
import org.opensearch.rest.action.cat.RestShardsAction;
import org.opensearch.rest.action.cat.RestSnapshotAction;
import org.opensearch.rest.action.cat.RestTasksAction;
import org.opensearch.rest.action.cat.RestTemplatesAction;
import org.opensearch.rest.action.cat.RestThreadPoolAction;
import org.opensearch.rest.action.cat.*;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.rest.action.document.RestDeleteAction;
import org.opensearch.rest.action.document.RestGetAction;
Expand Down Expand Up @@ -622,6 +606,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);

actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);

// Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
Expand Down Expand Up @@ -835,6 +821,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestAliasAction());
registerHandler.accept(new RestThreadPoolAction());
registerHandler.accept(new RestPluginsAction());
registerHandler.accept(new RestPitSegmentsAction());
registerHandler.accept(new RestFielddataAction());
registerHandler.accept(new RestNodeAttrsAction());
registerHandler.accept(new RestRepositoriesAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionType;

public class PitSegmentsAction extends ActionType<IndicesSegmentResponse> {

public static final PitSegmentsAction INSTANCE = new PitSegmentsAction();
public static final String NAME = "indices:monitor/point_in_time/segments";

private PitSegmentsAction() {
super(NAME, IndicesSegmentResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class PitSegmentsRequest extends BroadcastRequest<PitSegmentsRequest> {

protected boolean verbose = false;
private Collection<String> pitIds;

public PitSegmentsRequest() {
this(Strings.EMPTY_ARRAY);
}

public PitSegmentsRequest(StreamInput in) throws IOException {
super(in);
pitIds = Arrays.asList(in.readStringArray());
verbose = in.readBoolean();
}

public PitSegmentsRequest(String... indices) {
super(indices);
pitIds = Collections.emptyList();
}

/**
* <code>true</code> if detailed information about each segment should be returned,
* <code>false</code> otherwise.
*/
public boolean verbose() {
return verbose;
}

/**
* Sets the <code>verbose</code> option.
* @see #verbose()
*/
public void verbose(boolean v) {
verbose = v;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (pitIds == null) {
out.writeVInt(0);
} else {
out.writeStringArray(pitIds.toArray(new String[pitIds.size()]));
}
out.writeBoolean(verbose);

}

public Collection<String> getPitIds() {
return pitIds;
}

public void setPitIds(Collection<String> pitIds) {
this.pitIds = pitIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.GetAllPitNodesRequest;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.DefaultShardOperationFailedException;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.*;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.search.SearchService;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.action.search.SearchContextId.decode;

public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction<PitSegmentsRequest, IndicesSegmentResponse, ShardSegments> {

private final ClusterService clusterService;
private final IndicesService indicesService;
private final SearchService searchService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final TransportService transportService;


@Inject
public TransportPitSegmentsAction(ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchService searchService,
NamedWriteableRegistry namedWriteableRegistry) {
super(PitSegmentsAction.NAME, clusterService, transportService, actionFilters, indexNameExpressionResolver,
PitSegmentsRequest::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.searchService = searchService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.transportService = transportService;

}

@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
//TODO : Refactor to use a util method to "get all PITs" once such a method is implemented.
if (request.getPitIds().isEmpty()) {
final List<DiscoveryNode> nodes = new LinkedList<>();
for (ObjectCursor<DiscoveryNode> cursor : clusterService.state().nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
nodes.add(node);
}
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
transportService.sendRequest(transportService.getLocalNode(), GetAllPitsAction.NAME,
new GetAllPitNodesRequest(disNodesArr), new TransportResponseHandler<GetAllPitNodesResponse>() {
@Override
public void handleResponse(GetAllPitNodesResponse response) {
request.setPitIds(response.getPITIDs().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
getDoExecute(task, request, listener);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public GetAllPitNodesResponse read(StreamInput in) throws IOException {
return new GetAllPitNodesResponse(in);
}
});
} else {
getDoExecute(task, request, listener);
}
}

private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
super.doExecute(task, request, listener);
}

@Override
protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) {
final ArrayList<ShardRouting> iterators = new ArrayList<>();
ShardsIterator shardsIterator = clusterState.routingTable().allShards(concreteIndices);
for (String pitId : request.getPitIds()) {
SearchContextId searchContext = decode(namedWriteableRegistry, pitId);
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
Optional<ShardRouting> shardRouting = shardsIterator.getShardRoutings().stream().filter(r -> r.shardId().equals(shardId)).findFirst();
ShardRouting sr = shardRouting.get();
PitAwareShardRouting psr = new PitAwareShardRouting(sr, pitId);
iterators.add(psr);
}
}
}
return new PlainShardsIterator(iterators);
// return clusterState.routingTable().allShards(concreteIndices);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

// @Override
// protected Writeable.Reader<TransportBroadcastByNodeAction<PitSegmentsRequest, IndicesSegmentResponse, ShardSegments>.NodeRequest> getNodeRequestWriteable() {
// return NodeRequest::new;
// }

@Override
protected ShardSegments readShardResult(StreamInput in) throws IOException {
return new ShardSegments(in);
}

@Override
protected IndicesSegmentResponse newResponse(PitSegmentsRequest request, int totalShards, int successfulShards, int failedShards,
List<ShardSegments> results, List<DefaultShardOperationFailedException> shardFailures,
ClusterState clusterState) {
return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards,
shardFailures);
}

@Override
protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException {
return new PitSegmentsRequest(in);
}

@Override
public List<ShardRouting> getShardsFromInputStream(StreamInput in) throws IOException {
System.out.println("getShardsFromInputStream from TBBNA = " + in);
return in.readList(PitAwareShardRouting::new);
}

@Override
protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) {

PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting;
SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry,
pitAwareShardRouting.getPitId()).shards().get(shardRouting.shardId());
PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId());
return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments());
// for(String pitId : request.getPitIds()) {
// SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry,
// pitId).shards().get(shardRouting.shardId());
// if(searchContextIdForNode == null) continue;
// PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId());
// return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments());
//
// }
// return null;
}

public class PitAwareShardRouting extends ShardRouting {

private final String pitId;

public PitAwareShardRouting(StreamInput in) throws IOException {
super(in);
this.pitId = in.readString();
}

public PitAwareShardRouting(
ShardRouting shardRouting,
String pitId
) {
super(shardRouting.shardId(), shardRouting.currentNodeId(), shardRouting.relocatingNodeId(),
shardRouting.primary(), shardRouting.state(), shardRouting.recoverySource(),
shardRouting.unassignedInfo(),
shardRouting.allocationId(), shardRouting.getExpectedShardSize());
this.pitId = pitId;
}

public String getPitId() {
return pitId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pitId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
builder.field("pitId", pitId);
return builder.endObject();
}
}


}
Loading