Skip to content

Commit

Permalink
Support pagination in V2 engine, phase 1 (#226)
Browse files Browse the repository at this point in the history
* Fixing integration tests broken during POC

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Comment to clarify an exception.

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Add support for paginated scroll request, first page.

Implement PaginatedPlanCache.convertToPlan for second page to work.

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Progress on paginated scroll request, subsequent page.

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Move `ExpressionSerializer` from `opensearch` to `core`.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Rename `Cursor` `asString` to `toString`.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Disable scroll cleaning.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Add full cursor serialization and deserialization.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Misc fixes.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Further work on pagination.

* Added push down page size from `LogicalPaginate` to `LogicalRelation`.
* Improved cursor encoding and decoding.
* Added cursor compression.
* Fixed issuing `SearchScrollRequest`.
* Fixed returning last empty page.
* Minor code grooming/commenting.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Pagination fix for empty indices.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix error reporting on wrong cursor.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Minor comments and error reporting improvement.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Add an end-to-end integration test.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Add `explain` request handlers.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Add IT for explain.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Address issues flagged by checkstyle build step (#229)

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Pagination, phase 1: Add unit tests for `:core` module with coverage. (#230)

* Add unit tests for `:core` module with coverage. Uncovered: `toCursor`, because it is will be changed soon.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Pagination, phase 1: Add unit tests for SQL module with coverage. (#239)

* Add unit tests for SQL module with coverage.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Update sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

Co-authored-by: GabeFernandez310 <Gabriel.Fernandez@improving.com>

---------

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Co-authored-by: GabeFernandez310 <Gabriel.Fernandez@improving.com>

* Pagination, phase 1: Add unit tests for `:opensearch` module with coverage. (#233)

* Add UT for `:opensearch` module with full coverage, except `toCursor`.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix checkstyle.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

---------

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix the merges.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix explain.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix scroll cleaning.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Store `TotalHits` and use it to report `total` in response.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Add missing UT for `:protocol` module.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix PPL UTs damaged in f4ea4ad.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Minor checkstyle fixes.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fallback to v1 engine for pagination (#245)

* Pagination fallback integration tests.

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Add UT with coverage for `toCursor` serialization.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix broken tests in `legacy`.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix getting `total` from non-paged requests and from queries without `FROM` clause.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix scroll cleaning.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix cursor request processing.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Update ITs.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix (again) TotalHits feature.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Fix typo in prometheus config.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Recover commented logging.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Move `test_pagination_blackbox` to a separate class and add logging.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Address some PR feedbacks: rename some classes and revert unnecessary whitespace changed.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Minor commenting.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Address PR comments.

* Add javadocs
* Renames
* Cleaning up some comments
* Remove unused code
* Speed up IT

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Minor missing changes.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Integration tests for fetch_size, max_result_window, and query.size_limit (#248)

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>

* Remove `PaginatedQueryService`, extend `QueryService` to hold two planners and use them.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Move push down functions from request builders to a new interface.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Some file moves.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

* Minor clean-up according to PR review.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>

---------

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Co-authored-by: MaxKsyunz <maxk@bitquilltech.com>
Co-authored-by: GabeFernandez310 <Gabriel.Fernandez@improving.com>
Co-authored-by: Max Ksyunz <max.ksyunz@improving.com>
  • Loading branch information
4 people committed Mar 29, 2023
1 parent 23cc0f6 commit e951192
Show file tree
Hide file tree
Showing 147 changed files with 4,975 additions and 578 deletions.
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dependencies {
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRareTopN;
Expand Down Expand Up @@ -520,6 +522,12 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
return new LogicalML(child, node.getArguments());
}

@Override
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -289,4 +290,8 @@ public T visitQuery(Query node, C context) {
public T visitExplain(Explain node, C context) {
return visitStatement(node, context);
}

public T visitPaginate(Paginate paginate, C context) {
return visitChildren(paginate, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
48 changes: 48 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent pagination operation.
* Actually a wrapper to the AST.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
public class Paginate extends UnresolvedPlan {
@Getter
private final int pageSize;
private UnresolvedPlan child;

public Paginate(int pageSize, UnresolvedPlan child) {
this.pageSize = pageSize;
this.child = child;
}

@Override
public List<? extends Node> getChild() {
return List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitPaginate(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.opensearch.sql.datasource.DataSourceService;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown by V2 engine to support fallback scenario.
*/
public class UnsupportedCursorRequestException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand Down Expand Up @@ -53,6 +54,8 @@ void execute(PhysicalPlan plan, ExecutionContext context,
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final long total;
private final Cursor cursor;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Query id of {@link AbstractPlan}.
*/
public class QueryId {
public static final QueryId None = new QueryId("");
/**
* Query id.
*/
Expand Down
20 changes: 19 additions & 1 deletion core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand All @@ -28,7 +30,15 @@ public class QueryService {

private final ExecutionEngine executionEngine;

/**
* There are two planners, one - to handle pagination requests (cursor/scroll) only and
* another one for everything else.
* @see OpenSearchPluginModule#queryPlanFactory (:plugin module)
* @see LogicalPlanOptimizer#paginationCreate
* @see QueryService
*/
private final Planner planner;
private final Planner paginationPlanner;

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
Expand All @@ -46,6 +56,14 @@ public void execute(UnresolvedPlan plan,
}
}

/**
* Execute a physical plan without analyzing or planning anything.
*/
public void executePlan(PhysicalPlan plan,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
executionEngine.execute(plan, ExecutionContext.emptyExecutionContext(), listener);
}

/**
* Execute the {@link UnresolvedPlan}, with {@link PlanContext} and using {@link ResponseListener}
* to get response.
Expand Down Expand Up @@ -97,6 +115,6 @@ public LogicalPlan analyze(UnresolvedPlan plan) {
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
*/
public PhysicalPlan plan(LogicalPlan plan) {
return planner.plan(plan);
return plan instanceof LogicalPaginate ? paginationPlanner.plan(plan) : planner.plan(plan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.pagination.PaginatedPlanCache;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* ContinuePaginatedPlan represents cursor a request.
* It returns subsequent pages to the user (2nd page and all next).
* {@link PaginatedPlan}
*/
public class ContinuePaginatedPlan extends AbstractPlan {

private final String cursor;
private final QueryService queryService;
private final PaginatedPlanCache paginatedPlanCache;

private final ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener;


/**
* Create an abstract plan that can continue paginating a given cursor.
*/
public ContinuePaginatedPlan(QueryId queryId, String cursor, QueryService queryService,
PaginatedPlanCache planCache,
ResponseListener<ExecutionEngine.QueryResponse>
queryResponseListener) {
super(queryId);
this.cursor = cursor;
this.paginatedPlanCache = planCache;
this.queryService = queryService;
this.queryResponseListener = queryResponseListener;
}

@Override
public void execute() {
try {
PhysicalPlan plan = paginatedPlanCache.convertToPlan(cursor);
queryService.executePlan(plan, queryResponseListener);
} catch (Exception e) {
queryResponseListener.onFailure(e);
}
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
listener.onFailure(new UnsupportedOperationException(
"Explain of a paged query continuation is not supported. "
+ "Use `explain` for the initial query request."));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

/**
* PaginatedPlan represents a page request. Dislike a regular QueryPlan,
* it returns paged response to the user and cursor, which allows to query
* next page.
* {@link ContinuePaginatedPlan}
*/
public class PaginatedPlan extends AbstractPlan {
final UnresolvedPlan plan;
final int fetchSize;
final QueryService queryService;
final ResponseListener<ExecutionEngine.QueryResponse>
queryResponseResponseListener;

/**
* Create an abstract plan that can start paging a query.
*/
public PaginatedPlan(QueryId queryId, UnresolvedPlan plan, int fetchSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse>
queryResponseResponseListener) {
super(queryId);
this.plan = plan;
this.fetchSize = fetchSize;
this.queryService = queryService;
this.queryResponseResponseListener = queryResponseResponseListener;
}

@Override
public void execute() {
queryService.execute(new Paginate(fetchSize, plan), queryResponseResponseListener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
listener.onFailure(new NotImplementedException(
"`explain` feature for paginated requests is not implemented yet."));
}
}
Loading

0 comments on commit e951192

Please sign in to comment.