Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge OpenSearchPagedIndexScan and OpenSearchIndexScan #1600

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
68db706
Remove OpenSearchPagedIndexScan and related classes.
Apr 28, 2023
ef93375
Some bug fixes
May 2, 2023
306a7fe
Updating tests
May 2, 2023
7d5b126
Updating PaginationWindowIT
May 2, 2023
dc2c471
Addressing Code Analysis warnings
May 2, 2023
91cea61
Do not serialize OpenSearchScrollRequest when needClean is true.
May 2, 2023
2ee810c
Fix checkstyle errors.
May 2, 2023
39ce902
Complete unit test coverage.
May 4, 2023
fabd4ee
Code improvements
May 4, 2023
6eb0498
Checkstyle fixes
May 4, 2023
bdfe563
Updating expected out in doctest
May 4, 2023
1f6b6f1
Refactoring
May 5, 2023
a8295e4
Checkstyle fixes
May 5, 2023
86429f8
Rename createContinuePaginatedPlan in QueryPlanFactory to create
May 5, 2023
4f5b69d
Address PR comments.
May 9, 2023
b1050dd
Address PR comments.
May 10, 2023
a16f2fa
WIP OpenSearchIndexScan refactor. Unit tests pass.
May 12, 2023
eff76b2
Refactor OpenSearchIndexScan and OpenSearchRequest.
May 16, 2023
8548d90
Refactor OpenSearchIndexScan and OpenSearchRequest.
May 16, 2023
f4770a8
WIP
May 17, 2023
e350662
Updating imports to reflect changes in opensearch core. (#1645)
May 23, 2023
413087a
Merge branch 'feature/pagination/integ' into feature/pagination/integ…
May 24, 2023
7e812e5
Integrating with main.
May 24, 2023
1daacbb
Merge branch 'feature/pagination/integ' into feature/pagination/integ…
May 24, 2023
f1e1342
Address refactoring comments WIP
May 25, 2023
73f18df
Restore error to explain requests containing only a cursor.
May 25, 2023
60226be
Complete test coverage.
May 26, 2023
fabb179
Address checkstyle issues.
May 26, 2023
23bc3ab
Better class name.
May 26, 2023
264e483
Update design document to reflect refactor.
May 26, 2023
97ef3a5
Addressed PR feedback.
May 26, 2023
2cecaca
Addressed PR feedback.
May 26, 2023
840c4a9
Minor cleanup.
May 26, 2023
4c9e958
Update core/src/main/java/org/opensearch/sql/planner/logical/LogicalP…
May 26, 2023
102706b
Update core/src/main/java/org/opensearch/sql/planner/logical/LogicalP…
May 26, 2023
2f4b48b
Update core/src/main/java/org/opensearch/sql/ast/tree/FetchCursor.java
May 26, 2023
33ad6dd
Minor cleanup 2
May 27, 2023
0abe298
Remove assertions that no longer apply
May 29, 2023
1dc3320
Minor cleanup
May 29, 2023
701bce7
Update test to account for prior changes
May 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.opensearch.sql.analysis;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
Expand Down Expand Up @@ -44,6 +45,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
Expand All @@ -64,7 +66,6 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -84,6 +85,7 @@
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
Expand Down Expand Up @@ -211,7 +213,6 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
tableFunctionImplementation.applyArguments());
}


@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down Expand Up @@ -587,4 +588,9 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -299,4 +300,8 @@ public T visitExplain(Explain node, C context) {
public T visitPaginate(Paginate paginate, C context) {
return visitChildren(paginate, context);
}

public T visitFetchCursor(FetchCursor cursor, C context) {
return visit(cursor, context);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
return visit(cursor, context);
return visitChildren(cursor, context);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lave for later

}
}
32 changes: 32 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/FetchCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/**
* An unresolved plan that represents fetching the next
* batch in paginationed plan.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class FetchCursor extends UnresolvedPlan {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
@Getter
final String cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitFetchCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
throw new UnsupportedOperationException("Cursor unresolved plan does not support children");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ public void execute(UnresolvedPlan plan,
}
}

/**
* Execute a physical plan without analyzing or planning anything.
*/
public void executePlan(PhysicalPlan plan,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
executionEngine.execute(plan, ExecutionContext.emptyExecutionContext(), listener);
}

/**
* Execute the {@link UnresolvedPlan}, with {@link PlanContext} and using {@link ResponseListener}
* to get response.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.exception.UnsupportedCursorRequestException;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.executor.pagination.CanPaginateVisitor;

/**
* QueryExecution Factory.
Expand All @@ -39,7 +41,6 @@ public class QueryPlanFactory
* Query Service.
*/
private final QueryService queryService;
private final PlanSerializer planSerializer;

