Skip to content

Commit

Permalink
Support for pagination in v2 engine of SELECT * FROM <table> queries (
Browse files Browse the repository at this point in the history
opensearch-project#1666)

v2 SQL engine can now paginate simple queries. Pagination is initiated by setting fetch_size property in the request JSON.

Pagination is implemented using the OpenSearch Scroll API. Please see pagination-v2.md for implementation details.
---------

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Signed-off-by: Max Ksyunz <maxk@bitquilltech.com>
Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Co-authored-by: GabeFernandez310 <Gabriel.Fernandez@improving.com>
Co-authored-by: Andrew Carbonetto <andrewc@bitquilltech.com>
Signed-off-by: Mitchell Gale <Mitchell.Gale@improving.com>
  • Loading branch information
4 people authored and MitchellGale committed Jun 12, 2023
1 parent 0e6a648 commit 99928e6
Show file tree
Hide file tree
Showing 141 changed files with 6,383 additions and 1,507 deletions.
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dependencies {
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

Expand Down
26 changes: 24 additions & 2 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.opensearch.sql.analysis;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
Expand Down Expand Up @@ -42,13 +43,16 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -80,12 +84,15 @@
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRareTopN;
Expand Down Expand Up @@ -208,7 +215,6 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
tableFunctionImplementation.applyArguments());
}


@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down Expand Up @@ -561,6 +567,23 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
return new LogicalML(child, node.getArguments());
}

@Override
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}

@Override
public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) {
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand All @@ -576,5 +599,4 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
}
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
}

}
15 changes: 15 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -294,4 +297,16 @@ public T visitQuery(Query node, C context) {
public T visitExplain(Explain node, C context) {
return visitStatement(node, context);
}

public T visitPaginate(Paginate paginate, C context) {
return visitChildren(paginate, context);
}

public T visitFetchCursor(FetchCursor cursor, C context) {
return visitChildren(cursor, context);
}

public T visitCloseCursor(CloseCursor closeCursor, C context) {
return visitChildren(closeCursor, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent close cursor operation.
* Actually a wrapper to the AST.
*/
public class CloseCursor extends UnresolvedPlan {

/**
* An instance of {@link FetchCursor}.
*/
private UnresolvedPlan cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitCloseCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.cursor = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return List.of(cursor);
}
}
32 changes: 32 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/FetchCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/**
* An unresolved plan that represents fetching the next
* batch in paginationed plan.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class FetchCursor extends UnresolvedPlan {
@Getter
final String cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitFetchCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
throw new UnsupportedOperationException("Cursor unresolved plan does not support children");
}
}
48 changes: 48 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent pagination operation.
* Actually a wrapper to the AST.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
public class Paginate extends UnresolvedPlan {
@Getter
private final int pageSize;
private UnresolvedPlan child;

public Paginate(int pageSize, UnresolvedPlan child) {
this.pageSize = pageSize;
this.child = child;
}

@Override
public List<? extends Node> getChild() {
return List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitPaginate(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown on serialization of a PhysicalPlan tree if paging is finished.
* Processing of such exception should outcome of responding no cursor to the user.
*/
public class NoCursorException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown by V2 engine to support fallback scenario.
*/
public class UnsupportedCursorRequestException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand Down Expand Up @@ -53,6 +54,7 @@ void execute(PhysicalPlan plan, ExecutionContext context,
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final Cursor cursor;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

/**
* Query plan which does not reflect a search query being executed.
* It contains a command or an action, for example, a DDL query.
*/
public class CommandPlan extends AbstractPlan {

/**
* The query plan ast.
*/
protected final UnresolvedPlan plan;

/**
* Query service.
*/
protected final QueryService queryService;

protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** Constructor. */
public CommandPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
}

@Override
public void execute() {
queryService.execute(plan, listener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
throw new UnsupportedOperationException("CommandPlan does not support explain");
}
}
Loading

0 comments on commit 99928e6

Please sign in to comment.