From 29f99aa8345c1ad40b767f244bd1ad083dcc7b7b Mon Sep 17 00:00:00 2001 From: Max Ksyunz Date: Wed, 14 Jun 2023 15:42:57 -0700 Subject: [PATCH] Simplify OpenSearchIndexScanBuilder (#275) (#1738) Signed-off-by: Max Ksyunz --- .../opensearch/storage/OpenSearchIndex.java | 17 +++++------- .../scan/OpenSearchIndexScanBuilder.java | 16 ++++++----- .../OpenSearchIndexScanOptimizationTest.java | 27 +++++-------------- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 532d62333d..6c620e5042 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.function.Function; import lombok.RequiredArgsConstructor; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.common.setting.Settings; @@ -33,7 +34,6 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.Table; -import org.opensearch.sql.storage.TableScanOperator; import org.opensearch.sql.storage.read.TableScanBuilder; /** OpenSearch table (index) implementation. */ @@ -171,19 +171,14 @@ public PhysicalPlan implement(LogicalPlan plan) { public TableScanBuilder createScanBuilder() { final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); + final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); var builder = new OpenSearchRequestBuilder( querySizeLimit, createExprValueFactory()); - - return new OpenSearchIndexScanBuilder(builder) { - @Override - protected TableScanOperator createScan(OpenSearchRequestBuilder requestBuilder) { - final TimeValue cursorKeepAlive = - settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); - return new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(), - requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive)); - } - }; + Function createScanOperator = + requestBuilder -> new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(), + requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive)); + return new OpenSearchIndexScanBuilder(builder, createScanOperator); } private OpenSearchExprValueFactory createExprValueFactory() { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java index c6df692095..3a0d06d079 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java @@ -5,6 +5,7 @@ package org.opensearch.sql.opensearch.storage.scan; +import java.util.function.Function; import lombok.EqualsAndHashCode; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; @@ -24,8 +25,9 @@ * by delegated builder internally. This is to avoid conditional check of different push down logic * for non-aggregate and aggregate query everywhere. */ -public abstract class OpenSearchIndexScanBuilder extends TableScanBuilder { +public class OpenSearchIndexScanBuilder extends TableScanBuilder { + private final Function scanFactory; /** * Delegated index scan builder for non-aggregate or aggregate query. */ @@ -38,25 +40,27 @@ public abstract class OpenSearchIndexScanBuilder extends TableScanBuilder { /** * Constructor used during query execution. */ - protected OpenSearchIndexScanBuilder(OpenSearchRequestBuilder requestBuilder) { + public OpenSearchIndexScanBuilder(OpenSearchRequestBuilder requestBuilder, + Function scanFactory) { this.delegate = new OpenSearchIndexScanQueryBuilder(requestBuilder); + this.scanFactory = scanFactory; } /** * Constructor used for unit tests. */ - protected OpenSearchIndexScanBuilder(PushDownQueryBuilder translator) { + protected OpenSearchIndexScanBuilder(PushDownQueryBuilder translator, + Function scanFactory) { this.delegate = translator; + this.scanFactory = scanFactory; } @Override public TableScanOperator build() { - return createScan(delegate.build()); + return scanFactory.apply(delegate.build()); } - protected abstract TableScanOperator createScan(OpenSearchRequestBuilder requestBuilder); - @Override public boolean pushDownFilter(LogicalFilter filter) { return delegate.pushDownFilter(filter); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanOptimizationTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanOptimizationTest.java index 6bf9002a67..d5283cecb7 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanOptimizationTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanOptimizationTest.java @@ -7,7 +7,6 @@ package org.opensearch.sql.opensearch.storage.scan; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -86,7 +85,6 @@ import org.opensearch.sql.planner.optimizer.PushDownPageSize; import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder; import org.opensearch.sql.storage.Table; -import org.opensearch.sql.storage.TableScanOperator; @ExtendWith(MockitoExtension.class) class OpenSearchIndexScanOptimizationTest { @@ -106,12 +104,7 @@ class OpenSearchIndexScanOptimizationTest { @BeforeEach void setUp() { - indexScanBuilder = new OpenSearchIndexScanBuilder(requestBuilder) { - @Override - protected TableScanOperator createScan(OpenSearchRequestBuilder build) { - return indexScan; - } - }; + indexScanBuilder = new OpenSearchIndexScanBuilder(requestBuilder, requestBuilder -> indexScan); when(table.createScanBuilder()).thenReturn(indexScanBuilder); } @@ -698,23 +691,15 @@ void project_literal_should_not_be_pushed_down() { private OpenSearchIndexScanBuilder indexScanBuilder(Runnable... verifyPushDownCalls) { this.verifyPushDownCalls = verifyPushDownCalls; - return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanQueryBuilder(requestBuilder)) { - @Override - protected TableScanOperator createScan(OpenSearchRequestBuilder build) { - return indexScan; - } - }; + return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanQueryBuilder(requestBuilder), + requestBuilder -> indexScan); } private OpenSearchIndexScanBuilder indexScanAggBuilder(Runnable... verifyPushDownCalls) { this.verifyPushDownCalls = verifyPushDownCalls; - return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanAggregationBuilder( - requestBuilder, mock(LogicalAggregation.class))) { - @Override - protected TableScanOperator createScan(OpenSearchRequestBuilder build) { - return indexScan; - } - }; + var aggregationBuilder = new OpenSearchIndexScanAggregationBuilder( + requestBuilder, mock(LogicalAggregation.class)); + return new OpenSearchIndexScanBuilder(aggregationBuilder, builder -> indexScan); } private void assertEqualsAfterOptimization(LogicalPlan expected, LogicalPlan actual) {