diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py index 1140af83eb1e0..97944fd99016b 100644 --- a/flink-python/pyflink/table/tests/test_pandas_udaf.py +++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py @@ -760,6 +760,76 @@ def test_proc_time_over_rows_window_aggregate_function(self): "+I[3, 2.0, 4]"]) os.remove(source_path) + def test_execute_over_aggregate_from_json_plan(self): + # create source file path + tmp_dir = self.tempdir + data = [ + '1,1,2013-01-01 03:10:00', + '3,2,2013-01-01 03:10:00', + '2,1,2013-01-01 03:10:00', + '1,5,2013-01-01 03:10:00', + '1,8,2013-01-01 04:20:00', + '2,3,2013-01-01 03:30:00' + ] + source_path = tmp_dir + '/test_execute_over_aggregate_from_json_plan.csv' + sink_path = tmp_dir + '/test_execute_over_aggregate_from_json_plan' + with open(source_path, 'w') as fd: + for ele in data: + fd.write(ele + '\n') + + source_table = """ + CREATE TABLE source_table ( + a TINYINT, + b SMALLINT, + rowtime TIMESTAMP(3), + WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE + ) WITH ( + 'connector' = 'filesystem', + 'path' = '%s', + 'format' = 'csv' + ) + """ % source_path + self.t_env.execute_sql(source_table) + + self.t_env.execute_sql(""" + CREATE TABLE sink_table ( + a TINYINT, + b FLOAT, + c SMALLINT + ) WITH ( + 'connector' = 'filesystem', + 'path' = '%s', + 'format' = 'csv' + ) + """ % sink_path) + + max_add_min_udaf = udaf(lambda a: a.max() + a.min(), + result_type=DataTypes.SMALLINT(), + func_type='pandas') + self.t_env.get_config().get_configuration().set_string( + "pipeline.time-characteristic", "EventTime") + self.t_env.create_temporary_system_function("mean_udaf", mean_udaf) + self.t_env.create_temporary_system_function("max_add_min_udaf", max_add_min_udaf) + + json_plan = self.t_env._j_tenv.getJsonPlan(""" + insert into sink_table + select a, + mean_udaf(b) + over (PARTITION BY a ORDER BY rowtime + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW), + max_add_min_udaf(b) + over (PARTITION BY a ORDER BY rowtime + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) + from source_table + """) + from py4j.java_gateway import get_method + get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")() + + import glob + lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')] + lines.sort() + self.assertEqual(lines, ['1,1.0,2', '1,3.0,6', '1,6.5,13', '2,1.0,2', '2,2.0,4', '3,2.0,4']) + @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") def mean_udaf(v): diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java index 246734afd1b7c..f566f6f5b2e47 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java @@ -46,6 +46,9 @@ import org.apache.flink.table.types.logical.TimestampKind; import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + import org.apache.calcite.rel.core.AggregateCall; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +57,10 @@ import java.lang.reflect.InvocationTargetException; import java.math.BigDecimal; import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** Stream {@link ExecNode} for python time-based over operator. */ public class StreamExecPythonOverAggregate extends ExecNodeBase @@ -77,6 +84,9 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream." + "StreamArrowPythonProcTimeBoundedRowsOperator"; + public static final String FIELD_NAME_OVER_SPEC = "overSpec"; + + @JsonProperty(FIELD_NAME_OVER_SPEC) private final OverSpec overSpec; public StreamExecPythonOverAggregate( @@ -84,8 +94,24 @@ public StreamExecPythonOverAggregate( InputProperty inputProperty, RowType outputType, String description) { - super(Collections.singletonList(inputProperty), outputType, description); - this.overSpec = overSpec; + this( + overSpec, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); + } + + @JsonCreator + public StreamExecPythonOverAggregate( + @JsonProperty(FIELD_NAME_OVER_SPEC) OverSpec overSpec, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super(id, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); + this.overSpec = checkNotNull(overSpec); } @SuppressWarnings("unchecked") diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java index b84a4153e4ee2..35f140f439b2f 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java @@ -45,7 +45,6 @@ public class JsonSerdeCoverageTest { "StreamExecWindowTableFunction", "StreamExecGroupTableAggregate", "StreamExecPythonGroupTableAggregate", - "StreamExecPythonOverAggregate", "StreamExecSort", "StreamExecMultipleInput", "StreamExecValues"); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java new file mode 100644 index 0000000000000..ec5f4e3f96768 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization for over aggregate. */ +public class PythonOverAggregateJsonPlanTest extends TableTestBase { + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() { + util = streamTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + String srcTableDdl = + "CREATE TABLE MyTable (\n" + + " a int,\n" + + " b varchar,\n" + + " c int not null,\n" + + " rowtime timestamp(3),\n" + + " proctime as PROCTIME(),\n" + + " watermark for rowtime as rowtime" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false')"; + tEnv.executeSql(srcTableDdl); + tEnv.createTemporarySystemFunction("pyFunc", new PandasAggregateFunction()); + } + + @Test + public void testProcTimeBoundedPartitionedRangeOver() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + String sql = + "insert into MySink SELECT a,\n" + + " pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime\n" + + " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)\n" + + "FROM MyTable"; + util.verifyJsonPlan(sql); + } + + @Test + public void testProcTimeBoundedNonPartitionedRangeOver() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + String sql = + "insert into MySink SELECT a,\n" + + " pyFunc(c, c) OVER (ORDER BY proctime\n" + + " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)\n" + + " FROM MyTable"; + util.verifyJsonPlan(sql); + } + + @Test + public void testProcTimeUnboundedPartitionedRangeOver() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + String sql = + "insert into MySink SELECT a,\n" + + " pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime RANGE UNBOUNDED PRECEDING)\n" + + "FROM MyTable"; + util.verifyJsonPlan(sql); + } + + @Test + public void testRowTimeBoundedPartitionedRowsOver() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + String sql = + "insert into MySink SELECT a,\n" + + " pyFunc(c, c) OVER (PARTITION BY a ORDER BY rowtime\n" + + " ROWS BETWEEN 5 preceding AND CURRENT ROW)\n" + + "FROM MyTable"; + util.verifyJsonPlan(sql); + } + + @Test + public void testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " b bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + String sql = + "insert into MySink SELECT a, " + + " pyFunc(c, c) OVER (" + + " PARTITION BY a ORDER BY proctime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " + + "FROM MyTable"; + util.verifyJsonPlan(sql); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out new file mode 100644 index 0000000000000..37c44801bb336 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out @@ -0,0 +1,490 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime`", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.4.name" : "proctime", + "schema.1.data-type" : "VARCHAR(2147483647)", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT NOT NULL", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 2 ], [ 0 ], [ 3 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[c, a, rowtime]]], fields=[c, a, rowtime])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "Calc(select=[c, PROCTIME() AS proctime, a, rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$2" : "BIGINT" + } ] + }, + "description" : "Calc(select=[c, proctime, CAST(a) AS $2])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$2" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate", + "overSpec" : { + "partition" : { + "fields" : [ ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "BOUNDED_WINDOW", + "isPreceding" : true, + "offset" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkUGFuZGFzQWdncmVnYXRlRnVuY3Rpb27LwxUYPcn9XwIAAHhyAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmdnU2rsM9U-BAIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw", + "bridging" : true + }, + "argList" : [ 0, 0 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ] + } ], + "constants" : [ { + "kind" : "LITERAL", + "value" : 10000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "originalInputFields" : 3 + }, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$2" : "BIGINT" + }, { + "w0$o0" : "BIGINT" + } ] + }, + "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, pyFunc(c, c) AS w0$o0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out new file mode 100644 index 0000000000000..50129d5f6c0bf --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out @@ -0,0 +1,504 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime`", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.4.name" : "proctime", + "schema.1.data-type" : "VARCHAR(2147483647)", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT NOT NULL", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime]]], fields=[a, c, rowtime])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "Calc(select=[a, c, PROCTIME() AS proctime, rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$3" : "BIGINT" + } ] + }, + "description" : "Calc(select=[a, c, proctime, CAST(a) AS $3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$3" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "BOUNDED_WINDOW", + "isPreceding" : true, + "offset" : { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "INTERVAL_HOUR", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkUGFuZGFzQWdncmVnYXRlRnVuY3Rpb27LwxUYPcn9XwIAAHhyAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmdnU2rsM9U-BAIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw", + "bridging" : true + }, + "argList" : [ 1, 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ] + } ], + "constants" : [ { + "kind" : "LITERAL", + "value" : 7200000, + "type" : { + "typeName" : "INTERVAL_HOUR", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "originalInputFields" : 4 + }, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$3" : "BIGINT" + }, { + "w0$o0" : "BIGINT" + } ] + }, + "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out new file mode 100644 index 0000000000000..4e130b7b00252 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out @@ -0,0 +1,420 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime`", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.4.name" : "proctime", + "schema.1.data-type" : "VARCHAR(2147483647)", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT NOT NULL", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime]]], fields=[a, c, rowtime])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 2, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "$2" : "BIGINT" + }, { + "$3" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, c, CAST(a) AS $2, PROCTIME() AS $3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "$2" : "BIGINT" + }, { + "$3" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 3, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : true, + "lowerBound" : { + "kind" : "BOUNDED_WINDOW", + "isPreceding" : true, + "offset" : { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkUGFuZGFzQWdncmVnYXRlRnVuY3Rpb27LwxUYPcn9XwIAAHhyAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmdnU2rsM9U-BAIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw", + "bridging" : true + }, + "argList" : [ 1, 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ] + } ], + "constants" : [ { + "kind" : "LITERAL", + "value" : "4", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "originalInputFields" : 4 + }, + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "$2" : "BIGINT" + }, { + "$3" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "w0$o0" : "BIGINT" + } ] + }, + "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[$3 ASC], window=[ ROWS BETWEEN 4 PRECEDING AND CURRENT ROW], select=[a, c, $2, $3, pyFunc(c, c) AS w0$o0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out new file mode 100644 index 0000000000000..fc723704c0699 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out @@ -0,0 +1,484 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime`", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.4.name" : "proctime", + "schema.1.data-type" : "VARCHAR(2147483647)", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT NOT NULL", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime]]], fields=[a, c, rowtime])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "Calc(select=[a, c, PROCTIME() AS proctime, rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$3" : "BIGINT" + } ] + }, + "description" : "Calc(select=[a, c, proctime, CAST(a) AS $3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$3" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "UNBOUNDED_PRECEDING" + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkUGFuZGFzQWdncmVnYXRlRnVuY3Rpb27LwxUYPcn9XwIAAHhyAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmdnU2rsM9U-BAIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw", + "bridging" : true + }, + "argList" : [ 1, 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ] + } ], + "constants" : [ ], + "originalInputFields" : 4 + }, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "$3" : "BIGINT" + }, { + "w0$o0" : "BIGINT" + } ] + }, + "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out new file mode 100644 index 0000000000000..e9e770da54951 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out @@ -0,0 +1,415 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime`", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.4.name" : "proctime", + "schema.1.data-type" : "VARCHAR(2147483647)", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT NOT NULL", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime]]], fields=[a, c, rowtime])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 2, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "$3" : "BIGINT" + } ] + }, + "description" : "Calc(select=[a, c, rowtime, CAST(a) AS $3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "$3" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : true, + "lowerBound" : { + "kind" : "BOUNDED_WINDOW", + "isPreceding" : true, + "offset" : { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkUGFuZGFzQWdncmVnYXRlRnVuY3Rpb27LwxUYPcn9XwIAAHhyAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmdnU2rsM9U-BAIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw", + "bridging" : true + }, + "argList" : [ 1, 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ] + } ], + "constants" : [ { + "kind" : "LITERAL", + "value" : "5", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "originalInputFields" : 4 + }, + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "INT NOT NULL" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "$3" : "BIGINT" + }, { + "w0$o0" : "BIGINT" + } ] + }, + "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, pyFunc(c, c) AS w0$o0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "$0" : "BIGINT" + }, { + "$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file