Skip to content

Commit

Permalink
changes as per new security model
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Sep 20, 2022
1 parent fc48cfb commit bb950d3
Show file tree
Hide file tree
Showing 22 changed files with 559 additions and 396 deletions.
9 changes: 3 additions & 6 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.NodesGetAllPitsAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -677,10 +675,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);
Expand Down Expand Up @@ -859,7 +856,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction(nodesInCluster));
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitService;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
Expand Down Expand Up @@ -45,6 +46,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.SearchContextId.decode;

Expand Down Expand Up @@ -93,10 +95,11 @@ public TransportPitSegmentsAction(
@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
List<String> pitIds = request.getPitIds();
// when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty
// and in this case return empty response
if (pitIds.isEmpty()) {
listener.onResponse(new IndicesSegmentResponse(new ShardSegments[] {}, 0, 0, 0, new ArrayList<>()));
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
pitService.getAllPits(ActionListener.wrap(response -> {
request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
super.doExecute(task, request, listener);
}, listener::onFailure));
} else {
super.doExecute(task, request, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,11 @@
*/
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {

// Security plugin intercepts and sets the response with permitted PIT contexts
private GetAllPitNodesResponse getAllPitNodesResponse;

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}

public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) {
this.getAllPitNodesResponse = getAllPitNodesResponse;
}

public GetAllPitNodesResponse getGetAllPitNodesResponse() {
return getAllPitNodesResponse;
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public GetAllPitNodesResponse(List<ListPitInfo> listPitInfos, GetAllPitNodesResp
public GetAllPitNodesResponse(
List<ListPitInfo> listPitInfos,
ClusterName clusterName,
List<GetAllPitNodeResponse> getAllPitNodeResponse,
List<GetAllPitNodeResponse> getAllPitNodeResponseList,
List<FailedNodeException> failures
) {
super(clusterName, getAllPitNodeResponse, failures);
super(clusterName, getAllPitNodeResponseList, failures);
pitInfos.addAll(listPitInfos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.opensearch.action.ActionType;

/**
* Action type for listing all PIT reader contexts
* Action type for retrieving all PIT reader contexts from nodes
*/
public class GetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
public static final GetAllPitsAction INSTANCE = new GetAllPitsAction();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,14 @@ public Map<String, String[]> getIndicesForPits(List<String> pitIds) {
/**
* Get all active point in time contexts
*/
public void getAllPits(GetAllPitNodesResponse getAllPitNodesResponse, ActionListener<GetAllPitNodesResponse> getAllPitsListener) {
public void getAllPits(ActionListener<GetAllPitNodesResponse> getAllPitsListener) {
final List<DiscoveryNode> nodes = new ArrayList<>();
for (ObjectCursor<DiscoveryNode> cursor : clusterService.state().nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
nodes.add(node);
}
logger.debug("Number of active PITs in cluster: " + getAllPitNodesResponse.getPitInfos().size());
DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]);
GetAllPitNodesRequest getAllPitNodesRequest = new GetAllPitNodesRequest(disNodesArr);
getAllPitNodesRequest.setGetAllPitNodesResponse(getAllPitNodesResponse);
transportService.sendRequest(
transportService.getLocalNode(),
GetAllPitsAction.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.tasks.Task;
Expand All @@ -21,6 +20,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Transport action for deleting point in time searches - supports deleting list and all point in time searches
Expand All @@ -34,9 +34,6 @@ public TransportDeletePitAction(
TransportService transportService,
ActionFilters actionFilters,
NamedWriteableRegistry namedWriteableRegistry,
TransportSearchAction transportSearchAction,
ClusterService clusterService,
SearchTransportService searchTransportService,
PitService pitService
) {
super(DeletePitAction.NAME, transportService, actionFilters, DeletePitRequest::new);
Expand All @@ -50,10 +47,8 @@ public TransportDeletePitAction(
@Override
protected void doExecute(Task task, DeletePitRequest request, ActionListener<DeletePitResponse> listener) {
List<String> pitIds = request.getPitIds();
// when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty
// and in this case return empty response
if (pitIds.isEmpty()) {
listener.onResponse(new DeletePitResponse(new ArrayList<>()));
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
deleteAllPits(listener);
} else {
deletePits(listener, request);
}
Expand All @@ -75,4 +70,24 @@ private void deletePits(ActionListener<DeletePitResponse> listener, DeletePitReq
}
pitService.deletePitContexts(nodeToContextsMap, listener);
}

/**
* Delete all active PIT reader contexts leveraging list all PITs
*
* For Cross cluster PITs :
* - mixed cluster PITs ( PIT comprising local and remote ) will be fully deleted. Since there will atleast be
* one reader context with PIT ID present in local cluster, 'Get all PITs' will retrieve the PIT ID with which
* we can completely delete the PIT contexts in both local and remote cluster.
* - fully remote PITs will not be deleted as 'Get all PITs' operates on local cluster only and no PIT info can
* be retrieved when it's fully remote.
*/
private void deleteAllPits(ActionListener<DeletePitResponse> listener) {
// Get all PITs and execute delete operation for the PITs.
pitService.getAllPits(ActionListener.wrap(getAllPitNodesResponse -> {
DeletePitRequest deletePitRequest = new DeletePitRequest(
getAllPitNodesResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList())
);
deletePits(listener, deletePitRequest);
}, listener::onFailure));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,79 @@

package org.opensearch.action.search;

import org.opensearch.action.ActionListener;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.tasks.Task;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.search.SearchService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
* Transport action to get all active PIT contexts across the cluster
* Transport action to get all active PIT contexts across all nodes
*/
public class TransportGetAllPitsAction extends HandledTransportAction<GetAllPitNodesRequest, GetAllPitNodesResponse> {
private final PitService pitService;
public class TransportGetAllPitsAction extends TransportNodesAction<
GetAllPitNodesRequest,
GetAllPitNodesResponse,
GetAllPitNodeRequest,
GetAllPitNodeResponse> {
private final SearchService searchService;

@Inject
public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) {
super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in));
this.pitService = pitService;
public TransportGetAllPitsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
SearchService searchService
) {
super(
GetAllPitsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
GetAllPitNodesRequest::new,
GetAllPitNodeRequest::new,
ThreadPool.Names.SAME,
GetAllPitNodeResponse.class
);
this.searchService = searchService;
}

@Override
protected GetAllPitNodesResponse newResponse(
GetAllPitNodesRequest request,
List<GetAllPitNodeResponse> getAllPitNodeResponses,
List<FailedNodeException> failures
) {
return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeResponses, failures);
}

@Override
protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) {
return new GetAllPitNodeRequest();
}

@Override
protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new GetAllPitNodeResponse(in);
}

protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener<GetAllPitNodesResponse> listener) {
// If security plugin intercepts the request, it'll replace all PIT IDs with permitted PIT IDs
if (request.getGetAllPitNodesResponse() != null) {
listener.onResponse(request.getGetAllPitNodesResponse());
}
/**
* This retrieves all active PITs in the node
*/
@Override
protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) {
GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse(
transportService.getLocalNode(),
searchService.getAllPITReaderContexts()
);
return nodeResponse;
}
}
Loading

0 comments on commit bb950d3

Please sign in to comment.