Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"number_of_shards": 2,
"number_of_replicas": 0,
"sort.field": [ "CounterID", "EventDate", "UserID", "EventTime", "WatchID" ],
"sort.order": [ "desc", "desc", "desc", "desc", "desc" ]
"sort.order": [ "desc", "desc", "desc", "desc", "desc" ],
"optimized.enabled": true
}
},
"mappings" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ public interface OpenSearchClient {
*/
Map<String, Integer> getIndexMaxResultWindows(String... indexExpression);

/**
* Check if index is optimized.
*
* @param indexName index name
* @return true if index is optimized, false otherwise
*/
boolean isIndexOptimized(String indexName);

/**
* Perform search query in the search request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ public Map<String, Integer> getIndexMaxResultWindows(String... indexExpression)
}
}

@Override
public boolean isIndexOptimized(String indexName) {
try {
GetSettingsResponse settingsResponse =
client.admin().indices().prepareGetSettings(indexName).setLocal(true).get();
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
return settings != null && settings.getAsBoolean("index.optimized.enabled", false);
} catch (Exception e) {
return false;
}
}

/** TODO: Scroll doesn't work for aggregation. Support aggregation later. */
@Override
public OpenSearchResponse search(OpenSearchRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,9 @@ public void deletePit(DeletePitRequest deletePitRequest) {
"Error occurred while deleting PIT for internal plugin operation", e);
}
}

@Override
public boolean isIndexOptimized(String indexName) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.expression.function.udf.datetime.ExtractFunction;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
Expand Down Expand Up @@ -138,6 +139,10 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
/** List of includes expected in the response. */
@EqualsAndHashCode.Exclude @ToString.Exclude private final List<String> includes;

/** OpenSearch client. */
@EqualsAndHashCode.Exclude @ToString.Exclude
private final OpenSearchClient client;

@EqualsAndHashCode.Exclude private boolean needClean = true;

/** Indicate the search already done. */
Expand All @@ -158,6 +163,8 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
private static final Logger LOGGER =
LogManager.getLogger(OpenSearchQueryRequest.class);

private final boolean indexOptimized;

/** Constructor of OpenSearchQueryRequest. */
public OpenSearchQueryRequest(
String indexName, int size, OpenSearchExprValueFactory factory, List<String> includes) {
Expand All @@ -175,6 +182,8 @@ public OpenSearchQueryRequest(
this.exprValueFactory = factory;
this.includes = includes;
this.pushedDownRelNodeTree = null;
this.client = null;
this.indexOptimized = false;
}

/** Constructor of OpenSearchQueryRequest. */
Expand All @@ -183,7 +192,7 @@ public OpenSearchQueryRequest(
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory,
List<String> includes) {
this(indexName, sourceBuilder, factory, includes, (RelNode) null);
this(indexName, sourceBuilder, factory, includes, (RelNode) null, false);
}

/** Constructor of OpenSearchQueryRequest with RelNode tree support. */
Expand All @@ -193,11 +202,24 @@ public OpenSearchQueryRequest(
OpenSearchExprValueFactory factory,
List<String> includes,
RelNode pushedDownRelNodeTree) {
this(indexName, sourceBuilder, factory, includes, pushedDownRelNodeTree, false);
}

/** Constructor of OpenSearchQueryRequest with RelNode tree and index optimization support. */
public OpenSearchQueryRequest(
IndexName indexName,
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory,
List<String> includes,
RelNode pushedDownRelNodeTree,
boolean indexOptimized) {
this.indexName = indexName;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = factory;
this.includes = includes;
this.pushedDownRelNodeTree = pushedDownRelNodeTree;
this.client = null;
this.indexOptimized = indexOptimized;
}

/** Constructor of OpenSearchQueryRequest with PIT support. */
Expand All @@ -208,7 +230,7 @@ public OpenSearchQueryRequest(
List<String> includes,
TimeValue cursorKeepAlive,
String pitId) {
this(indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId, null);
this(indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId, null, false);
}

/** Constructor of OpenSearchQueryRequest with PIT and RelNode tree support. */
Expand All @@ -220,13 +242,28 @@ public OpenSearchQueryRequest(
TimeValue cursorKeepAlive,
String pitId,
RelNode pushedDownRelNodeTree) {
this(indexName, sourceBuilder, factory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree, false);
}

/** Constructor of OpenSearchQueryRequest with PIT, RelNode tree and index optimization support. */
public OpenSearchQueryRequest(
IndexName indexName,
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory,
List<String> includes,
TimeValue cursorKeepAlive,
String pitId,
RelNode pushedDownRelNodeTree,
boolean indexOptimized) {
this.indexName = indexName;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = factory;
this.includes = includes;
this.cursorKeepAlive = cursorKeepAlive;
this.pitId = pitId;
this.pushedDownRelNodeTree = pushedDownRelNodeTree;
this.client = null;
this.indexOptimized = indexOptimized;
}

/** true if the request is a count aggregation request. */
Expand Down Expand Up @@ -272,6 +309,8 @@ public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) th
exprValueFactory =
new OpenSearchExprValueFactory(
index.getFieldOpenSearchTypes(), index.isFieldTypeTolerance());
client = index.getClient();
this.indexOptimized = client.isIndexOptimized(indexName.toString());

