Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Apr 29, 2022
1 parent de5c4e4 commit 65c123a
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@

package org.opensearch.search.slice;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.CreatePITAction;
import org.opensearch.action.search.CreatePITRequest;
import org.opensearch.action.search.CreatePITResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -46,6 +50,7 @@
import org.opensearch.search.Scroll;
import org.opensearch.search.SearchException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -129,6 +134,63 @@ public void testSearchSort() throws Exception {
}
}

public void testSearchSortWithPIT() throws Exception {
int numShards = randomIntBetween(1, 7);
int numDocs = randomIntBetween(100, 1000);
setupIndex(numDocs, numShards);
int max = randomIntBetween(2, numShards * 3);
CreatePITRequest pitRequest = new CreatePITRequest(TimeValue.timeValueDays(1), true);
pitRequest.setIndices(new String[] { "test" });
ActionFuture<CreatePITResponse> execute = client().execute(CreatePITAction.INSTANCE, pitRequest);
CreatePITResponse pitResponse = execute.get();
for (String field : new String[] { "_id", "random_int", "static_int" }) {
int fetchSize = randomIntBetween(10, 100);

// test _doc sort
SearchRequestBuilder request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
.setSize(fetchSize)
.addSort(SortBuilders.fieldSort("_doc"));
assertSearchSlicesWithPIT(request, field, max, numDocs);

// test numeric sort
request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
.setSize(fetchSize)
.addSort(SortBuilders.fieldSort("random_int"));
assertSearchSlicesWithPIT(request, field, max, numDocs);
}
}

private void assertSearchSlicesWithPIT(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
int totalResults = 0;
List<String> keys = new ArrayList<>();
for (int id = 0; id < numSlice; id++) {
SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
SearchResponse searchResponse = request.slice(sliceBuilder).setFrom(0).get();
totalResults += searchResponse.getHits().getHits().length;
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
int numSliceResults = searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
while (searchResponse.getHits().getHits().length > 0) {
searchResponse = request.setFrom(numSliceResults).slice(sliceBuilder).get();
totalResults += searchResponse.getHits().getHits().length;
numSliceResults += searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
}
assertThat(numSliceResults, equalTo(expectedSliceResults));
}
assertThat(totalResults, equalTo(numDocs));
assertThat(keys.size(), equalTo(numDocs));
assertThat(new HashSet(keys).size(), equalTo(numDocs));
}

public void testWithPreferenceAndRoutings() throws Exception {
int numShards = 10;
int totalDocs = randomIntBetween(100, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public final String buildDescription() {
Strings.arrayToDelimitedString(indices, ",", sb);
sb.append("], ");
sb.append("pointintime[").append(keepAlive).append("], ");
sb.append("allowPartialPitCreation[").append(allowPartialPitCreation).append("], ");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class CreatePITResponse extends ActionResponse implements StatusToXConten
private final ShardSearchFailure[] shardFailures;

public CreatePITResponse(SearchResponse searchResponse) {
if (searchResponse.pointInTimeId() == null || searchResponse.pointInTimeId().isEmpty()) {
throw new IllegalArgumentException("Point in time ID is empty");
}
this.id = searchResponse.pointInTimeId();
this.totalShards = searchResponse.getTotalShards();
this.successfulShards = searchResponse.getSuccessfulShards();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.transport.Transport;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
Expand All @@ -37,10 +38,7 @@
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;

/**
* Controller for creating PIT reader context
* Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive
* Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and
* fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures
* Controller to handle PIT related logic
*/
public class PITController implements Runnable {
private final Runnable runner;
Expand Down Expand Up @@ -81,6 +79,12 @@ private TimeValue getCreatePitTemporaryKeepAlive() {
return CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings());
}

/**
* Method for creating PIT reader context
* Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive
* Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and
* fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures
*/
public void executeCreatePit() {
SearchRequest searchRequest = new SearchRequest(request.getIndices());
searchRequest.preference(request.getPreference());
Expand All @@ -94,7 +98,7 @@ public void executeCreatePit() {
task.getAction(),
() -> task.getDescription(),
task.getParentTaskId(),
task.getHeaders()
new HashMap<>()
);

final StepListener<SearchResponse> createPitListener = new StepListener<>();
Expand All @@ -118,6 +122,7 @@ public void executeCreatePit() {
* Creates PIT reader context with temporary keep alive
*/
void executeCreatePit(Task task, SearchRequest searchRequest, StepListener<SearchResponse> createPitListener) {
logger.debug("Creating PIT context");
transportSearchAction.executeRequest(
task,
searchRequest,
Expand Down Expand Up @@ -152,6 +157,7 @@ void executeUpdatePitId(
ActionListener<CreatePITResponse> updatePitIdListener
) {
createPitListener.whenComplete(searchResponse -> {
logger.debug("Updating PIT context with PIT ID, creation time and keep alive");
CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse);
SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getConnectionLookupListener(contextId);
Expand All @@ -166,7 +172,7 @@ void executeUpdatePitId(
* store the create time ( same create time for all PIT contexts across shards ) to be used
* for list PIT api
*/
long createTime = System.currentTimeMillis();
final long createTime = System.currentTimeMillis();
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
try {
Expand Down Expand Up @@ -205,10 +211,10 @@ private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLoo

final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();

if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
if (clusters.isEmpty()) {
lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId));
} else {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
}
return lookupListener;
}
Expand Down Expand Up @@ -246,7 +252,7 @@ public void onResponse(Integer freed) {

@Override
public void onFailure(Exception e) {
logger.debug("Cleaning up PIT contexts failed ", e);
logger.error("Cleaning up PIT contexts failed ", e);
}
};
ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package org.opensearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.PITController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.index.IndexModule;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1297,8 +1297,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
}

if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
if (context.scrollContext() == null || context.readerContext() instanceof PitReaderContext) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context or PIT context");
}
context.sliceBuilder(source.slice());
}
Expand Down
4 changes: 0 additions & 4 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,6 @@ public String getHeader(String header) {
return headers.get(header);
}

public Map<String, String> getHeaders() {
return headers;
}

public TaskResult result(DiscoveryNode node, Exception error) throws IOException {
return new TaskResult(taskInfo(node.getId(), true, true), error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public Settings onNodeStopped(String nodeName) throws Exception {
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getFailedShards());
assertEquals(0, searchResponse.getSkippedShards());
assertEquals(2, searchResponse.getTotalShards());
return super.onNodeStopped(nodeName);
}
Expand Down

0 comments on commit 65c123a

Please sign in to comment.