Skip to content

Commit

Permalink
List all PITs service layer changes (#4016)
Browse files Browse the repository at this point in the history
* List all pits service layer changes

Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Aug 3, 2022
1 parent 66c24ff commit 6f43dbc
Show file tree
Hide file tree
Showing 21 changed files with 793 additions and 107 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -663,6 +665,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);

return unmodifiableMap(actions.getRegistry());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Inner node get all pits request
*/
public class GetAllPitNodeRequest extends BaseNodeRequest {

public GetAllPitNodeRequest() {
super();
}

public GetAllPitNodeRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Inner node get all pits response
*/
public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment {

/**
* List of active PITs in the associated node
*/
private final List<ListPitInfo> pitInfos;

public GetAllPitNodeResponse(DiscoveryNode node, List<ListPitInfo> pitInfos) {
super(node);
if (pitInfos == null) {
throw new IllegalArgumentException("Pits info cannot be null");
}
this.pitInfos = Collections.unmodifiableList(pitInfos);
}

public GetAllPitNodeResponse(StreamInput in) throws IOException {
super(in);
this.pitInfos = Collections.unmodifiableList(in.readList(ListPitInfo::new));
}

public List<ListPitInfo> getPitInfos() {
return pitInfos;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(pitInfos);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("node", this.getNode().getName());
builder.startArray("pitInfos");
for (ListPitInfo pit : pitInfos) {
pit.toXContent(builder, params);
}

builder.endArray();
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request to get all active PIT IDs from all nodes of cluster
*/
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* This class transforms active PIT objects from all nodes to unique PIT objects
*/
public class GetAllPitNodesResponse extends BaseNodesResponse<GetAllPitNodeResponse> implements ToXContentObject {

/**
* List of unique PITs across all nodes
*/
private final Set<ListPitInfo> pitInfos = new HashSet<>();

public GetAllPitNodesResponse(StreamInput in) throws IOException {
super(in);
}

public GetAllPitNodesResponse(
ClusterName clusterName,
List<GetAllPitNodeResponse> getAllPitNodeResponse,
List<FailedNodeException> failures
) {
super(clusterName, getAllPitNodeResponse, failures);
Set<String> uniquePitIds = new HashSet<>();
pitInfos.addAll(
getAllPitNodeResponse.stream()
.flatMap(p -> p.getPitInfos().stream().filter(t -> uniquePitIds.add(t.getPitId())))
.collect(Collectors.toList())
);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("pitInfos");
for (ListPitInfo pit : pitInfos) {
pit.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public List<GetAllPitNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(GetAllPitNodeResponse::new);
}

@Override
public void writeNodesTo(StreamOutput out, List<GetAllPitNodeResponse> nodes) throws IOException {
out.writeList(nodes);
}

public List<ListPitInfo> getPitInfos() {
return Collections.unmodifiableList(new ArrayList<>(pitInfos));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Action type for listing all PIT reader contexts
*/
public class GetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
public static final GetAllPitsAction INSTANCE = new GetAllPitsAction();
public static final String NAME = "indices:data/read/point_in_time/readall";

private GetAllPitsAction() {
super(NAME, GetAllPitNodesResponse::new);
}
}
63 changes: 63 additions & 0 deletions server/src/main/java/org/opensearch/action/search/ListPitInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;

/**
* This holds information about pit reader context such as pit id and creation time
*/
public class ListPitInfo implements ToXContentFragment, Writeable {
private final String pitId;
private final long creationTime;
private final long keepAlive;

public ListPitInfo(String pitId, long creationTime, long keepAlive) {
this.pitId = pitId;
this.creationTime = creationTime;
this.keepAlive = keepAlive;
}

public ListPitInfo(StreamInput in) throws IOException {
this.pitId = in.readString();
this.creationTime = in.readLong();
this.keepAlive = in.readLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pitId", pitId);
builder.field("creationTime", creationTime);
builder.field("keepAlive", keepAlive);
builder.endObject();
return builder;
}

public String getPitId() {
return pitId;
}

public long getCreationTime() {
return creationTime;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pitId);
out.writeLong(creationTime);
out.writeLong(keepAlive);
}
}
Loading

0 comments on commit 6f43dbc

Please sign in to comment.