Skip to content

Commit

Permalink
two phase create pit
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Apr 4, 2022
1 parent 0d1f337 commit 72bd39b
Show file tree
Hide file tree
Showing 11 changed files with 536 additions and 180 deletions.
21 changes: 7 additions & 14 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,7 @@
import org.opensearch.action.ingest.SimulatePipelineTransportAction;
import org.opensearch.action.main.MainAction;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.search.*;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -398,12 +391,7 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
import org.opensearch.rest.action.search.*;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
Expand Down Expand Up @@ -660,6 +648,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class);
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);
actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -832,6 +821,10 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
registerHandler.accept(new RestTemplatesAction());

// Point in time API
registerHandler.accept(new RestCreatePITAction());

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

import org.opensearch.action.ActionType;

public class CreatePITAction extends ActionType<PITResponse> {
public class CreatePITAction extends ActionType<SearchResponse> {
public static final CreatePITAction INSTANCE = new CreatePITAction();
public static final String NAME = "indices:data/read/pit";

private CreatePITAction() {
super(NAME, PITResponse::new);
super(NAME, SearchResponse::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,43 @@
import java.util.Map;
import java.util.Objects;

public class PITRequest extends ActionRequest implements IndicesRequest.Replaceable {
public PITRequest(TimeValue keepAlive) {
public class CreatePITRequest extends ActionRequest implements IndicesRequest.Replaceable {

private TimeValue keepAlive;
private final boolean allowPartialPitCreation;
@Nullable
private String routing = null;
@Nullable
private String preference = null;
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;


public CreatePITRequest(TimeValue keepAlive, boolean allowPartialPitCreation) {
this.keepAlive = keepAlive;
this.allowPartialPitCreation = allowPartialPitCreation;
}

public CreatePITRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
routing = in.readOptionalString();
preference = in.readOptionalString();
keepAlive = in.readTimeValue();
routing = in.readOptionalString();
allowPartialPitCreation = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
out.writeOptionalString(preference);
out.writeTimeValue(keepAlive);
out.writeOptionalString(routing);
out.writeBoolean(allowPartialPitCreation);
}

public String getRouting() {
Expand All @@ -49,55 +83,26 @@ public TimeValue getKeepAlive() {
return keepAlive;
}

private TimeValue keepAlive;

public PITRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
routing = in.readOptionalString();
preference = in.readOptionalString();
keepAlive = in.readTimeValue();
routing = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
out.writeOptionalString(preference);
out.writeTimeValue(keepAlive);
out.writeOptionalString(routing);
public boolean isAllowPartialPitCreation() {
return allowPartialPitCreation;
}

public void setRouting(String routing) {
this.routing = routing;
}

@Nullable
private String routing = null;

public void setPreference(String preference) {
this.preference = preference;
}

@Nullable
private String preference = null;

public void setIndices(String[] indices) {
this.indices = indices;
}

private String[] indices = Strings.EMPTY_ARRAY;

public void setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null");
}

private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;


@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -113,8 +118,6 @@ public IndicesOptions indicesOptions() {
return indicesOptions;
}



public void setKeepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}
Expand All @@ -128,7 +131,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
* Sets the indices the search will be executed on.
*/
@Override
public PITRequest indices(String... indices) {
public CreatePITRequest indices(String... indices) {
SearchRequest.validateIndices(indices);
this.indices = indices;
return this;
Expand Down
57 changes: 0 additions & 57 deletions server/src/main/java/org/opensearch/action/search/PITResponse.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class SearchTransportService {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]";
public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]";
public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]";

private final TransportService transportService;
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
Expand Down Expand Up @@ -141,6 +142,20 @@ public void sendFreeContext(
);
}

public void updatePitContext(
Transport.Connection connection,
TransportCreatePITAction.UpdatePITReaderRequest request,
ActionListener<TransportCreatePITAction.UpdatePitContextResponse> actionListener) {
transportService.sendRequest(
connection,
UPDATE_READER_CONTEXT_ACTION_NAME,
request,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<TransportCreatePITAction.UpdatePitContextResponse>(actionListener,
TransportCreatePITAction.UpdatePitContextResponse::new)
);
}

public void sendCanMatch(
Transport.Connection connection,
final ShardSearchRequest request,
Expand Down Expand Up @@ -308,7 +323,7 @@ public Map<String, Long> getPendingSearchRequests() {
return new HashMap<>(clientConnections);
}

static class ScrollFreeContextRequest extends TransportRequest {
static class ScrollFreeContextRequest extends TransportRequest {
private ShardSearchContextId contextId;

ScrollFreeContextRequest(ShardSearchContextId contextId) {
Expand Down Expand Up @@ -546,12 +561,50 @@ public static void registerRequestHandler(TransportService transportService, Sea
}
);
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new);
transportService.registerRequestHandler(CREATE_READER_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, TransportCreatePITAction.CreateReaderContextRequest::new,
transportService.registerRequestHandler(
CREATE_READER_CONTEXT_ACTION_NAME,
ThreadPool.Names.SAME,
TransportCreatePITAction.CreateReaderContextRequest::new,
(request, channel, task) -> {
ChannelActionListener<TransportCreatePITAction.CreateReaderContextResponse, TransportCreatePITAction.CreateReaderContextRequest> listener = new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request);
searchService.openReaderContext(request.getShardId(), request.getKeepAlive(), ActionListener.wrap(r -> listener.onResponse(new TransportCreatePITAction.CreateReaderContextResponse(r)), listener::onFailure));
ChannelActionListener<
TransportCreatePITAction.CreateReaderContextResponse,
TransportCreatePITAction.CreateReaderContextRequest>
listener =
new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request);
searchService.openReaderContext(
request.getShardId(),
request.getKeepAlive(),
ActionListener.wrap(
r ->
listener.onResponse(
new TransportCreatePITAction.CreateReaderContextResponse(r)),
listener::onFailure));
});
TransportActionProxy.registerProxyAction(transportService, CREATE_READER_CONTEXT_ACTION_NAME, TransportCreatePITAction.CreateReaderContextResponse::new);
TransportActionProxy.registerProxyAction(
transportService,
CREATE_READER_CONTEXT_ACTION_NAME,
TransportCreatePITAction.CreateReaderContextResponse::new);

transportService.registerRequestHandler(
UPDATE_READER_CONTEXT_ACTION_NAME,
ThreadPool.Names.SAME,
TransportCreatePITAction.UpdatePITReaderRequest::new,
(request, channel, task) -> {
ChannelActionListener<TransportCreatePITAction.UpdatePitContextResponse,
TransportCreatePITAction.UpdatePITReaderRequest> listener =
new ChannelActionListener<>(channel, UPDATE_READER_CONTEXT_ACTION_NAME, request);
searchService.updatePitIdAndKeepAlive(request,
ActionListener.wrap(
r -> listener.onResponse(r),
listener::onFailure
));
}
);
TransportActionProxy.registerProxyAction(
transportService,
UPDATE_READER_CONTEXT_ACTION_NAME,
TransportCreatePITAction.UpdatePitContextResponse::new);

}

/**
Expand Down
Loading

0 comments on commit 72bd39b

Please sign in to comment.