// RelNode tree is not serialized/deserialized for now
// It is only used during the initial query execution
Expand All @@ -291,8 +330,9 @@ public OpenSearchResponse search(
// get the value before set searchDone = true
boolean isCountAggRequest = isCountAggRequest();
searchDone = true;
// Convert pushed-down RelNode tree to Substrait bytes
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree));
if (indexOptimized) {
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree));
}
return new OpenSearchResponse(
searchAction.apply(
new SearchRequest().indices(indexName.getIndexNames()).source(sourceBuilder)),
Expand All @@ -314,8 +354,9 @@ public OpenSearchResponse searchWithPIT(Function<SearchRequest, SearchResponse>
SearchHits.empty(), exprValueFactory, includes, isCountAggRequest());
} else {
this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId));
// Convert pushed-down RelNode tree to Substrait bytes
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree));
if (indexOptimized) {
sourceBuilder.queryPlanIR(convertToSubstraitAndSerialize(pushedDownRelNodeTree));
}
this.sourceBuilder.timeout(cursorKeepAlive);
// check for search after
if (searchAfter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class OpenSearchRequestBuilder {

@ToString.Exclude private final Settings settings;

/** Whether the index is optimized. */
private boolean indexOptimized = false;

/** RelNode tree for pushed-down operations (optional).
* -- SETTER --
* Set the RelNode tree for pushed-down operations.
Expand Down Expand Up @@ -121,13 +124,14 @@ public OpenSearchRequest build(
TimeValue cursorKeepAlive,
OpenSearchClient client,
boolean isMappingEmpty) {
indexOptimized = client.isIndexOptimized(indexName.toString());
/* Don't use PIT search:
* 1. If the size of source is 0. It means this is an aggregation request and no need to use pit.
* 2. If mapping is empty. It means no data in the index. PIT search relies on `_id` fields to do sort, thus it will fail if using PIT search in this case.
*/
if (sourceBuilder.size() == 0 || isMappingEmpty) {
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, List.of(), pushedDownRelNodeTree);
indexName, sourceBuilder, exprValueFactory, List.of(), pushedDownRelNodeTree, indexOptimized);
}
return buildRequestWithPit(indexName, cursorKeepAlive, client);
}
Expand All @@ -144,13 +148,13 @@ private OpenSearchRequest buildRequestWithPit(
// Search with PIT request
String pitId = createPit(indexName, cursorKeepAlive, client);
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree);
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree, indexOptimized);
} else {
sourceBuilder.from(startFrom);
sourceBuilder.size(size);
// Search with non-Pit request
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, includes, pushedDownRelNodeTree);
indexName, sourceBuilder, exprValueFactory, includes, pushedDownRelNodeTree, indexOptimized);
}
} else {
if (startFrom != 0) {
Expand All @@ -160,7 +164,7 @@ private OpenSearchRequest buildRequestWithPit(
// Search with PIT request
String pitId = createPit(indexName, cursorKeepAlive, client);
return new OpenSearchQueryRequest(
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree);
indexName, sourceBuilder, exprValueFactory, includes, cursorKeepAlive, pitId, pushedDownRelNodeTree, indexOptimized);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
RelNode pushedDownTree = null;
try {
if (pushDownContext != null && !pushDownContext.isEmpty()) {
// FIXME : Make this optional based on the index setting
LOG.info("Full RelNode tree:\n{}", RelOptUtil.toString(CalciteToolsHelper.OpenSearchRelRunners.getCurrentRelNode()));
LOG.info("=== PushDownContext contains {} operations ===", pushDownContext.size());
int index = 0;
Expand Down
Loading