Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update list pit poc #23

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/gradle-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ jobs:
### Gradle Check (Jenkins) Run Completed with:
* **RESULT:** ${{ env.result }} :x:
* **FAILURES:**
```
${{ env.test_failures }}
```
* **URL:** ${{ env.workflow_url }}
* **CommitID:** ${{ env.pr_from_sha }}
Please examine the workflow log, locate, and copy-paste the failure below, then iterate to green.
Expand Down
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.*;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
Expand Down Expand Up @@ -502,6 +497,11 @@ static Request getAllPits() {
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request updatePit(UpdatePitRequest updatePitRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time");
request.setEntity(createEntity(updatePitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,7 @@
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.*;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
Expand Down Expand Up @@ -1403,6 +1392,16 @@ public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<
);
}

public final UpdatePitResponse updatePit(UpdatePitRequest updatePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
updatePitRequest,
RequestConverters::updatePit,
options,
UpdatePitResponse::fromXContent,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* Tests point in time API with rest high level client
*/
public class PitIT extends OpenSearchRestHighLevelClientTestCase {

//--this is is PITIT
@Before
public void indexDocuments() throws IOException {
Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
Expand Down
18 changes: 3 additions & 15 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,7 @@
import org.opensearch.action.ingest.SimulatePipelineTransportAction;
import org.opensearch.action.main.MainAction;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.search.*;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -699,7 +686,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

// TODO: register the api here
// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

Expand Down Expand Up @@ -890,6 +877,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
registerHandler.accept(new RestDeleteDecommissionStateAction());
// TODO: add update api here

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ void executeUpdatePitId(
}, updatePitIdListener::onFailure);
}


private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(SearchContextId contextId) {
ClusterState state = clusterService.state();
final Set<String> clusters = contextId.shards()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ public void updatePitContext(
);
}

public void updatePitContext(
Transport.Connection connection,
TransportUpdatePitAction.UpdateReaderContextRequest request,
SearchTask task,
ActionListener<TransportUpdatePitAction.UpdateReaderContextResponse> actionListener
) {
transportService.sendChildRequest(
connection,
UPDATE_READER_CONTEXT_ACTION_NAME,
request,
task,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(actionListener, TransportUpdatePitAction.UpdateReaderContextResponse::new)
);
}

public void createPitContext(
Transport.Connection connection,
TransportCreatePitAction.CreateReaderContextRequest request,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;

public class TransportUpdatePitAction extends HandledTransportAction<UpdatePitRequest, UpdatePitResponse> {

public static final String UPDATE_PIT_ACTION = "update_pit";
private final TransportService transportService;
private final SearchTransportService searchTransportService;
private final ClusterService clusterService;
private final TransportSearchAction transportSearchAction;
private final NamedWriteableRegistry namedWriteableRegistry;
private final UpdatePitController updatePitController;

@Inject
public TransportUpdatePitAction(
TransportService transportService,
ActionFilters actionFilters,
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry,
UpdatePitController updatePitController
) {
super(UpdatePitAction.NAME, transportService, actionFilters, in -> new UpdatePitRequest(in));
this.transportService = transportService;
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
this.transportSearchAction = transportSearchAction;
this.namedWriteableRegistry = namedWriteableRegistry;
this.updatePitController = updatePitController;
}

@Override
protected void doExecute(Task task, UpdatePitRequest request, ActionListener<UpdatePitResponse> listener) {
final StepListener<SearchResponse> updatePitListener = new StepListener<>();
updatePitController.executeUpdatePit(request, task, updatePitListener);
}

public static class UpdateReaderContextRequest extends TransportRequest {
private final ShardId shardId;
private final TimeValue keepAlive;

public UpdateReaderContextRequest(ShardId shardId, TimeValue keepAlive){
this.shardId = shardId;
this.keepAlive = keepAlive;
}

public ShardId getShardId() { return shardId;}

public TimeValue getKeepAlive() {
return keepAlive;
}

public UpdateReaderContextRequest(StreamInput in) throws IOException {
super(in);
this.shardId = new ShardId(in);
this.keepAlive = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeTimeValue(keepAlive);
}
}

public static class UpdateReaderContextResponse extends SearchPhaseResult {
public UpdateReaderContextResponse(ShardSearchContextId shardSearchContextId){
this.contextId = shardSearchContextId;
}

public UpdateReaderContextResponse(StreamInput in) throws IOException {
super(in);
contextId = new ShardSearchContextId(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
contextId.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

public class UpdatePitAction extends ActionType<UpdatePitResponse> {
public static final UpdatePitAction INSTANCE = new UpdatePitAction();
public static final String NAME = "indices:data/read/point_in_time/update";

private UpdatePitAction(){
super(NAME, UpdatePitResponse::new);
}
}
Loading