/**
* NO_CONSUMER_RESPONSE_LISTENER should never be called. It is only used as constructor
Expand All @@ -65,25 +66,28 @@ public void onFailure(Exception e) {
/**
* Create QueryExecution from Statement.
*/
public AbstractPlan createContinuePaginatedPlan(
public AbstractPlan create(
Statement statement,
Optional<ResponseListener<ExecutionEngine.QueryResponse>> queryListener,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>> explainListener) {
return statement.accept(this, Pair.of(queryListener, explainListener));
}

/**
* Creates a ContinuePaginatedPlan from a cursor.
* Creates a QueryPlan from a cursor.
*/
public AbstractPlan createContinuePaginatedPlan(String cursor, boolean isExplain,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
public AbstractPlan create(String cursor, boolean isExplain,
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
QueryId queryId = QueryId.queryId();
var plan = new ContinuePaginatedPlan(queryId, cursor, queryService,
planSerializer, queryResponseListener);
var plan = new QueryPlan(queryId, new FetchCursor(cursor), queryService, queryResponseListener);
return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan;
}

boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

@Override
public AbstractPlan visitQuery(
Query node,
Expand All @@ -94,7 +98,7 @@ public AbstractPlan visitQuery(
context.getLeft().isPresent(), "[BUG] query listener must be not null");

if (node.getFetchSize() > 0) {
if (planSerializer.canConvertToCursor(node.getPlan())) {
if (canConvertToCursor(node.getPlan())) {
return new QueryPlan(QueryId.queryId(), node.getPlan(), node.getFetchSize(),
queryService,
context.getLeft().get());
Expand All @@ -119,7 +123,7 @@ public AbstractPlan visitExplain(

return new ExplainPlan(
QueryId.queryId(),
createContinuePaginatedPlan(node.getStatement(),
create(node.getStatement(),
Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
context.getRight().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.exception.NoCursorException;
import org.opensearch.sql.planner.SerializablePlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand All @@ -34,9 +33,6 @@ public class PlanSerializer {

private final StorageEngine engine;

public boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Converts a physical plan tree to a cursor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

package org.opensearch.sql.planner;

import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalNested;
Expand Down Expand Up @@ -148,6 +150,11 @@ public PhysicalPlan visitRelation(LogicalRelation node, C context) {
+ "implementing and optimizing logical plan with relation involved");
}

@Override
public PhysicalPlan visitFetchCursor(LogicalFetchCursor plan, C context) {
return new PlanSerializer(plan.getEngine()).convertToPlan(plan.getCursor());
}

protected PhysicalPlan visitChild(LogicalPlan node, C context) {
// Logical operators visited here must have a single child
return node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
package org.opensearch.sql.planner;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.opensearch.sql.executor.pagination.PlanSerializer;

/**
* All subtypes of PhysicalPlan which needs to be serialized (in cursor, for pagination feature)
Expand All @@ -29,21 +25,6 @@
*/
public interface SerializablePlan extends Externalizable {

/**
* Argument is an instance of {@link PlanSerializer.CursorDeserializationStream}.
*/
@Override
void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;

/**
* Each plan which has as a child plan should do.
* <pre>{@code
* out.writeObject(input.getPlanForSerialization());
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* }</pre>
*/
@Override
void writeExternal(ObjectOutput out) throws IOException;

/**
* Override to return child or delegated plan, so parent plan should skip this one
* for serialization, but it should try to serialize grandchild plan.
Expand All @@ -55,6 +36,10 @@ public interface SerializablePlan extends Externalizable {
* </pre>
* In that case only plans A and C should be attempted to serialize.
* It is needed to skip a `ResourceMonitorPlan` instance only, actually.
*
* <pre>{@code
* * A.writeObject(B.getPlanForSerialization());
* }</pre>
* @return Next plan for serialization.
*/
default SerializablePlan getPlanForSerialization() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.storage.StorageEngine;

@EqualsAndHashCode(callSuper = false)
@ToString
public class LogicalFetchCursor extends LogicalPlan {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc?

@Getter
private final String cursor;

@Getter
private final StorageEngine engine;

/**
* LogicalCursor constructor. Does not have child plans.
*/
public LogicalFetchCursor(String cursor, StorageEngine engine) {
super(List.of());
this.cursor = cursor;
this.engine = engine;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitFetchCursor(this, context);
}
}
Loading