Skip to content

Commit

Permalink
Fix conflicts: Merge paginated plan optimizer into the regular optimi…
Browse files Browse the repository at this point in the history
…zer. (opensearch-project#1516)

Merge paginated plan optimizer into the regular optimizer.
---------

Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
Co-authored-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
2 people authored and acarbonetto committed Apr 18, 2023
1 parent b930a41 commit bacec9d
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 156 deletions.
12 changes: 1 addition & 11 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand All @@ -30,15 +28,7 @@ public class QueryService {

private final ExecutionEngine executionEngine;

/**
* There are two planners, one - to handle pagination requests (cursor/scroll) only and
* another one for everything else.
* @see OpenSearchPluginModule#queryPlanFactory (:plugin module)
* @see LogicalPlanOptimizer#paginationCreate
* @see QueryService
*/
private final Planner planner;
private final Planner paginationPlanner;

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
Expand Down Expand Up @@ -115,6 +105,6 @@ public LogicalPlan analyze(UnresolvedPlan plan) {
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
*/
public PhysicalPlan plan(LogicalPlan plan) {
return plan instanceof LogicalPaginate ? paginationPlanner.plan(plan) : planner.plan(plan);
return planner.plan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static LogicalPlan rename(
return new LogicalRename(input, renameMap);
}

public static LogicalPlan paginate(LogicalPlan input, int fetchSize) {
return new LogicalPaginate(fetchSize, List.of(input));
}

public static LogicalPlan project(LogicalPlan input, NamedExpression... fields) {
return new LogicalProject(input, Arrays.asList(fields), ImmutableList.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.google.common.collect.ImmutableList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.storage.Table;

Expand All @@ -26,18 +25,13 @@ public class LogicalRelation extends LogicalPlan {
@Getter
private final Table table;

@Getter
@Setter
private Integer pageSize;

/**
* Constructor of LogicalRelation.
*/
public LogicalRelation(String relationName, Table table) {
super(ImmutableList.of());
this.relationName = relationName;
this.table = table;
this.pageSize = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.PushPageSize;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;
Expand Down Expand Up @@ -53,29 +52,6 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
}

/**
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer paginationCreate() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort(),
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new PushPageSize(),
new CreatePagingTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,68 @@

package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Rule to create a paged TableScanBuilder in pagination request.
*/
public class CreatePagingTableScanBuilder implements Rule<LogicalRelation> {
/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

public class CreatePagingTableScanBuilder implements Rule<LogicalPaginate> {
/** Capture the table inside matched logical paginate operator. */
private LogicalPlan relationParent = null;
/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalRelation> pattern;
private final Pattern<LogicalPaginate> pattern;

/**
* Constructor.
*/
public CreatePagingTableScanBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalRelation.class)
.with(table().capturedAs(capture));
this.pattern = Pattern.typeOf(LogicalPaginate.class).matching(this::findLogicalRelation);
}

/**
* Finds an instance of LogicalRelation and saves a reference in relationParent variable.
* @param logicalPaginate An instance of LogicalPaginate
* @return true if {@link LogicalRelation} node was found among the descendents of
* {@link this.logicalPaginate}, false otherwise.
*/
private boolean findLogicalRelation(LogicalPaginate logicalPaginate) {
Deque<LogicalPlan> plans = new ArrayDeque<>();
plans.add(logicalPaginate);
do {
final var plan = plans.removeFirst();
final var children = plan.getChild();
if (children.stream().anyMatch(LogicalRelation.class::isInstance)) {
if (children.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported plan: relation operator cannot have siblings");
}
relationParent = plan;
return true;
}
plans.addAll(children);
} while (!plans.isEmpty());
return false;
}


@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture)
.createPagedScanBuilder(plan.getPageSize());
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
public LogicalPlan apply(LogicalPaginate plan, Captures captures) {
var logicalRelation = (LogicalRelation) relationParent.getChild().get(0);
var scan = logicalRelation.getTable().createPagedScanBuilder(plan.getPageSize());
relationParent.replaceChildPlans(List.of(scan));

return plan;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class QueryServiceTest {
@Mock
private Planner planner;

@Mock
private Planner paginationPlanner;

@Mock
private UnresolvedPlan ast;

Expand Down Expand Up @@ -120,9 +117,8 @@ class Helper {
public Helper() {
lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
lenient().when(planner.plan(any())).thenReturn(plan);
lenient().when(paginationPlanner.plan(any())).thenReturn(plan);

queryService = new QueryService(analyzer, executionEngine, planner, paginationPlanner);
queryService = new QueryService(analyzer, executionEngine, planner);
}

Helper executeSuccess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
Expand All @@ -43,7 +42,7 @@ public static void setUp() {
when(analyzer.analyze(any(), any())).thenReturn(mock(LogicalPaginate.class));
var planner = mock(Planner.class);
when(planner.plan(any())).thenReturn(mock(PhysicalPlan.class));
queryService = new QueryService(analyzer, new DefaultExecutionEngine(), null, planner);
queryService = new QueryService(analyzer, new DefaultExecutionEngine(), planner);
}

@Test
Expand Down Expand Up @@ -79,7 +78,7 @@ public void onFailure(Exception e) {
}
};
var plan = new PaginatedPlan(QueryId.queryId(), mock(UnresolvedPlan.class), 10,
new QueryService(null, new DefaultExecutionEngine(), null, null), listener);
new QueryService(null, new DefaultExecutionEngine(), null), listener);
plan.execute();
}

Expand Down
Loading

0 comments on commit bacec9d

Please sign in to comment.