Skip to content

Commit

Permalink
List all PITs API
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed May 5, 2022
1 parent fa1f2cd commit da3d6ee
Show file tree
Hide file tree
Showing 11 changed files with 533 additions and 0 deletions.
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePITAction;
import org.opensearch.action.search.DeletePITAction;
import org.opensearch.action.search.GetAllPITsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePITAction;
import org.opensearch.action.search.TransportDeletePITAction;
import org.opensearch.action.search.TransportGetAllPITsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -407,6 +409,7 @@
import org.opensearch.rest.action.search.RestCreatePITAction;
import org.opensearch.rest.action.search.RestDeletePITAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPITsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
Expand Down Expand Up @@ -666,8 +669,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class);
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);

// Point in time actions
actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class);
actions.register(DeletePITAction.INSTANCE, TransportDeletePITAction.class);
actions.register(GetAllPITsAction.INSTANCE, TransportGetAllPITsAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -844,6 +850,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
// Point in time API
registerHandler.accept(new RestCreatePITAction());
registerHandler.accept(new RestDeletePITAction());
registerHandler.accept(new RestGetAllPITsAction());
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request to get all active PITs in a node
*/
public class GetAllPITNodeRequest extends BaseNodeRequest {
GetAllPITNodesRequest request;

@Inject
public GetAllPITNodeRequest(GetAllPITNodesRequest request) {
this.request = request;
}

public GetAllPITNodeRequest(StreamInput in) throws IOException {
super(in);
request = new GetAllPITNodesRequest(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Response which holds information about all PIT contexts in a node
*/
public class GetAllPITNodeResponse extends BaseNodeResponse implements ToXContentFragment {
private List<PitInfo> pitsInfo;

@Inject
public GetAllPITNodeResponse(StreamInput in, List<PitInfo> pitsInfo) throws IOException {
super(in);
this.pitsInfo = pitsInfo;
}

public GetAllPITNodeResponse(DiscoveryNode node, List<PitInfo> pitsInfo) {
super(node);
this.pitsInfo = pitsInfo;
}

public GetAllPITNodeResponse(StreamInput in) throws IOException {
super(in);
this.pitsInfo = Collections.unmodifiableList(in.readList(PitInfo::new));
}

public List<PitInfo> getPitsInfo() {
return pitsInfo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(pitsInfo);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("pitsInfo");
for (PitInfo pit : pitsInfo) {
pit.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request to get all active PIT IDs in set of nodes
*/
public class GetAllPITNodesRequest extends BaseNodesRequest<GetAllPITNodesRequest> {
@Inject
public GetAllPITNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}

public GetAllPITNodesRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Response structure to hold all active PIT contexts information from all nodes
*/
public class GetAllPITNodesResponse extends BaseNodesResponse<GetAllPITNodeResponse> implements ToXContentObject {

List<PitInfo> pitsInfo = new ArrayList<>();

@Inject
public GetAllPITNodesResponse(StreamInput in) throws IOException {
super(in);
}

public GetAllPITNodesResponse(
ClusterName clusterName,
List<GetAllPITNodeResponse> getAllPITNodeResponses,
List<FailedNodeException> failures
) {
super(clusterName, getAllPITNodeResponses, failures);
Set<String> uniquePitIds = new HashSet<>();
pitsInfo.addAll(
getAllPITNodeResponses.stream()
.flatMap(p -> p.getPitsInfo().stream().filter(t -> uniquePitIds.add(t.getPitId())))
.collect(Collectors.toList())
);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("pitsInfo");
for (PitInfo pit : pitsInfo) {
pit.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public List<GetAllPITNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(GetAllPITNodeResponse::new);
}

@Override
public void writeNodesTo(StreamOutput out, List<GetAllPITNodeResponse> nodes) throws IOException {
out.writeList(nodes);
}

public List<PitInfo> getPITIDs() {
return new ArrayList<>(pitsInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Action type for listing all PIT reader contexts
*/
public class GetAllPITsAction extends ActionType<GetAllPITNodesResponse> {
public static final GetAllPITsAction INSTANCE = new GetAllPITsAction();
public static final String NAME = "indices:data/readall/pit";

private GetAllPITsAction() {
super(NAME, GetAllPITNodesResponse::new);
}
}
58 changes: 58 additions & 0 deletions server/src/main/java/org/opensearch/action/search/PitInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;

/**
* This holds information about pit reader context such as pit id and creation time
*/
public class PitInfo implements ToXContentFragment, Writeable {
private final String pitId;
private final long creationTime;

public PitInfo(String pitId, long creationTime) {
this.pitId = pitId;
this.creationTime = creationTime;
}

public PitInfo(StreamInput in) throws IOException {
this.pitId = in.readString();
this.creationTime = in.readLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pitId", pitId);
builder.field("creationTime", creationTime);
builder.endObject();
return builder;
}

public String getPitId() {
return pitId;
}

public long getCreationTime() {
return creationTime;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pitId);
out.writeLong(creationTime);
}
}
Loading

0 comments on commit da3d6ee

Please sign in to comment.