Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge paginated plan optimizer into the regular optimizer. #1516

12 changes: 1 addition & 11 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand All @@ -30,15 +28,7 @@ public class QueryService {

private final ExecutionEngine executionEngine;

/**
* There are two planners, one - to handle pagination requests (cursor/scroll) only and
* another one for everything else.
* @see OpenSearchPluginModule#queryPlanFactory (:plugin module)
* @see LogicalPlanOptimizer#paginationCreate
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* @see QueryService
*/
private final Planner planner;
private final Planner paginationPlanner;

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
Expand Down Expand Up @@ -115,6 +105,6 @@ public LogicalPlan analyze(UnresolvedPlan plan) {
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
*/
public PhysicalPlan plan(LogicalPlan plan) {
return plan instanceof LogicalPaginate ? paginationPlanner.plan(plan) : planner.plan(plan);
return planner.plan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static LogicalPlan rename(
return new LogicalRename(input, renameMap);
}

public static LogicalPlan paginate(LogicalPlan input, int fetchSize) {
return new LogicalPaginate(fetchSize, List.of(input));
}

public static LogicalPlan project(LogicalPlan input, NamedExpression... fields) {
return new LogicalProject(input, Arrays.asList(fields), ImmutableList.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.google.common.collect.ImmutableList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.storage.Table;

Expand All @@ -26,18 +25,13 @@ public class LogicalRelation extends LogicalPlan {
@Getter
private final Table table;

@Getter
@Setter
private Integer pageSize;

/**
* Constructor of LogicalRelation.
*/
public LogicalRelation(String relationName, Table table) {
super(ImmutableList.of());
this.relationName = relationName;
this.table = table;
this.pageSize = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.PushPageSize;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;
Expand Down Expand Up @@ -53,29 +52,6 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
}

/**
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer paginationCreate() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort(),
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new PushPageSize(),
new CreatePagingTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,66 @@

package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Rule to create a paged TableScanBuilder in pagination request.
*/
public class CreatePagingTableScanBuilder implements Rule<LogicalRelation> {
/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

public class CreatePagingTableScanBuilder implements Rule<LogicalPaginate> {
/** Capture the table inside matched logical paginate operator. */
private LogicalPlan relationParent = null;
/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalRelation> pattern;
private final Pattern<LogicalPaginate> pattern;

/**
* Constructor.
*/
public CreatePagingTableScanBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalRelation.class)
.with(table().capturedAs(capture));
this.pattern = Pattern.typeOf(LogicalPaginate.class).matching(this::findLogicalRelation);
}

/**
* Finds an instance of LogicalRelation and saves a reference in relationParent variable.
* @param logicalPaginate An instance of LogicalPaginate
* @return true if {@link LogicalRelation} node was found among the descendents of
* {@link this.logicalPaginate}, false otherwise.
*/
private boolean findLogicalRelation(LogicalPaginate logicalPaginate) {
Deque<LogicalPlan> plans = new ArrayDeque<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could create ArrayDeque with logicalPagination

new ArrayDeque<>(logicalPaginate);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly, I cannot -- that constructor expects a collection.

Compared to adding a conversion to collection, just calling push is simpler.

plans.push(logicalPaginate);
do {
var plan = plans.pop();
if (plan.getChild().stream().anyMatch(LogicalRelation.class::isInstance)) {
if (plan.getChild().size() > 1) {
throw new UnsupportedOperationException();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a message to explain what's wrong here

}
relationParent = plan;
return true;
}
plan.getChild().forEach(plans::push);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an addAll() but no pushAll() method... hmm
Maybe use addAll() and removeFirst() instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll switch for the sake of removing forEach(plans:push).

} while (!plans.isEmpty());
return false;
}


@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture)
.createPagedScanBuilder(plan.getPageSize());
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
public LogicalPlan apply(LogicalPaginate plan, Captures captures) {
var logicalRelation = (LogicalRelation) relationParent.getChild().get(0);
var scan = logicalRelation.getTable().createPagedScanBuilder(plan.getPageSize());
relationParent.replaceChildPlans(List.of(scan));

return plan;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class QueryServiceTest {
@Mock
private Planner planner;

@Mock
private Planner paginationPlanner;

@Mock
private UnresolvedPlan ast;

Expand Down Expand Up @@ -120,9 +117,8 @@ class Helper {
public Helper() {
lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
lenient().when(planner.plan(any())).thenReturn(plan);
lenient().when(paginationPlanner.plan(any())).thenReturn(plan);

queryService = new QueryService(analyzer, executionEngine, planner, paginationPlanner);
queryService = new QueryService(analyzer, executionEngine, planner);
}

Helper executeSuccess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void setUp() {
when(storageEngine.getTableScan(anyString(), anyString()))
.thenReturn(mock(TableScanOperator.class));
paginatedPlanCache = new PaginatedPlanCache(storageEngine);
queryService = new QueryService(null, new DefaultExecutionEngine(), null, null);
queryService = new QueryService(null, new DefaultExecutionEngine(), null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
Expand All @@ -43,7 +42,7 @@ public static void setUp() {
when(analyzer.analyze(any(), any())).thenReturn(mock(LogicalPaginate.class));
var planner = mock(Planner.class);
when(planner.plan(any())).thenReturn(mock(PhysicalPlan.class));
queryService = new QueryService(analyzer, new DefaultExecutionEngine(), null, planner);
queryService = new QueryService(analyzer, new DefaultExecutionEngine(), planner);
}

@Test
Expand Down Expand Up @@ -79,7 +78,7 @@ public void onFailure(Exception e) {
}
};
var plan = new PaginatedPlan(QueryId.None, mock(UnresolvedPlan.class), 10,
new QueryService(null, new DefaultExecutionEngine(), null, null), listener);
new QueryService(null, new DefaultExecutionEngine(), null), listener);
plan.execute();
}

Expand Down
Loading