From a8c3b23f178e00648ee57de511d5a8e423edcc0e Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Fri, 28 Jul 2023 11:40:59 -0700 Subject: [PATCH 01/11] Support for nested field filter push down to table source - Implement nested fields filter push down 1. See if a new ResolvedExpression called NestedFieldReferenceExpression has to be implemented or not. If implemented, convert the NestedFieldReferenceExpression in to RexFieldAccess 2. Revisit ExpressionConverter to see if the FieldReferenceExpression has to be converted to a RexFieldAccess 3. Add more tests 4. Check other rules like PushFilterInCalcIntoTableSourceScanRule, PushPartitionIntoTableSourceScanRule --- .../converter/ExpressionConverter.java | 14 ++- .../abilities/source/FilterPushDownSpec.java | 11 ++- ...shFilterInCalcIntoTableSourceScanRule.java | 3 +- .../PushFilterIntoSourceScanRuleBase.java | 7 +- .../PushFilterIntoTableSourceScanRule.java | 16 +++- .../planner/plan/utils/FlinkRexUtil.scala | 14 ++- .../planner/plan/utils/RexNodeExtractor.scala | 94 ++++++++++++++++++- ...PushFilterIntoTableSourceScanRuleTest.java | 16 ++++ ...FilterIntoTableSourceScanRuleTestBase.java | 5 + .../PushFilterIntoTableSourceScanRuleTest.xml | 18 ++++ 10 files changed, 183 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java index ab5e0cf09c3a4..7c5eb4d5235f8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java @@ -199,7 +199,19 @@ public RexNode visit(FieldReferenceExpression fieldReference) { // Calcite will shuffle the output order of groupings. // So the output fields order will be changed too. 
// See RelBuilder.aggregate, it use ImmutableBitSet to store groupings, - return relBuilder.field(fieldReference.getName()); + String[] nestedFields = fieldReference.getName().split("\\."); + if (nestedFields.length > 0) { + RexNode fieldAccess = relBuilder.field(nestedFields[0]); + for (int i = 1; i < nestedFields.length; i++) { + fieldAccess = + relBuilder + .getRexBuilder() + .makeFieldAccess(fieldAccess, nestedFields[i], true); + } + return fieldAccess; + } else { + return relBuilder.field(fieldReference.getName()); + } } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java index c7bd36ceb8f10..0eacda7100f91 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java @@ -35,6 +35,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; @@ -45,6 +46,8 @@ import java.util.TimeZone; import java.util.stream.Collectors; +import scala.Option; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -86,7 +89,7 @@ public List getPredicates() { @Override public void apply(DynamicTableSource tableSource, SourceAbilityContext context) { - SupportsFilterPushDown.Result result = apply(predicates, tableSource, context); + SupportsFilterPushDown.Result result = apply(predicates, tableSource, context, null); if (result.getAcceptedFilters().size() != predicates.size()) { throw new TableException("All predicates should be accepted here."); } @@ -95,7 +98,8 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) public static SupportsFilterPushDown.Result apply( List predicates, DynamicTableSource tableSource, - SourceAbilityContext context) { + SourceAbilityContext context, + RelDataType relDataType) { if (tableSource instanceof SupportsFilterPushDown) { RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter( @@ -104,7 +108,8 @@ public static SupportsFilterPushDown.Result apply( context.getFunctionCatalog(), context.getCatalogManager(), TimeZone.getTimeZone( - TableConfigUtils.getLocalTimeZone(context.getTableConfig()))); + TableConfigUtils.getLocalTimeZone(context.getTableConfig())), + Option.apply(relDataType)); List filters = predicates.stream() .map( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java index 4dc970af83183..81af806e1d28f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java @@ -103,7 +103,8 @@ private void pushFilterIntoScan( convertiblePredicates, 
relOptTable.unwrap(TableSourceTable.class), scan, - relBuilder); + relBuilder, + null); SupportsFilterPushDown.Result result = pushdownResultWithScan._1; TableSourceTable tableSourceTable = pushdownResultWithScan._2; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java index eb5e4f6d25de6..e557a7b7a8963 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java @@ -33,6 +33,7 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilder; @@ -86,7 +87,8 @@ protected RexNode createRemainingCondition( RexNode[] convertiblePredicates, TableSourceTable oldTableSourceTable, TableScan scan, - RelBuilder relBuilder) { + RelBuilder relBuilder, + RelDataType relDataType) { // record size before applyFilters for update statistics int originPredicatesSize = convertiblePredicates.length; @@ -97,7 +99,8 @@ protected RexNode createRemainingCondition( FilterPushDownSpec.apply( Arrays.asList(convertiblePredicates), newTableSource, - SourceAbilityContext.from(scan)); + SourceAbilityContext.from(scan), + relDataType); relBuilder.push(scan); List acceptedPredicates = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index eaa6999cdd7fe..bcb148d81a8d1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -18,10 +18,13 @@ package org.apache.flink.table.planner.plan.rules.logical; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; +import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Filter; @@ -31,6 +34,9 @@ import scala.Tuple2; +import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; + /** * Planner rule that tries to push a filter into a {@link LogicalTableScan}, which table is a {@link * TableSourceTable}. And the table source in the table is a {@link SupportsFilterPushDown}. 
@@ -77,12 +83,17 @@ private void pushFilterIntoScan( FlinkPreparingTableBase relOptTable) { RelBuilder relBuilder = call.builder(); + final TableSourceTable sourceTable = scan.getTable().unwrap(TableSourceTable.class); + final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan); + final ResolvedSchema schema = sourceTable.contextResolvedTable().getResolvedSchema(); + final RowType producedType = createProducedType(schema, sourceTable.tableSource()); Tuple2 extractedPredicates = FlinkRexUtil.extractPredicates( filter.getInput().getRowType().getFieldNames().toArray(new String[0]), filter.getCondition(), scan, - relBuilder.getRexBuilder()); + relBuilder.getRexBuilder(), + typeFactory.buildRelNodeRowType(producedType)); RexNode[] convertiblePredicates = extractedPredicates._1; RexNode[] unconvertedPredicates = extractedPredicates._2; @@ -96,7 +107,8 @@ private void pushFilterIntoScan( convertiblePredicates, relOptTable.unwrap(TableSourceTable.class), scan, - relBuilder); + relBuilder, + typeFactory.buildRelNodeRowType(producedType)); SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1; TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala index 057fa50ed9c47..e00149a06881b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala @@ -628,6 +628,14 @@ object FlinkRexUtil { projects.forall(RexUtil.isDeterministic) } + def extractPredicates( + inputNames: Array[String], + filterExpression: RexNode, + rel: RelNode, + rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = { + extractPredicates(inputNames, filterExpression, rel, rexBuilder, null) + } + /** * Return convertible rex nodes and unconverted rex nodes extracted from the filter expression. 
*/ @@ -635,7 +643,8 @@ object FlinkRexUtil { inputNames: Array[String], filterExpression: RexNode, rel: RelNode, - rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = { + rexBuilder: RexBuilder, + relDataType: RelDataType): (Array[RexNode], Array[RexNode]) = { val context = ShortcutUtils.unwrapContext(rel) val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel); val converter = @@ -644,7 +653,8 @@ object FlinkRexUtil { inputNames, context.getFunctionCatalog, context.getCatalogManager, - TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig))); + TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig)), + Some(relDataType)); RexNodeExtractor.extractConjunctiveConditions( filterExpression, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index a7cbb4a9ffc01..445cd45f38679 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.YearMonthIntervalType import org.apache.flink.util.Preconditions import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator} import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction} @@ -395,9 +396,19 @@ class RexNodeToExpressionConverter( inputNames: Array[String], functionCatalog: FunctionCatalog, catalogManager: CatalogManager, - timeZone: TimeZone) + timeZone: TimeZone, + relDataType: Option[RelDataType] = None) extends RexVisitor[Option[ResolvedExpression]] { + def this( + rexBuilder: RexBuilder, + inputNames: Array[String], + functionCatalog: FunctionCatalog, + catalogManager: CatalogManager, + timeZone: TimeZone) = { + this(rexBuilder, inputNames, functionCatalog, catalogManager, timeZone, null) + } + override def visitInputRef(inputRef: RexInputRef): Option[ResolvedExpression] = { Preconditions.checkArgument(inputRef.getIndex < inputNames.length) Some( @@ -405,8 +416,7 @@ class RexNodeToExpressionConverter( inputNames(inputRef.getIndex), fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)), 0, - inputRef.getIndex - )) + inputRef.getIndex)) } override def visitTableInputRef(rexTableInputRef: RexTableInputRef): Option[ResolvedExpression] = @@ -538,8 +548,84 @@ class RexNodeToExpressionConverter( } } - override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = None + override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = { + relDataType match { + case Some(dataType) => + val schema = new NestedSchema(dataType) + def internalVisit(fieldAccess: RexFieldAccess): (Boolean, Int, List[String]) = { + fieldAccess.getReferenceExpr match { + case ref: RexInputRef => + (true, ref.getIndex, List(ref.getName, fieldAccess.getField.getName)) + case fac: RexFieldAccess => + val (success, i, n) = internalVisit(fac) + (success, i, if (success) n :+ fieldAccess.getField.getName else null) + case expr => + // only extract operands of the expression + expr.accept(this) + (false, -1, null) + } + } + + // extract the info + val (success, index, names) = 
internalVisit(fieldAccess) + if (!success) { + throw new TableException( + "Nested fields access is only supported on top of input references.") + } + + val topLevelNodeName = schema.inputRowType.getFieldNames.get(index) + val topLevelNode = if (!schema.columns.contains(topLevelNodeName)) { + val fieldType = schema.inputRowType.getFieldList.get(index).getType + val node = new NestedColumn(topLevelNodeName, index, fieldType) + schema.columns.put(topLevelNodeName, node) + node + } else { + schema.columns.get(topLevelNodeName) + } + + val leaf = names.slice(1, names.size).foldLeft(topLevelNode) { + case (parent, name) => + if (parent.isLeaf) { + Some( + new FieldReferenceExpression( + schema.columns.get(0).name, + fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(parent.originFieldType)), + index, + fieldAccess.getField.getIndex)) + } + if (!parent.children.containsKey(name)) { + val rowtype = parent.originFieldType + val index = rowtype.getFieldNames.indexOf(name) + if (index < 0) { + throw new TableException( + String + .format("Could not find field %s in field %s.", name, parent.originFieldType)) + } + parent.addChild( + new NestedColumn(name, index, rowtype.getFieldList.get(index).getType)) + } + parent.children.get(name) + } + leaf.markLeaf() + + var (topLevelColumnName, nestedColumn) = schema.columns.head + val nestedFieldName = new StringBuilder() + while (!nestedColumn.isLeaf) { + nestedFieldName.append(topLevelColumnName).append(".") + topLevelColumnName = nestedColumn.children.head._1 + nestedColumn = nestedColumn.children.head._2 + } + nestedFieldName.append(topLevelColumnName) + + Some( + new FieldReferenceExpression( + nestedFieldName.toString(), + fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(leaf.originFieldType)), + index, + fieldAccess.getField.getIndex)) + } + } override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[ResolvedExpression] = None diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index a2c2a8447a7aa..b6d182156865a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -118,4 +118,20 @@ public void testWithInterval() { util.tableEnv().executeSql(ddl); super.testWithInterval(); } + + @Test + public void testNestedFieldFilterPushdown() { + String ddl = + "CREATE TABLE MTable (\n" + + " a ROW<`a1` INT, `a2` INT>,\n" + + " b ROW<`b1` ROW<`b11` INT>, `b2` INT>\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'a.a1;a.a2;b.b1.b11;b.b2',\n" + + " 'bounded' = 'true'\n" + + ")"; + + util.tableEnv().executeSql(ddl); + super.testFilterWithNestedFields(); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java index 80629167fd1ee..4d3aa8a0a1f8a 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java @@ -147,4 +147,9 @@ public void testWithNullLiteral() { + " FROM MyTable)\n" + "SELECT a FROM MyView WHERE a IS NOT NULL\n"); } + + @Test + public void testFilterWithNestedFields() { + util.verifyRelPlan("SELECT * FROM MTable WHERE b.`b1`.`b11` > 2"); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml index e396f27ab555c..a73f9c2c1aec2 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml @@ -414,4 +414,22 @@ LogicalProject(a=[$0]) ]]> + + + 2]]> + + + ($1.b1.b11, 2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MTable]]) +]]> + + + (b.b1.b11, 2)]]]) +]]> + + From 3186606697769da028baf7a5b54aa3a81295dcd3 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 28 Aug 2023 15:04:37 -0700 Subject: [PATCH 02/11] changes based on FLIP to use NestedFieldReferenceExpression --- .../utils/ApiExpressionDefaultVisitor.java | 6 + .../expressions/BaseReferenceExpression.java | 22 ++++ .../table/expressions/ExpressionVisitor.java | 5 + .../NestedFieldReferenceExpression.java | 123 ++++++++++++++++++ .../converter/ExpressionConverter.java | 23 ++-- .../planner/plan/utils/RexNodeExtractor.scala | 70 ++-------- ...PushFilterIntoTableSourceScanRuleTest.java | 2 +- ...FilterIntoTableSourceScanRuleTestBase.java | 5 - .../table/planner/utils/FilterUtils.java | 11 ++ 9 files changed, 187 insertions(+), 80 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java index ce499a90446e5..9797f53a331c2 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java @@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.LocalReferenceExpression; import org.apache.flink.table.expressions.LookupCallExpression; +import org.apache.flink.table.expressions.NestedFieldReferenceExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.SqlCallExpression; import org.apache.flink.table.expressions.TableReferenceExpression; @@ -117,4 +118,9 @@ public T visit(SqlCallExpression sqlCall) { public T 
visitNonApiExpression(Expression other) { return defaultMethod(other); } + + @Override + public T visit(NestedFieldReferenceExpression nestedFieldReference) { + return defaultMethod(nestedFieldReference); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java new file mode 100644 index 0000000000000..030bb16db2774 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +/** TODO: javadoc. */ +public abstract class BaseReferenceExpression implements ResolvedExpression {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java index 41e9b852b24fa..334ce5be38a31 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java @@ -48,4 +48,9 @@ public interface ExpressionVisitor { // -------------------------------------------------------------------------------------------- R visit(Expression other); + + default R visit(NestedFieldReferenceExpression nestedFieldReference) { + throw new UnsupportedOperationException( + "This visitor does not support visiting nested field references."); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java new file mode 100644 index 0000000000000..6f4526f5c134b --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.table.types.DataType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * A reference to a nested field in an input. The reference contains. + * + *
+ * <ul>
+ *   <li>type
+ *   <li>index of an input the field belongs to
+ *   <li>index of a field within the corresponding input
+ * </ul>
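+ *
+ * <p>For example, for a column {@code b ROW<b1 ROW<b11 INT>, b2 INT>}, a filter on the nested
+ * field {@code b.b1.b11} references the leaf field {@code b11} through the top-level column
+ * {@code b}.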
+ */ +public class NestedFieldReferenceExpression extends BaseReferenceExpression { + + private final String name; + + private final DataType dataType; + + /** + * index of an input the field belongs to. e.g. for a join, `inputIndex` of left input is 0 and + * `inputIndex` of right input is 1. + */ + private final int inputIndex; + + /** + * Nested field reference index to traverse from the top level column to the nested leaf column. + */ + private final int[] fieldIndices; + + public NestedFieldReferenceExpression( + String name, DataType dataType, int inputIndex, int[] fieldIndices) { + this.name = name; + this.dataType = dataType; + this.inputIndex = inputIndex; + this.fieldIndices = fieldIndices; + } + + public String getName() { + return name; + } + + public int getInputIndex() { + return inputIndex; + } + + public int[] getFieldIndices() { + return fieldIndices; + } + + @Override + public DataType getOutputDataType() { + return dataType; + } + + @Override + public List getResolvedChildren() { + return Collections.emptyList(); + } + + @Override + public String asSummaryString() { + return name; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(ExpressionVisitor visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NestedFieldReferenceExpression that = (NestedFieldReferenceExpression) o; + return name.equals(that.name) + && dataType.equals(that.dataType) + && inputIndex == that.inputIndex + && Arrays.equals(fieldIndices, that.fieldIndices); + } + + @Override + public int hashCode() { + return Objects.hash(name, dataType, inputIndex, fieldIndices); + } + + @Override + public String toString() { + return asSummaryString(); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java index 7c5eb4d5235f8..ac63017706a08 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java @@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.ExpressionVisitor; import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.NestedFieldReferenceExpression; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.table.expressions.TimePointUnit; import org.apache.flink.table.expressions.TypeLiteralExpression; @@ -199,19 +200,17 @@ public RexNode visit(FieldReferenceExpression fieldReference) { // Calcite will shuffle the output order of groupings. // So the output fields order will be changed too. 
// See RelBuilder.aggregate, it use ImmutableBitSet to store groupings, - String[] nestedFields = fieldReference.getName().split("\\."); - if (nestedFields.length > 0) { - RexNode fieldAccess = relBuilder.field(nestedFields[0]); - for (int i = 1; i < nestedFields.length; i++) { - fieldAccess = - relBuilder - .getRexBuilder() - .makeFieldAccess(fieldAccess, nestedFields[i], true); - } - return fieldAccess; - } else { - return relBuilder.field(fieldReference.getName()); + return relBuilder.field(fieldReference.getName()); + } + + @Override + public RexNode visit(NestedFieldReferenceExpression nestedFieldReference) { + int[] fieldIndices = nestedFieldReference.getFieldIndices(); + RexNode fieldAccess = relBuilder.field(fieldIndices[0]); + for (int i = 1; i < fieldIndices.length; i++) { + fieldAccess = relBuilder.getRexBuilder().makeFieldAccess(fieldAccess, fieldIndices[i]); } + return fieldAccess; } @Override diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index 445cd45f38679..fbd50455a1a48 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -44,11 +44,11 @@ import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction} import org.apache.calcite.util.{TimestampString, Util} import java.util -import java.util.{List => JList, TimeZone} +import java.util.{Collections, List => JList, TimeZone} +import scala.collection.{mutable, JavaConverters} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.util.{Failure, Success, Try} object RexNodeExtractor extends Logging { @@ -551,62 +551,8 @@ class RexNodeToExpressionConverter( override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = { relDataType match { case Some(dataType) => - val schema = new NestedSchema(dataType) - def internalVisit(fieldAccess: RexFieldAccess): (Boolean, Int, List[String]) = { - fieldAccess.getReferenceExpr match { - case ref: RexInputRef => - (true, ref.getIndex, List(ref.getName, fieldAccess.getField.getName)) - case fac: RexFieldAccess => - val (success, i, n) = internalVisit(fac) - (success, i, if (success) n :+ fieldAccess.getField.getName else null) - case expr => - // only extract operands of the expression - expr.accept(this) - (false, -1, null) - } - } - - // extract the info - val (success, index, names) = internalVisit(fieldAccess) - if (!success) { - throw new TableException( - "Nested fields access is only supported on top of input references.") - } - - val topLevelNodeName = schema.inputRowType.getFieldNames.get(index) - val topLevelNode = if (!schema.columns.contains(topLevelNodeName)) { - val fieldType = schema.inputRowType.getFieldList.get(index).getType - val node = new NestedColumn(topLevelNodeName, index, fieldType) - schema.columns.put(topLevelNodeName, node) - node - } else { - schema.columns.get(topLevelNodeName) - } - - val leaf = names.slice(1, names.size).foldLeft(topLevelNode) { - case (parent, name) => - if (parent.isLeaf) { - Some( - new FieldReferenceExpression( - schema.columns.get(0).name, - fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(parent.originFieldType)), - index, - 
fieldAccess.getField.getIndex)) - } - if (!parent.children.containsKey(name)) { - val rowtype = parent.originFieldType - val index = rowtype.getFieldNames.indexOf(name) - if (index < 0) { - throw new TableException( - String - .format("Could not find field %s in field %s.", name, parent.originFieldType)) - } - parent.addChild( - new NestedColumn(name, index, rowtype.getFieldList.get(index).getType)) - } - parent.children.get(name) - } - leaf.markLeaf() + val schema = NestedProjectionUtil.build(Collections.singletonList(fieldAccess), dataType) + val fieldIndexArray = NestedProjectionUtil.convertToIndexArray(schema) var (topLevelColumnName, nestedColumn) = schema.columns.head val nestedFieldName = new StringBuilder() @@ -619,11 +565,11 @@ class RexNodeToExpressionConverter( nestedFieldName.append(topLevelColumnName) Some( - new FieldReferenceExpression( + new NestedFieldReferenceExpression( nestedFieldName.toString(), - fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(leaf.originFieldType)), - index, - fieldAccess.getField.getIndex)) + fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(fieldAccess.getType)), + fieldAccess.getField.getIndex, + fieldIndexArray(0))) } } override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[ResolvedExpression] = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index b6d182156865a..fc40e2230cdeb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -132,6 +132,6 @@ public void testNestedFieldFilterPushdown() { + ")"; util.tableEnv().executeSql(ddl); - super.testFilterWithNestedFields(); + util.verifyRelPlan("SELECT * FROM MTable WHERE b.`b1`.`b11` > 2"); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java index 4d3aa8a0a1f8a..80629167fd1ee 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java @@ -147,9 +147,4 @@ public void testWithNullLiteral() { + " FROM MyTable)\n" + "SELECT a FROM MyView WHERE a IS NOT NULL\n"); } - - @Test - public void testFilterWithNestedFields() { - util.verifyRelPlan("SELECT * FROM MTable WHERE b.`b1`.`b11` > 2"); - } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java index 0209193266244..64421efd3c4d8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java @@ -22,6 +22,7 @@ import 
org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.NestedFieldReferenceExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; @@ -96,6 +97,12 @@ private static boolean shouldPushDownUnaryExpression( } } + if (expr instanceof NestedFieldReferenceExpression) { + if (filterableFields.contains(((NestedFieldReferenceExpression) expr).getName())) { + return true; + } + } + if (expr instanceof ValueLiteralExpression) { return true; } @@ -156,6 +163,10 @@ private static Comparable getValue(Expression expr, Function Date: Mon, 28 Aug 2023 17:42:07 -0700 Subject: [PATCH 03/11] Add more tests and don't push filters on nested fields inside an array or map --- .../planner/plan/utils/RexNodeExtractor.scala | 6 ++ ...PushFilterIntoTableSourceScanRuleTest.java | 65 +++++++++++++---- .../PushFilterIntoTableSourceScanRuleTest.xml | 70 +++++++++++++++++-- 3 files changed, 122 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index fbd50455a1a48..a33858c58bd12 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -549,6 +549,12 @@ class RexNodeToExpressionConverter( } override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = { + fieldAccess.getReferenceExpr match { + // push down on nested field inside a map or array is not supported + case _: RexCall => return None + case _ => // do nothing + } + relDataType match { case Some(dataType) => val schema = NestedProjectionUtil.build(Collections.singletonList(fieldAccess), dataType) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index fc40e2230cdeb..d6ec7824dc481 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -86,6 +86,39 @@ public void setup() { + ")"; util.tableEnv().executeSql(ddl2); + + String ddl3 = + "CREATE TABLE NestedTable (\n" + + " id int,\n" + + " deepNested row, nested2 row>,\n" + + " nested row,\n" + + " `deepNestedWith.` row<`.value` int, nested row>,\n" + + " name string,\n" + + " testMap Map\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'deepNested.nested1.value;deepNestedWith..nested..value;'," + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl3); + + String ddl4 = + "CREATE TABLE NestedItemTable (\n" + + " `ID` INT,\n" + + " `Timestamp` TIMESTAMP(3),\n" + + " `Result` ROW<\n" + + " `Mid` ROW<" + + " `data_arr` ROW<`value` BIGINT> ARRAY,\n" + + " `data_map` 
MAP>" + + " >" + + " >,\n" + + " WATERMARK FOR `Timestamp` AS `Timestamp`\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'Result.Mid.data_map;'," + + " 'bounded' = 'true'\n" + + ")"; + util.tableEnv().executeSql(ddl4); } @Test @@ -120,18 +153,26 @@ public void testWithInterval() { } @Test - public void testNestedFieldFilterPushdown() { - String ddl = - "CREATE TABLE MTable (\n" - + " a ROW<`a1` INT, `a2` INT>,\n" - + " b ROW<`b1` ROW<`b11` INT>, `b2` INT>\n" - + ") WITH (\n" - + " 'connector' = 'values',\n" - + " 'filterable-fields' = 'a.a1;a.a2;b.b1.b11;b.b2',\n" - + " 'bounded' = 'true'\n" - + ")"; + public void testBasicNestedFilter() { + util.verifyRelPlan("SELECT * FROM NestedTable WHERE deepNested.nested1.`value` > 2"); + } - util.tableEnv().executeSql(ddl); - util.verifyRelPlan("SELECT * FROM MTable WHERE b.`b1`.`b11` > 2"); + @Test + public void testNestedFilterWithDotInTheName() { + util.verifyRelPlan("SELECT id FROM NestedTable WHERE" + + " `deepNestedWith.`.nested.`.value` > 5"); + } + + @Test + public void testNestedFilterOnMapKey() { + util.verifyRelPlan( + "SELECT * FROM NestedItemTable WHERE" + + " `Result`.`Mid`.data_map['item'].`value` = 3"); + } + + @Test + public void testNestedFilterOnArrayField() { + util.verifyRelPlan( + "SELECT * FROM NestedItemTable WHERE `Result`.`Mid`.data_arr[2].`value` = 3"); } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml index a73f9c2c1aec2..7260ee8dbc4d5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml @@ -414,21 +414,77 @@ LogicalProject(a=[$0]) ]]> - + - 2]]> + 2]]> ($1.b1.b11, 2)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MTable]]) +LogicalProject(id=[$0], deepNested=[$1], nested=[$2], deepNestedWith.=[$3], name=[$4], testMap=[$5]) ++- LogicalFilter(condition=[>($1.nested1.value, 2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, NestedTable]]) ]]> (b.b1.b11, 2)]]]) +LogicalProject(id=[$0], deepNested=[$1], nested=[$2], deepNestedWith.=[$3], name=[$4], testMap=[$5]) ++- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, filter=[>(deepNested.nested1.value, 2)]]]) +]]> + + + + + 5]]> + + + ($3.nested..value, 5)]) + +- LogicalTableScan(table=[[default_catalog, default_database, NestedTable]]) +]]> + + + (deepNestedWith..nested..value, 5)]]]) +]]> + + + + + + + + + + + + + + + + + + + + + + From 0229780e22ef802e56c831fd23d117c3e1e81885 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Mon, 28 Aug 2023 18:11:29 -0700 Subject: [PATCH 04/11] more changes --- .../expressions/BaseReferenceExpression.java | 22 ------------------- .../NestedFieldReferenceExpression.java | 2 +- .../planner/plan/utils/RexNodeExtractor.scala | 2 +- ...PushFilterIntoTableSourceScanRuleTest.java | 4 ++-- 4 files changed, 4 insertions(+), 26 deletions(-) delete mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java deleted file mode 100644 index 030bb16db2774..0000000000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BaseReferenceExpression.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions; - -/** TODO: javadoc. */ -public abstract class BaseReferenceExpression implements ResolvedExpression {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java index 6f4526f5c134b..2e60bb52c92d5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ -34,7 +34,7 @@ *
  • index of a field within the corresponding input * */ -public class NestedFieldReferenceExpression extends BaseReferenceExpression { +public class NestedFieldReferenceExpression implements ResolvedExpression { private final String name; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index a33858c58bd12..c07b1fb5ab12b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -550,7 +550,7 @@ class RexNodeToExpressionConverter( override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[ResolvedExpression] = { fieldAccess.getReferenceExpr match { - // push down on nested field inside a map or array is not supported + // push down on nested field inside a composite type like map or array is not supported case _: RexCall => return None case _ => // do nothing } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index d6ec7824dc481..ddcf809e83b44 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -159,8 +159,8 @@ public void testBasicNestedFilter() { @Test public void testNestedFilterWithDotInTheName() { - util.verifyRelPlan("SELECT id FROM NestedTable WHERE" - + " `deepNestedWith.`.nested.`.value` > 5"); + util.verifyRelPlan( + "SELECT id FROM NestedTable WHERE" + " `deepNestedWith.`.nested.`.value` > 5"); } @Test From f5b723454f1ed1e62eaf39c8f37a81a1bf193bb4 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 29 Aug 2023 11:58:55 -0700 Subject: [PATCH 05/11] More changes based on the FLIP discussions --- .../NestedFieldReferenceExpression.java | 44 ++++++------------- .../converter/ExpressionConverter.java | 9 ++-- .../planner/plan/utils/RexNodeExtractor.scala | 14 +++--- ...PushFilterIntoTableSourceScanRuleTest.java | 2 +- 4 files changed, 25 insertions(+), 44 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java index 2e60bb52c92d5..aa0ca8056d1cd 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ -18,6 +18,7 @@ package org.apache.flink.table.expressions; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.types.DataType; import java.util.Arrays; @@ -34,41 +35,25 @@ *
  • index of a field within the corresponding input * */ +@PublicEvolving public class NestedFieldReferenceExpression implements ResolvedExpression { - private final String name; + /** Nested field names to traverse from the top level column to the nested leaf column. */ + private final String[] fieldNames; private final DataType dataType; - /** - * index of an input the field belongs to. e.g. for a join, `inputIndex` of left input is 0 and - * `inputIndex` of right input is 1. - */ - private final int inputIndex; - - /** - * Nested field reference index to traverse from the top level column to the nested leaf column. - */ - private final int[] fieldIndices; - - public NestedFieldReferenceExpression( - String name, DataType dataType, int inputIndex, int[] fieldIndices) { - this.name = name; + public NestedFieldReferenceExpression(String[] fieldNames, DataType dataType) { + this.fieldNames = fieldNames; this.dataType = dataType; - this.inputIndex = inputIndex; - this.fieldIndices = fieldIndices; - } - - public String getName() { - return name; } - public int getInputIndex() { - return inputIndex; + public String[] getFieldNames() { + return fieldNames; } - public int[] getFieldIndices() { - return fieldIndices; + public String getName() { + return String.join("_", fieldNames); } @Override @@ -83,7 +68,7 @@ public List getResolvedChildren() { @Override public String asSummaryString() { - return name; + return getName(); } @Override @@ -105,15 +90,12 @@ public boolean equals(Object o) { return false; } NestedFieldReferenceExpression that = (NestedFieldReferenceExpression) o; - return name.equals(that.name) - && dataType.equals(that.dataType) - && inputIndex == that.inputIndex - && Arrays.equals(fieldIndices, that.fieldIndices); + return Arrays.equals(fieldNames, that.fieldNames) && dataType.equals(that.dataType); } @Override public int hashCode() { - return Objects.hash(name, dataType, inputIndex, fieldIndices); + return Objects.hash(fieldNames, dataType); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java index ac63017706a08..66c90e344edfd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java @@ -205,10 +205,11 @@ public RexNode visit(FieldReferenceExpression fieldReference) { @Override public RexNode visit(NestedFieldReferenceExpression nestedFieldReference) { - int[] fieldIndices = nestedFieldReference.getFieldIndices(); - RexNode fieldAccess = relBuilder.field(fieldIndices[0]); - for (int i = 1; i < fieldIndices.length; i++) { - fieldAccess = relBuilder.getRexBuilder().makeFieldAccess(fieldAccess, fieldIndices[i]); + String[] fieldNames = nestedFieldReference.getFieldNames(); + RexNode fieldAccess = relBuilder.field(fieldNames[0]); + for (int i = 1; i < fieldNames.length; i++) { + fieldAccess = + relBuilder.getRexBuilder().makeFieldAccess(fieldAccess, fieldNames[i], true); } return fieldAccess; } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index 
c07b1fb5ab12b..0d5d0823cf248 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -49,6 +49,7 @@ import java.util.{Collections, List => JList, TimeZone} import scala.collection.{mutable, JavaConverters} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} object RexNodeExtractor extends Logging { @@ -558,24 +559,21 @@ class RexNodeToExpressionConverter( relDataType match { case Some(dataType) => val schema = NestedProjectionUtil.build(Collections.singletonList(fieldAccess), dataType) - val fieldIndexArray = NestedProjectionUtil.convertToIndexArray(schema) var (topLevelColumnName, nestedColumn) = schema.columns.head - val nestedFieldName = new StringBuilder() + val fieldNames = new ArrayBuffer[String]() while (!nestedColumn.isLeaf) { - nestedFieldName.append(topLevelColumnName).append(".") + fieldNames.add(topLevelColumnName) topLevelColumnName = nestedColumn.children.head._1 nestedColumn = nestedColumn.children.head._2 } - nestedFieldName.append(topLevelColumnName) + fieldNames.add(topLevelColumnName) Some( new NestedFieldReferenceExpression( - nestedFieldName.toString(), - fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(fieldAccess.getType)), - fieldAccess.getField.getIndex, - fieldIndexArray(0))) + fieldNames.toArray, + fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(fieldAccess.getType)))) } } override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[ResolvedExpression] = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index ddcf809e83b44..be9d40e7f8a85 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -97,7 +97,7 @@ public void setup() { + " testMap Map\n" + ") WITH (\n" + " 'connector' = 'values',\n" - + " 'filterable-fields' = 'deepNested.nested1.value;deepNestedWith..nested..value;'," + + " 'filterable-fields' = 'deepNested_nested1_value;deepNestedWith._nested_.value;'," + " 'bounded' = 'true'\n" + ")"; util.tableEnv().executeSql(ddl3); From fbc35d2eafdf0f8483a866d100b74e356f9f6ae5 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 29 Aug 2023 15:07:15 -0700 Subject: [PATCH 06/11] some changes --- .../expressions/utils/ResolvedExpressionDefaultVisitor.java | 6 ++++++ .../flink/table/expressions/ExpressionDefaultVisitor.java | 5 +++++ .../apache/flink/table/expressions/ExpressionVisitor.java | 3 +-- .../table/expressions/NestedFieldReferenceExpression.java | 3 +-- .../logical/PushFilterIntoTableSourceScanRuleTest.java | 4 ++-- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java 
index 1370175cb8e73..3bf93880d7d56 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java @@ -22,6 +22,7 @@ import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.NestedFieldReferenceExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.ResolvedExpressionVisitor; import org.apache.flink.table.expressions.TableReferenceExpression; @@ -70,5 +71,10 @@ public T visit(ResolvedExpression other) { return defaultMethod(other); } + @Override + public T visit(NestedFieldReferenceExpression nestedFieldReference) { + return defaultMethod(nestedFieldReference); + } + protected abstract T defaultMethod(ResolvedExpression expression); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionDefaultVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionDefaultVisitor.java index 95af8c185028a..cc38f09258271 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionDefaultVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionDefaultVisitor.java @@ -52,5 +52,10 @@ public T visit(Expression other) { return defaultMethod(other); } + @Override + public T visit(NestedFieldReferenceExpression nestedFieldReference) { + return defaultMethod(nestedFieldReference); + } + protected abstract T defaultMethod(Expression expression); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java index 334ce5be38a31..62f9ff7bbca83 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionVisitor.java @@ -50,7 +50,6 @@ public interface ExpressionVisitor { R visit(Expression other); default R visit(NestedFieldReferenceExpression nestedFieldReference) { - throw new UnsupportedOperationException( - "This visitor does not support visiting nested field references."); + throw new UnsupportedOperationException("NestedFieldReferenceExpression is not supported."); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java index aa0ca8056d1cd..c3b3cd08c5a7a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ -30,9 +30,8 @@ * A reference to a nested field in an input. The reference contains. * *
  * <ul>
+ *   <li>nested field names to traverse from the top level column to the nested leaf column.
  *   <li>type
- *   <li>index of an input the field belongs to
- *   <li>index of a field within the corresponding input
  * </ul>
    */ @PublicEvolving diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index be9d40e7f8a85..aa8997a02eaa6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -115,7 +115,7 @@ public void setup() { + " WATERMARK FOR `Timestamp` AS `Timestamp`\n" + ") WITH (\n" + " 'connector' = 'values',\n" - + " 'filterable-fields' = 'Result.Mid.data_map;'," + + " 'filterable-fields' = 'Result_Mid_data_map;'," + " 'bounded' = 'true'\n" + ")"; util.tableEnv().executeSql(ddl4); @@ -160,7 +160,7 @@ public void testBasicNestedFilter() { @Test public void testNestedFilterWithDotInTheName() { util.verifyRelPlan( - "SELECT id FROM NestedTable WHERE" + " `deepNestedWith.`.nested.`.value` > 5"); + "SELECT id FROM NestedTable WHERE `deepNestedWith.`.nested.`.value` > 5"); } @Test From 08038627ef7c91fe3bff7554e108a8ce3638a283 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Wed, 30 Aug 2023 18:34:58 -0700 Subject: [PATCH 07/11] Simplify the changes --- .../plan/abilities/source/FilterPushDownSpec.java | 10 +++++----- .../PushFilterInCalcIntoTableSourceScanRule.java | 3 +-- .../logical/PushFilterIntoSourceScanRuleBase.java | 7 ++----- .../logical/PushFilterIntoTableSourceScanRule.java | 3 +-- .../table/planner/plan/utils/RexNodeExtractor.scala | 3 ++- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java index 0eacda7100f91..007fa0a4054df 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java @@ -35,7 +35,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; @@ -89,7 +88,7 @@ public List getPredicates() { @Override public void apply(DynamicTableSource tableSource, SourceAbilityContext context) { - SupportsFilterPushDown.Result result = apply(predicates, tableSource, context, null); + SupportsFilterPushDown.Result result = apply(predicates, tableSource, context); if (result.getAcceptedFilters().size() != predicates.size()) { throw new TableException("All predicates should be accepted here."); } @@ -98,8 +97,7 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) public static SupportsFilterPushDown.Result apply( List predicates, DynamicTableSource tableSource, - SourceAbilityContext context, - RelDataType relDataType) { + SourceAbilityContext context) { if (tableSource instanceof SupportsFilterPushDown) { RexNodeToExpressionConverter converter = new RexNodeToExpressionConverter( @@ 
-109,7 +107,9 @@ public static SupportsFilterPushDown.Result apply( context.getCatalogManager(), TimeZone.getTimeZone( TableConfigUtils.getLocalTimeZone(context.getTableConfig())), - Option.apply(relDataType)); + Option.apply( + context.getTypeFactory() + .buildRelNodeRowType(context.getSourceRowType()))); List filters = predicates.stream() .map( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java index 81af806e1d28f..4dc970af83183 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java @@ -103,8 +103,7 @@ private void pushFilterIntoScan( convertiblePredicates, relOptTable.unwrap(TableSourceTable.class), scan, - relBuilder, - null); + relBuilder); SupportsFilterPushDown.Result result = pushdownResultWithScan._1; TableSourceTable tableSourceTable = pushdownResultWithScan._2; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java index e557a7b7a8963..eb5e4f6d25de6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java @@ -33,7 +33,6 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilder; @@ -87,8 +86,7 @@ protected RexNode createRemainingCondition( RexNode[] convertiblePredicates, TableSourceTable oldTableSourceTable, TableScan scan, - RelBuilder relBuilder, - RelDataType relDataType) { + RelBuilder relBuilder) { // record size before applyFilters for update statistics int originPredicatesSize = convertiblePredicates.length; @@ -99,8 +97,7 @@ protected RexNode createRemainingCondition( FilterPushDownSpec.apply( Arrays.asList(convertiblePredicates), newTableSource, - SourceAbilityContext.from(scan), - relDataType); + SourceAbilityContext.from(scan)); relBuilder.push(scan); List acceptedPredicates = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index bcb148d81a8d1..eece6d7b99cd5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -107,8 +107,7 @@ private void pushFilterIntoScan( convertiblePredicates, relOptTable.unwrap(TableSourceTable.class), scan, - relBuilder, - 
typeFactory.buildRelNodeRowType(producedType)); + relBuilder); SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1; TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index 0d5d0823cf248..8a5a5fd02920c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -417,7 +417,8 @@ class RexNodeToExpressionConverter( inputNames(inputRef.getIndex), fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)), 0, - inputRef.getIndex)) + inputRef.getIndex + )) } override def visitTableInputRef(rexTableInputRef: RexTableInputRef): Option[ResolvedExpression] = From c6825fd1b054dc790bf196a02f22ca4ec53b9be4 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 31 Aug 2023 14:12:41 -0700 Subject: [PATCH 08/11] Need to check whether this change for other cases like DeletePushDown etc and all --- .../logical/PushFilterIntoTableSourceScanRule.java | 7 +------ .../table/planner/plan/utils/FlinkRexUtil.scala | 13 ++----------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index eece6d7b99cd5..ce73cb8ddc034 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -83,17 +83,12 @@ private void pushFilterIntoScan( FlinkPreparingTableBase relOptTable) { RelBuilder relBuilder = call.builder(); - final TableSourceTable sourceTable = scan.getTable().unwrap(TableSourceTable.class); - final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan); - final ResolvedSchema schema = sourceTable.contextResolvedTable().getResolvedSchema(); - final RowType producedType = createProducedType(schema, sourceTable.tableSource()); Tuple2 extractedPredicates = FlinkRexUtil.extractPredicates( filter.getInput().getRowType().getFieldNames().toArray(new String[0]), filter.getCondition(), scan, - relBuilder.getRexBuilder(), - typeFactory.buildRelNodeRowType(producedType)); + relBuilder.getRexBuilder()); RexNode[] convertiblePredicates = extractedPredicates._1; RexNode[] unconvertedPredicates = extractedPredicates._2; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala index e00149a06881b..82590106330bf 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala @@ -628,14 +628,6 @@ object FlinkRexUtil { projects.forall(RexUtil.isDeterministic) } - def extractPredicates( - inputNames: 
Array[String], - filterExpression: RexNode, - rel: RelNode, - rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = { - extractPredicates(inputNames, filterExpression, rel, rexBuilder, null) - } - /** * Return convertible rex nodes and unconverted rex nodes extracted from the filter expression. */ @@ -643,8 +635,7 @@ object FlinkRexUtil { inputNames: Array[String], filterExpression: RexNode, rel: RelNode, - rexBuilder: RexBuilder, - relDataType: RelDataType): (Array[RexNode], Array[RexNode]) = { + rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = { val context = ShortcutUtils.unwrapContext(rel) val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel); val converter = @@ -654,7 +645,7 @@ object FlinkRexUtil { context.getFunctionCatalog, context.getCatalogManager, TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig)), - Some(relDataType)); + Some(rel.getRowType)); RexNodeExtractor.extractConjunctiveConditions( filterExpression, From 428d8846605c04cd21f5ff67fe75e62d57cf7152 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Wed, 6 Sep 2023 09:04:16 -0700 Subject: [PATCH 09/11] Add fieldIndices to the NestedFieldReferenceExpression --- .../NestedFieldReferenceExpression.java | 20 +++++++++++++++---- .../PushFilterIntoTableSourceScanRule.java | 6 ------ .../planner/plan/utils/RexNodeExtractor.scala | 3 ++- ...PushFilterIntoTableSourceScanRuleTest.java | 2 +- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java index c3b3cd08c5a7a..44666594905e6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ -31,6 +31,7 @@ * *
  *
  * <ul>
  *   <li>nested field names to traverse from the top level column to the nested leaf column.
+ *   <li>nested field indices to traverse from the top level column to the nested leaf column.
  *   <li>type
  * </ul>
    */ @@ -40,10 +41,15 @@ public class NestedFieldReferenceExpression implements ResolvedExpression { /** Nested field names to traverse from the top level column to the nested leaf column. */ private final String[] fieldNames; + /** Nested field index to traverse from the top level column to the nested leaf column. */ + private final int[] fieldIndices; + private final DataType dataType; - public NestedFieldReferenceExpression(String[] fieldNames, DataType dataType) { + public NestedFieldReferenceExpression( + String[] fieldNames, int[] fieldIndices, DataType dataType) { this.fieldNames = fieldNames; + this.fieldIndices = fieldIndices; this.dataType = dataType; } @@ -51,8 +57,12 @@ public String[] getFieldNames() { return fieldNames; } + public int[] getFieldIndices() { + return fieldIndices; + } + public String getName() { - return String.join("_", fieldNames); + return String.join(".", fieldNames); } @Override @@ -89,12 +99,14 @@ public boolean equals(Object o) { return false; } NestedFieldReferenceExpression that = (NestedFieldReferenceExpression) o; - return Arrays.equals(fieldNames, that.fieldNames) && dataType.equals(that.dataType); + return Arrays.equals(fieldNames, that.fieldNames) + && Arrays.equals(fieldIndices, that.fieldIndices) + && dataType.equals(that.dataType); } @Override public int hashCode() { - return Objects.hash(fieldNames, dataType); + return Objects.hash(fieldNames, fieldIndices, dataType); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java index ce73cb8ddc034..eaa6999cdd7fe 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java @@ -18,13 +18,10 @@ package org.apache.flink.table.planner.plan.rules.logical; -import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; -import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Filter; @@ -34,9 +31,6 @@ import scala.Tuple2; -import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType; -import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; - /** * Planner rule that tries to push a filter into a {@link LogicalTableScan}, which table is a {@link * TableSourceTable}. And the table source in the table is a {@link SupportsFilterPushDown}. 
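The sketch below is illustrative only and not part of the diffs in this patch: it shows how a nested filter field is represented once fieldIndices are added. The schema, the concrete index values {1, 0, 1}, and the INT leaf type are assumptions loosely based on the NestedTable used in the planner tests.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.expressions.NestedFieldReferenceExpression;

public class NestedFieldRefExample {
    public static void main(String[] args) {
        // Reference to the leaf column deepNested.nested1.`value`, assuming deepNested is
        // column #1 of the source row, nested1 is field #0 inside it and `value` is field #1.
        NestedFieldReferenceExpression nestedRef =
                new NestedFieldReferenceExpression(
                        new String[] {"deepNested", "nested1", "value"},
                        new int[] {1, 0, 1},
                        DataTypes.INT());
        // After this patch, getName() joins the traversal path with dots.
        System.out.println(nestedRef.getName()); // prints: deepNested.nested1.value
    }
}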
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index 8a5a5fd02920c..42a822fccb9cf 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -560,7 +560,7 @@ class RexNodeToExpressionConverter( relDataType match { case Some(dataType) => val schema = NestedProjectionUtil.build(Collections.singletonList(fieldAccess), dataType) - + val fieldIndices = NestedProjectionUtil.convertToIndexArray(schema) var (topLevelColumnName, nestedColumn) = schema.columns.head val fieldNames = new ArrayBuffer[String]() @@ -574,6 +574,7 @@ class RexNodeToExpressionConverter( Some( new NestedFieldReferenceExpression( fieldNames.toArray, + fieldIndices(0), fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(fieldAccess.getType)))) } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index aa8997a02eaa6..691dcd9b755ce 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -97,7 +97,7 @@ public void setup() { + " testMap Map\n" + ") WITH (\n" + " 'connector' = 'values',\n" - + " 'filterable-fields' = 'deepNested_nested1_value;deepNestedWith._nested_.value;'," + + " 'filterable-fields' = 'deepNested.nested1.value;deepNestedWith..nested..value;'," + " 'bounded' = 'true'\n" + ")"; util.tableEnv().executeSql(ddl3); From c26c780ca59c47672a4b457080d2a79184b7e3f5 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Sat, 9 Sep 2023 18:26:00 -0700 Subject: [PATCH 10/11] Quote the nested field name --- .../NestedFieldReferenceExpression.java | 12 +++++++++++- .../PushFilterIntoTableSourceScanRuleTest.java | 10 ++++++++-- .../PushFilterIntoTableSourceScanRuleTest.xml | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java index 44666594905e6..85aff7211f570 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ -62,7 +62,13 @@ public int[] getFieldIndices() { } public String getName() { - return String.join(".", fieldNames); + return String.format( + "`%s`", + String.join( + ".", + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .toArray(String[]::new))); } @Override @@ -113,4 +119,8 @@ public int hashCode() { public String toString() { return asSummaryString(); } + + private String quoteIdentifier(String identifier) { + return identifier.replace("`", "``"); + } } diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java index 691dcd9b755ce..47e03c3beb409 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java @@ -92,12 +92,12 @@ public void setup() { + " id int,\n" + " deepNested row, nested2 row>,\n" + " nested row,\n" - + " `deepNestedWith.` row<`.value` int, nested row>,\n" + + " `deepNestedWith.` row<`.value` int, nested row<```name` string, `.value` int>>,\n" + " name string,\n" + " testMap Map\n" + ") WITH (\n" + " 'connector' = 'values',\n" - + " 'filterable-fields' = 'deepNested.nested1.value;deepNestedWith..nested..value;'," + + " 'filterable-fields' = '`deepNested.nested1.value`;`deepNestedWith..nested..value`;`deepNestedWith..nested.``name`;'," + " 'bounded' = 'true'\n" + ")"; util.tableEnv().executeSql(ddl3); @@ -163,6 +163,12 @@ public void testNestedFilterWithDotInTheName() { "SELECT id FROM NestedTable WHERE `deepNestedWith.`.nested.`.value` > 5"); } + @Test + public void testNestedFilterWithBacktickInTheName() { + util.verifyRelPlan( + "SELECT id FROM NestedTable WHERE `deepNestedWith.`.nested.```name` = 'foo'"); + } + @Test public void testNestedFilterOnMapKey() { util.verifyRelPlan( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml index 7260ee8dbc4d5..b27c92d52bfcc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml @@ -447,6 +447,24 @@ LogicalProject(id=[$0]) (deepNestedWith..nested..value, 5)]]]) +]]> + + + + + + + + + + + From 8924b9384361997b5ce063434cca94965552d166 Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Thu, 14 Sep 2023 00:09:56 -0700 Subject: [PATCH 11/11] Address review comments --- .../NestedFieldReferenceExpression.java | 4 +- .../planner/plan/utils/RexNodeExtractor.scala | 6 ++- .../planner/factories/TestValuesCatalog.java | 2 +- .../factories/TestValuesTableFactory.java | 15 +++++- .../table/planner/utils/FilterUtils.java | 36 +++++++++---- .../runtime/batch/sql/TableSourceITCase.scala | 38 +++++++++++++ .../stream/sql/TableSourceITCase.scala | 54 +++++++++++++++++++ 7 files changed, 140 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java index 85aff7211f570..70575ddf2a01d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java @@ 
-27,7 +27,7 @@ import java.util.Objects; /** - * A reference to a nested field in an input. The reference contains. + * A reference to a nested field in an input. The reference contains: * *
      *
    • nested field names to traverse from the top level column to the nested leaf column. @@ -112,7 +112,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(fieldNames, fieldIndices, dataType); + return Objects.hash(Arrays.hashCode(fieldNames), Arrays.hashCode(fieldIndices), dataType); } @Override diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index 42a822fccb9cf..482ce56dc6389 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -34,6 +34,7 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLog import org.apache.flink.table.types.DataType import org.apache.flink.table.types.logical.LogicalTypeRoot._ import org.apache.flink.table.types.logical.YearMonthIntervalType +import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.util.Preconditions import org.apache.calcite.plan.RelOptUtil @@ -407,7 +408,7 @@ class RexNodeToExpressionConverter( functionCatalog: FunctionCatalog, catalogManager: CatalogManager, timeZone: TimeZone) = { - this(rexBuilder, inputNames, functionCatalog, catalogManager, timeZone, null) + this(rexBuilder, inputNames, functionCatalog, catalogManager, timeZone, None) } override def visitInputRef(inputRef: RexInputRef): Option[ResolvedExpression] = { @@ -575,7 +576,8 @@ class RexNodeToExpressionConverter( new NestedFieldReferenceExpression( fieldNames.toArray, fieldIndices(0), - fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(fieldAccess.getType)))) + TypeConversions.fromLogicalToDataType( + FlinkTypeFactory.toLogicalType(fieldAccess.getType)))) } } override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[ResolvedExpression] = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java index badd3ecb004b0..2fc45dc743a0a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java @@ -92,7 +92,7 @@ public List listPartitionsByFilter( Function> getter = getValueGetter(partition.getPartitionSpec(), schema); return FilterUtils.isRetainedAfterApplyingFilterPredicates( - resolvedExpressions, getter); + resolvedExpressions, getter, null); }) .collect(Collectors.toList()); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index f739fa558a4e6..8c5f34445c97e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -1007,6 +1007,17 @@ private Function> getValueGetter(Row row) { }; } + private Function> getNestedValueGetter(Row row) { + return fieldIndices 
-> { + Object current = row; + for (int i = 0; i < fieldIndices.length - 1; i++) { + current = ((Row) current).getField(fieldIndices[i]); + } + return (Comparable) + ((Row) current).getField(fieldIndices[fieldIndices.length - 1]); + }; + } + @Override public DynamicTableSource copy() { return new TestValuesScanTableSourceWithoutProjectionPushDown( @@ -1183,7 +1194,9 @@ private Map, Collection> filterAllData( for (Row row : allData.get(partition)) { boolean isRetained = FilterUtils.isRetainedAfterApplyingFilterPredicates( - filterPredicates, getValueGetter(row)); + filterPredicates, + getValueGetter(row), + getNestedValueGetter(row)); if (isRetained) { remainData.add(row); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java index 64421efd3c4d8..32d3afa233a9d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java @@ -29,6 +29,8 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.util.List; import java.util.Optional; import java.util.Set; @@ -51,7 +53,9 @@ && shouldPushDownUnaryExpression( } public static boolean isRetainedAfterApplyingFilterPredicates( - List predicates, Function> getter) { + List predicates, + Function> getter, + @Nullable Function> nestedFieldGetter) { for (ResolvedExpression predicate : predicates) { if (predicate instanceof CallExpression) { FunctionDefinition definition = @@ -63,13 +67,17 @@ public static boolean isRetainedAfterApplyingFilterPredicates( if (!(expr instanceof CallExpression && expr.getChildren().size() == 2)) { throw new TableException(expr + " not supported!"); } - result = binaryFilterApplies((CallExpression) expr, getter); + result = + binaryFilterApplies( + (CallExpression) expr, getter, nestedFieldGetter); if (result) { break; } } } else if (predicate.getChildren().size() == 2) { - result = binaryFilterApplies((CallExpression) predicate, getter); + result = + binaryFilterApplies( + (CallExpression) predicate, getter, nestedFieldGetter); } else { throw new UnsupportedOperationException( String.format("Unsupported expr: %s.", predicate)); @@ -120,12 +128,14 @@ private static boolean shouldPushDownUnaryExpression( @SuppressWarnings({"unchecked", "rawtypes"}) private static boolean binaryFilterApplies( - CallExpression binExpr, Function> getter) { + CallExpression binExpr, + Function> getter, + Function> nestedFieldGetter) { List children = binExpr.getChildren(); Preconditions.checkArgument(children.size() == 2); - Comparable lhsValue = getValue(children.get(0), getter); - Comparable rhsValue = getValue(children.get(1), getter); + Comparable lhsValue = getValue(children.get(0), getter, nestedFieldGetter); + Comparable rhsValue = getValue(children.get(1), getter, nestedFieldGetter); FunctionDefinition functionDefinition = binExpr.getFunctionDefinition(); if (BuiltInFunctionDefinitions.GREATER_THAN.equals(functionDefinition)) { return lhsValue.compareTo(rhsValue) > 0; @@ -148,7 +158,10 @@ private static boolean isComparable(Class clazz) { return Comparable.class.isAssignableFrom(clazz); } - private static Comparable getValue(Expression expr, Function> getter) { + private static Comparable getValue( + Expression expr, + Function> 
getter, + Function> nestedFieldGetter) { if (expr instanceof ValueLiteralExpression) { Optional value = ((ValueLiteralExpression) expr) @@ -164,11 +177,16 @@ private static Comparable getValue(Expression expr, Function 20000 + """.stripMargin, + Seq(row(3, "Mike")) + ) + } + + @Test + def testNestedFilterOnArray(): Unit = { + checkResult( + """ + |SELECT id, + | deepNested.nested1.name AS nestedName, + | nestedItem.deepArray[2].`value` FROM NestedTable + |WHERE nestedItem.deepArray[2].`value` > 1 + """.stripMargin, + Seq(row(1, "Sarah", 2), row(2, "Rob", 2), row(3, "Mike", 2)) + ) + } + + @Test + def testNestedFilterOnMap(): Unit = { + checkResult( + """ + |SELECT id, + | deepNested.nested1.name AS nestedName, + | nestedItem.deepMap['Monday'] FROM NestedTable + |WHERE nestedItem.deepMap['Monday'] = 1 + """.stripMargin, + Seq(row(1, "Sarah", 1), row(2, "Rob", 1), row(3, "Mike", 1)) + ) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala index 206ae68d04064..6deb1fd61c7f4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala @@ -101,6 +101,7 @@ class TableSourceITCase extends StreamingTestBase { |) WITH ( | 'connector' = 'values', | 'nested-projection-supported' = 'true', + | 'filterable-fields' = '`nested.value`;`nestedItem.deepMap`;`nestedItem.deepArray`', | 'data-id' = '$nestedTableDataId', | 'bounded' = 'true' |) @@ -376,4 +377,57 @@ class TableSourceITCase extends StreamingTestBase { assertThat(t, containsCause(new TableException(SourceWatermarkFunction.ERROR_MESSAGE))) } } + + @Test + def testSimpleNestedFilter(): Unit = { + val query = + """ + |SELECT id, deepNested.nested1.name AS nestedName FROM NestedTable + | WHERE nested.`value` > 20000 + """.stripMargin + val result = tEnv.sqlQuery(query).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = Seq("3,Mike") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testNestedFilterOnArray(): Unit = { + val query = + """ + |SELECT id, + | deepNested.nested1.name AS nestedName, + | nestedItem.deepArray[2].`value` FROM NestedTable + |WHERE nestedItem.deepArray[2].`value` > 1 + """.stripMargin + val result = tEnv.sqlQuery(query).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = Seq("1,Sarah,2", "2,Rob,2", "3,Mike,2") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testNestedFilterOnMap(): Unit = { + val query = + """ + |SELECT id, + | deepNested.nested1.name AS nestedName, + | nestedItem.deepMap['Monday'] FROM NestedTable + |WHERE nestedItem.deepMap['Monday'] = 1 + """.stripMargin + + val result = tEnv.sqlQuery(query).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = Seq("1,Sarah,1", "2,Rob,1", "3,Mike,1") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } }