diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 7f041cdfc6061..19cdc5fddf0bd 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -748,6 +748,60 @@ def local_zoned_timestamp_func(local_zoned_timestamp_param):
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[1970-01-01T00:00:00.123Z]"])
 
+    def test_execute_from_json_plan(self):
+        """Execute an INSERT that calls a Python UDF through a compiled JSON plan."""
+        # Write the CSV input that the filesystem source table reads.
+        tmp_dir = self.tempdir
+        data = ['1,1', '3,3', '2,2']
+        source_path = tmp_dir + '/test_execute_from_json_plan_input.csv'
+        sink_path = tmp_dir + '/test_execute_from_json_plan_out'
+        with open(source_path, 'w') as fd:
+            for ele in data:
+                fd.write(ele + '\n')
+
+        source_table = """
+            CREATE TABLE source_table (
+                a BIGINT,
+                b BIGINT
+            ) WITH (
+                'connector' = 'filesystem',
+                'path' = '%s',
+                'format' = 'csv'
+            )
+        """ % source_path
+        self.t_env.execute_sql(source_table)
+
+        self.t_env.execute_sql("""
+            CREATE TABLE sink_table (
+                id BIGINT,
+                data BIGINT
+            ) WITH (
+                'connector' = 'filesystem',
+                'path' = '%s',
+                'format' = 'csv'
+            )
+        """ % sink_path)
+
+        add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
+        self.t_env.create_temporary_system_function("add_one", add_one)
+
+        json_plan = self.t_env._j_tenv.getJsonPlan("INSERT INTO sink_table SELECT "
+                                                   "a, "
+                                                   "add_one(b) "
+                                                   "FROM source_table")
+        from py4j.java_gateway import get_method
+        # `await` is a reserved word in Python, so the Java method is looked up by name.
+        get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")()
+
+        import glob
+        lines = []
+        for result_file in glob.glob(sink_path + '/*'):
+            # Close every sink file explicitly instead of leaking the handle.
+            with open(result_file, 'r') as fd:
+                lines.extend(line.strip() for line in fd)
+        lines.sort()
+        self.assertEqual(lines, ['1,2', '2,3', '3,4'])
+
 
 class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                 PyFlinkBlinkBatchTableTestCase):
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCalc.java 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCalc.java index c269dfbc04605..b0526684097b1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCalc.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCalc.java @@ -24,16 +24,39 @@ import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc; import org.apache.flink.table.types.logical.RowType; -import org.apache.calcite.rex.RexProgram; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexNode; + +import java.util.Collections; +import java.util.List; /** Batch {@link ExecNode} for Python ScalarFunctions. 
*/ +@JsonIgnoreProperties(ignoreUnknown = true) public class BatchExecPythonCalc extends CommonExecPythonCalc implements BatchExecNode { public BatchExecPythonCalc( - RexProgram calcProgram, + List projection, InputProperty inputProperty, RowType outputType, String description) { - super(calcProgram, inputProperty, outputType, description); + this( + projection, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); + } + + @JsonCreator + public BatchExecPythonCalc( + @JsonProperty(FIELD_NAME_PROJECTION) List projection, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super(projection, id, inputProperties, outputType, description); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java index 9f1f0daf33ed3..151e7d6616790 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java @@ -40,23 +40,30 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexProgram; import 
java.lang.reflect.Constructor; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** Base class for exec Python Calc. */ +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class CommonExecPythonCalc extends ExecNodeBase implements SingleTransformationTranslator { + public static final String FIELD_NAME_PROJECTION = "projection"; + private static final String PYTHON_SCALAR_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.scalar." + "RowDataPythonScalarFunctionOperator"; @@ -65,15 +72,18 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase "org.apache.flink.table.runtime.operators.python.scalar.arrow." + "RowDataArrowPythonScalarFunctionOperator"; - private final RexProgram calcProgram; + @JsonProperty(FIELD_NAME_PROJECTION) + private final List projection; public CommonExecPythonCalc( - RexProgram calcProgram, - InputProperty inputProperty, + List projection, + int id, + List inputProperties, RowType outputType, String description) { - super(Collections.singletonList(inputProperty), outputType, description); - this.calcProgram = calcProgram; + super(id, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); + this.projection = checkNotNull(projection); } @SuppressWarnings("unchecked") @@ -85,8 +95,7 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { final Configuration config = CommonPythonUtil.getMergedConfig(planner.getExecEnv(), planner.getTableConfig()); OneInputTransformation ret = - createPythonOneInputTransformation( - inputTransform, calcProgram, getDescription(), config); + createPythonOneInputTransformation(inputTransform, getDescription(), config); if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) { 
ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON); } @@ -94,20 +103,15 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { } private OneInputTransformation createPythonOneInputTransformation( - Transformation inputTransform, - RexProgram calcProgram, - String name, - Configuration config) { + Transformation inputTransform, String name, Configuration config) { List pythonRexCalls = - calcProgram.getProjectList().stream() - .map(calcProgram::expandLocalRef) + projection.stream() .filter(x -> x instanceof RexCall) .map(x -> (RexCall) x) .collect(Collectors.toList()); List forwardedFields = - calcProgram.getProjectList().stream() - .map(calcProgram::expandLocalRef) + projection.stream() .filter(x -> x instanceof RexInputRef) .map(x -> ((RexInputRef) x).getIndex()) .collect(Collectors.toList()); @@ -142,7 +146,7 @@ private OneInputTransformation createPythonOneInputTransformat pythonUdfInputOffsets, pythonFunctionInfos, forwardedFields.stream().mapToInt(x -> x).toArray(), - calcProgram.getExprList().stream() + pythonRexCalls.stream() .anyMatch( x -> PythonUtil.containsPythonCall( diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java index fe684f654eeeb..2fc7fb95b6b5a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java @@ -24,16 +24,39 @@ import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc; import org.apache.flink.table.types.logical.RowType; -import org.apache.calcite.rex.RexProgram; +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexNode; + +import java.util.Collections; +import java.util.List; /** Stream {@link ExecNode} for Python ScalarFunctions. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class StreamExecPythonCalc extends CommonExecPythonCalc implements StreamExecNode { public StreamExecPythonCalc( - RexProgram calcProgram, + List projection, InputProperty inputProperty, RowType outputType, String description) { - super(calcProgram, inputProperty, outputType, description); + this( + projection, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); + } + + @JsonCreator + public StreamExecPythonCalc( + @JsonProperty(FIELD_NAME_PROJECTION) List projection, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super(projection, id, inputProperties, outputType, description); } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala index e52c9ed061b9e..a32457ba7da71 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch 
+import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCalc import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} @@ -28,6 +29,8 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Calc import org.apache.calcite.rex.RexProgram +import scala.collection.JavaConversions._ + /** * Batch physical RelNode for Python ScalarFunctions. */ @@ -49,8 +52,13 @@ class BatchPhysicalPythonCalc( } override def translateToExecNode(): ExecNode[_] = { + val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef) + if (calcProgram.getCondition != null) { + throw new TableException("The condition of BatchPhysicalPythonCalc should be null.") + } + new BatchExecPythonCalc( - getProgram, + projection, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCalc.scala index 9adec39eaced0..b7eff1171a612 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCalc.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCalc.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream +import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode} @@ -28,6 
+29,8 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Calc import org.apache.calcite.rex.RexProgram +import scala.collection.JavaConversions._ + /** * Stream physical RelNode for Python ScalarFunctions. */ @@ -49,8 +52,13 @@ class StreamPhysicalPythonCalc( } override def translateToExecNode(): ExecNode[_] = { + val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef) + if (calcProgram.getCondition != null) { + throw new TableException("The condition of StreamPhysicalPythonCalc should be null.") + } + new StreamExecPythonCalc( - getProgram, + projection, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java index 87aaea8547db3..e3539a3282698 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java @@ -49,7 +49,6 @@ public class JsonSerdeCoverageTest { "StreamExecPythonGroupTableAggregate", "StreamExecPythonOverAggregate", "StreamExecPythonCorrelate", - "StreamExecPythonCalc", "StreamExecSort", "StreamExecMultipleInput", "StreamExecValues"); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java new file mode 100644 index 0000000000000..07a367d6d24d0 --- /dev/null +++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.BooleanPythonScalarFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization/deserialization for calc. 
*/ +public class PythonCalcJsonPlanTest extends TableTestBase { + + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() { + util = streamTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + + String srcTableDdl = + "CREATE TABLE MyTable (\n" + + " a bigint,\n" + + " b int not null,\n" + + " c varchar,\n" + + " d timestamp(3)\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false')"; + tEnv.executeSql(srcTableDdl); + } + + @Test + public void testPythonCalc() { + tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc")); + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b int\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from MyTable"); + } + + @Test + public void testPythonFunctionInWhereClause() { + tEnv.createTemporaryFunction("pyFunc", new BooleanPythonScalarFunction("pyFunc")); + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b int\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan("insert into MySink select a, b from MyTable where pyFunc(b, b + 1)"); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out new file mode 100644 index 0000000000000..6d4ac0e711012 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out @@ -0,0 +1,158 @@ +{ + "flinkVersion" : "", + 
"nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "d", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.0.name" : "a", + "schema.1.data-type" : "INT NOT NULL" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "SCALAR", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw", + "bridging" : true + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + 
"type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "EXPR$1" : "INT NOT NULL" + } ] + }, + "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "EXPR$1" : "INT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, EXPR$1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out new file mode 100644 index 0000000000000..b3ae19a9e5208 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out @@ -0,0 +1,291 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "d", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.0.name" : "a", + "schema.1.data-type" : "INT NOT NULL" + }, + "sourceAbilitySpecs" : [ { + "type" : "FilterPushDown", + "predicates" : [ ] + }, { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + 
"nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + }, { + "f0" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[a, b, (b + 1) AS f0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "org$apache$flink$table$planner$runtime$utils$JavaUserDefinedScalarFunctions$BooleanPythonScalarFunction$7a841a4bac68feb7427ad21acb894ac4", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "SCALAR", + "instance" : "rO0ABXNyAGdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkQm9vbGVhblB5dGhvblNjYWxhckZ1bmN0aW9uBxoseFaMfWACAAFMAARuYW1ldAASTGphdmEvbGFuZy9TdHJpbmc7eHIAL29yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlNjYWxhckZ1bmN0aW9ut_NyMK46jqkCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHB0AAZweUZ1bmM", + "bridging" : true + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + 
"nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : false + } + } ], + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + }, { + "f0" : "BOOLEAN NOT NULL" + } ] + }, + "description" : "PythonCalc(select=[a, b, org$apache$flink$table$planner$runtime$utils$JavaUserDefinedScalarFunctions$BooleanPythonScalarFunction$7a841a4bac68feb7427ad21acb894ac4(b, f0) AS f0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BOOLEAN", + "nullable" : false + } + }, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[a, b], where=[f0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + 
"inputChangelogMode" : [ "INSERT" ], + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "b" : "INT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file