Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge paginated plan optimizer into the regular optimizer. #1516

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand All @@ -34,11 +32,9 @@ public class QueryService {
* There are two planners, one - to handle pagination requests (cursor/scroll) only and
* another one for everything else.
* @see OpenSearchPluginModule#queryPlanFactory (:plugin module)
* @see LogicalPlanOptimizer#paginationCreate
* @see QueryService
*/
private final Planner planner;
private final Planner paginationPlanner;

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
Expand Down Expand Up @@ -115,6 +111,6 @@ public LogicalPlan analyze(UnresolvedPlan plan) {
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
*/
public PhysicalPlan plan(LogicalPlan plan) {
return plan instanceof LogicalPaginate ? paginationPlanner.plan(plan) : planner.plan(plan);
return planner.plan(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static LogicalPlan rename(
return new LogicalRename(input, renameMap);
}

public static LogicalPlan paginate(LogicalPlan input, int fetchSize) {
return new LogicalPaginate(fetchSize, List.of(input));
}

public static LogicalPlan project(LogicalPlan input, NamedExpression... fields) {
return new LogicalProject(input, Arrays.asList(fields), ImmutableList.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,6 @@ public LogicalPlanOptimizer(List<Rule<?>> rules) {
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort(),
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
}

/**
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer paginationCreate() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
Expand All @@ -76,6 +53,7 @@ public static LogicalPlanOptimizer paginationCreate() {
* Phase 2: Transformations that rely on data source push down capability
*/
new PushPageSize(),
new CreateTableScanBuilder(),
new CreatePagingTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,59 @@

package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* Rule to create a paged TableScanBuilder in pagination request.
*/
public class CreatePagingTableScanBuilder implements Rule<LogicalRelation> {
/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

public class CreatePagingTableScanBuilder implements Rule<LogicalPaginate> {
/** Capture the table inside matched logical paginate operator. */
private LogicalPlan relationParent = null;
/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalRelation> pattern;
private final Pattern<LogicalPaginate> pattern;

/**
* Constructor.
*/
public CreatePagingTableScanBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalRelation.class)
.with(table().capturedAs(capture));
this.pattern = Pattern.typeOf(LogicalPaginate.class).matching(lp -> {
Deque<LogicalPlan> plans = new ArrayDeque<>();
plans.push(lp);
do {
var plan = plans.pop();
if (plan.getChild().stream().anyMatch(LogicalRelation.class::isInstance)) {
if (plan.getChild().size() > 1) {
throw new UnsupportedOperationException();
}
relationParent = plan;
return true;
}
plan.getChild().forEach(plans::push);
} while (!plans.isEmpty());
return false;
});
}


@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture)
.createPagedScanBuilder(plan.getPageSize());
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
public LogicalPlan apply(LogicalPaginate plan, Captures captures) {
var logicalRelation = (LogicalRelation) relationParent.getChild().get(0);
var scan = logicalRelation.getTable().createPagedScanBuilder(logicalRelation.getPageSize());
relationParent.replaceChildPlans(List.of(scan));

return plan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class QueryServiceTest {
@Mock
private Planner planner;

@Mock
private Planner paginationPlanner;

@Mock
private UnresolvedPlan ast;

Expand Down Expand Up @@ -120,9 +117,8 @@ class Helper {
public Helper() {
lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
lenient().when(planner.plan(any())).thenReturn(plan);
lenient().when(paginationPlanner.plan(any())).thenReturn(plan);

queryService = new QueryService(analyzer, executionEngine, planner, paginationPlanner);
queryService = new QueryService(analyzer, executionEngine, planner);
}

Helper executeSuccess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void setUp() {
when(storageEngine.getTableScan(anyString(), anyString()))
.thenReturn(mock(TableScanOperator.class));
paginatedPlanCache = new PaginatedPlanCache(storageEngine);
queryService = new QueryService(null, new DefaultExecutionEngine(), null, null);
queryService = new QueryService(null, new DefaultExecutionEngine(), null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
Expand All @@ -43,7 +42,7 @@ public static void setUp() {
when(analyzer.analyze(any(), any())).thenReturn(mock(LogicalPaginate.class));
var planner = mock(Planner.class);
when(planner.plan(any())).thenReturn(mock(PhysicalPlan.class));
queryService = new QueryService(analyzer, new DefaultExecutionEngine(), null, planner);
queryService = new QueryService(analyzer, new DefaultExecutionEngine(), planner);
}

@Test
Expand Down Expand Up @@ -79,7 +78,7 @@ public void onFailure(Exception e) {
}
};
var plan = new PaginatedPlan(QueryId.None, mock(UnresolvedPlan.class), 10,
new QueryService(null, new DefaultExecutionEngine(), null, null), listener);
new QueryService(null, new DefaultExecutionEngine(), null), listener);
plan.execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.data.model.ExprValueUtils.integerValue;
import static org.opensearch.sql.data.model.ExprValueUtils.longValue;
Expand All @@ -21,6 +23,7 @@
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.highlight;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.paginate;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.relation;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.sort;
Expand All @@ -47,6 +50,8 @@
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;
Expand All @@ -62,11 +67,19 @@ class LogicalPlanOptimizerTest {
@Spy
private TableScanBuilder tableScanBuilder;

@Spy
private TableScanBuilder pagedTableScanBuilder;

@BeforeEach
void setUp() {
lenient().when(table.createScanBuilder()).thenReturn(tableScanBuilder);
lenient().when(table.createPagedScanBuilder(anyInt())).thenReturn(pagedTableScanBuilder);
}





/**
* Filter - Filter --> Filter.
*/
Expand Down Expand Up @@ -313,48 +326,76 @@ public PhysicalPlan implement(LogicalPlan plan) {

@Test
void paged_table_scan_builder_support_project_push_down_can_apply_its_rule() {
when(tableScanBuilder.pushDownProject(any())).thenReturn(true);
when(table.createPagedScanBuilder(anyInt())).thenReturn(tableScanBuilder);

var relation = new LogicalRelation("schema", table);
relation.setPageSize(anyInt());
var relation = Mockito.spy(new LogicalRelation("schema", table));

assertEquals(
tableScanBuilder,
LogicalPlanOptimizer.paginationCreate().optimize(project(relation))
);
paginate(project(pagedTableScanBuilder), 4),
LogicalPlanOptimizer.create().optimize(paginate(project(relation), 4)));
}

@Test
void push_page_size() {
var relation = new LogicalRelation("schema", table);
var paginate = new LogicalPaginate(42, List.of(project(relation)));
assertNull(relation.getPageSize());
LogicalPlanOptimizer.paginationCreate().optimize(paginate);
LogicalPlanOptimizer.create().optimize(paginate);
assertEquals(42, relation.getPageSize());
}

@Test
void push_page_size_noop_if_no_relation() {
var paginate = new LogicalPaginate(42, List.of(project(values())));
LogicalPlanOptimizer.paginationCreate().optimize(paginate);
LogicalPlanOptimizer.create().optimize(paginate);
}

@Test
void pagination_optimizer_simple_query() {
var projectPlan = project(relation("schema", table), DSL.named(DSL.ref("intV", INTEGER)));

var optimizer = new LogicalPlanOptimizer(
List.of(new CreateTableScanBuilder(), new CreatePagingTableScanBuilder()));

{
optimizer.optimize(projectPlan);
verify(table).createScanBuilder();
verify(table, never()).createPagedScanBuilder(anyInt());
// Assert that createPagedTableScan was not called
// Assert that createTableScan was called
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO?

}
}

@Test
void pagination_optimizer_paged_query() {
var relation = new LogicalRelation("schema", table);
relation.setPageSize(4);
var projectPlan = project(relation, DSL.named(DSL.ref("intV", INTEGER)));
var pagedPlan = new LogicalPaginate(10, List.of(projectPlan));

var optimizer = new LogicalPlanOptimizer(
List.of(new CreateTableScanBuilder(), new CreatePagingTableScanBuilder()));
var optimized = optimizer.optimize(pagedPlan);
verify(table).createPagedScanBuilder(anyInt());
}

@Test
void push_page_size_noop_if_no_sub_plans() {
var paginate = new LogicalPaginate(42, List.of());
LogicalPlanOptimizer.paginationCreate().optimize(paginate);
assertEquals(paginate,
LogicalPlanOptimizer.create().optimize(paginate));
}

@Test
void table_scan_builder_support_offset_push_down_can_apply_its_rule() {
when(table.createPagedScanBuilder(anyInt())).thenReturn(tableScanBuilder);
when(table.createPagedScanBuilder(anyInt())).thenReturn(pagedTableScanBuilder);

var optimized = LogicalPlanOptimizer.paginationCreate()
.optimize(new LogicalPaginate(42, List.of(project(relation("schema", table)))));
var relation = new LogicalRelation("schema", table);
relation.setPageSize(4);
var optimized = LogicalPlanOptimizer.create()
.optimize(new LogicalPaginate(42, List.of(project(relation))));
// `optimized` structure: LogicalPaginate -> LogicalProject -> TableScanBuilder
// LogicalRelation replaced by a TableScanBuilder instance
assertEquals(tableScanBuilder, optimized.getChild().get(0).getChild().get(0));
assertEquals(project(pagedTableScanBuilder), optimized);
}

private LogicalPlan optimize(LogicalPlan plan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ public class TableTest {
@Test
public void createPagedScanBuilder_throws() {
var table = mock(Table.class, withSettings().defaultAnswer(InvocationOnMock::callRealMethod));
assertThrows(Throwable.class, () -> table.createPagedScanBuilder(0));
assertThrows(Throwable.class, () -> table.createPagedScanBuilder(4));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ public QueryPlanFactory queryPlanFactory(ExecutionEngine executionEngine,
new Analyzer(
new ExpressionAnalyzer(functionRepository), dataSourceService, functionRepository);
Planner planner = new Planner(LogicalPlanOptimizer.create());
Planner paginationPlanner = new Planner(LogicalPlanOptimizer.paginationCreate());
QueryService queryService = new QueryService(analyzer, executionEngine, planner, paginationPlanner);
QueryService queryService = new QueryService(analyzer, executionEngine, planner);
return new QueryPlanFactory(queryService, paginatedPlanCache);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public QueryService queryService(ExecutionEngine executionEngine) {
new Analyzer(
new ExpressionAnalyzer(functionRepository), dataSourceService, functionRepository);
Planner planner = new Planner(LogicalPlanOptimizer.create());
Planner paginationPlanner = new Planner(LogicalPlanOptimizer.paginationCreate());
return new QueryService(analyzer, executionEngine, planner, paginationPlanner);
return new QueryService(analyzer, executionEngine, planner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ public QueryPlanFactory queryPlanFactory(DataSourceService dataSourceService,
new Analyzer(
new ExpressionAnalyzer(functionRepository), dataSourceService, functionRepository);
Planner planner = new Planner(LogicalPlanOptimizer.create());
Planner paginationPlanner = new Planner(LogicalPlanOptimizer.paginationCreate());
QueryService queryService = new QueryService(
analyzer, executionEngine, planner, paginationPlanner);
analyzer, executionEngine, planner);
return new QueryPlanFactory(queryService, paginatedPlanCache);
}
}