diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py index 54caf694f90a9..c682c3cdb282c 100644 --- a/flink-python/pyflink/table/tests/test_udaf.py +++ b/flink-python/pyflink/table/tests/test_udaf.py @@ -785,6 +785,73 @@ def test_session_group_window_over_time(self): "+I[1, 2018-03-11 03:10:00.0, 2018-03-11 04:10:00.0, 2]", "+I[1, 2018-03-11 04:20:00.0, 2018-03-11 04:50:00.0, 1]"]) + def test_execute_group_window_aggregate_from_json_plan(self): + # create source file path + tmp_dir = self.tempdir + data = [ + '1,1,2,2018-03-11 03:10:00', + '3,3,2,2018-03-11 03:10:00', + '2,2,1,2018-03-11 03:10:00', + '1,1,3,2018-03-11 03:40:00', + '1,1,8,2018-03-11 04:20:00', + '2,2,3,2018-03-11 03:30:00' + ] + source_path = tmp_dir + '/test_execute_group_window_aggregate_from_json_plan.csv' + sink_path = tmp_dir + '/test_execute_group_window_aggregate_from_json_plan' + with open(source_path, 'w') as fd: + for ele in data: + fd.write(ele + '\n') + + source_table = """ + CREATE TABLE source_table ( + a TINYINT, + b SMALLINT, + c SMALLINT, + rowtime TIMESTAMP(3), + WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE + ) WITH ( + 'connector' = 'filesystem', + 'path' = '%s', + 'format' = 'csv' + ) + """ % source_path + self.t_env.execute_sql(source_table) + + self.t_env.execute_sql(""" + CREATE TABLE sink_table ( + a BIGINT, + w_start TIMESTAMP(3), + w_end TIMESTAMP(3), + b BIGINT + ) WITH ( + 'connector' = 'filesystem', + 'path' = '%s', + 'format' = 'csv' + ) + """ % sink_path) + + self.t_env.create_temporary_function("my_count", CountAggregateFunction()) + + json_plan = self.t_env._j_tenv.getJsonPlan("INSERT INTO sink_table " + "SELECT a, " + "SESSION_START(rowtime, INTERVAL '30' MINUTE), " + "SESSION_END(rowtime, INTERVAL '30' MINUTE), " + "my_count(c) " + "FROM source_table " + "GROUP BY " + "a, b, SESSION(rowtime, INTERVAL '30' MINUTE)") + from py4j.java_gateway import get_method + get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")() + + import glob + lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')] + lines.sort() + self.assertEqual(lines, + ['1,"2018-03-11 03:10:00","2018-03-11 04:10:00",2', + '1,"2018-03-11 04:20:00","2018-03-11 04:50:00",1', + '2,"2018-03-11 03:10:00","2018-03-11 04:00:00",2', + '3,"2018-03-11 03:10:00","2018-03-11 03:40:00",1']) + if __name__ == '__main__': import unittest diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java index 07bdec35d03bc..c8a44a29f06c0 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java @@ -41,9 +41,9 @@ import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; -import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer; +import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer; import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil; import org.apache.flink.table.planner.plan.utils.AggregateInfoList; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; @@ -66,6 +66,12 @@ import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + import org.apache.calcite.rel.core.AggregateCall; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +81,7 @@ import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.flink.table.planner.plan.utils.AggregateUtil.hasRowIntervalType; import static org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType; @@ -84,10 +91,12 @@ import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration; import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong; import static org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** Stream {@link ExecNode} for group widow aggregate (Python user defined aggregate function). */ -public class StreamExecPythonGroupWindowAggregate extends ExecNodeBase - implements StreamExecNode, SingleTransformationTranslator { +@JsonIgnoreProperties(ignoreUnknown = true) +public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBase { private static final Logger LOGGER = LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class); @@ -100,12 +109,27 @@ public class StreamExecPythonGroupWindowAggregate extends ExecNodeBase "org.apache.flink.table.runtime.operators.python.aggregate." + "PythonStreamGroupWindowAggregateOperator"; + public static final String FIELD_NAME_WINDOW = "window"; + public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties"; + + @JsonProperty(FIELD_NAME_GROUPING) private final int[] grouping; + + @JsonProperty(FIELD_NAME_AGG_CALLS) private final AggregateCall[] aggCalls; + + @JsonProperty(FIELD_NAME_WINDOW) + @JsonSerialize(using = LogicalWindowJsonSerializer.class) + @JsonDeserialize(using = LogicalWindowJsonDeserializer.class) private final LogicalWindow window; + + @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) private final PlannerNamedWindowProperty[] namedWindowProperties; - private final WindowEmitStrategy emitStrategy; + + @JsonProperty(FIELD_NAME_NEED_RETRACTION) private final boolean needRetraction; + + @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) private final boolean generateUpdateBefore; public StreamExecPythonGroupWindowAggregate( @@ -113,18 +137,43 @@ public StreamExecPythonGroupWindowAggregate( AggregateCall[] aggCalls, LogicalWindow window, PlannerNamedWindowProperty[] namedWindowProperties, - WindowEmitStrategy emitStrategy, boolean generateUpdateBefore, boolean needRetraction, InputProperty inputProperty, RowType outputType, String description) { - super(Collections.singletonList(inputProperty), outputType, description); - this.grouping = grouping; - this.aggCalls = aggCalls; - this.window = window; - this.namedWindowProperties = namedWindowProperties; - this.emitStrategy = emitStrategy; + this( + grouping, + aggCalls, + window, + namedWindowProperties, + generateUpdateBefore, + needRetraction, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); + } + + @JsonCreator + public StreamExecPythonGroupWindowAggregate( + @JsonProperty(FIELD_NAME_GROUPING) int[] grouping, + @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls, + @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow window, + @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) + PlannerNamedWindowProperty[] namedWindowProperties, + @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore, + @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super(id, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); + this.grouping = checkNotNull(grouping); + this.aggCalls = checkNotNull(aggCalls); + this.window = checkNotNull(window); + this.namedWindowProperties = checkNotNull(namedWindowProperties); this.generateUpdateBefore = generateUpdateBefore; this.needRetraction = needRetraction; } @@ -187,6 +236,7 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { Arrays.stream(aggCalls) .anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL)); OneInputTransformation transform; + WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(tableConfig, window); if (isGeneralPythonUDAF) { final boolean[] aggCallNeedRetractions = new boolean[aggCalls.length]; Arrays.fill(aggCallNeedRetractions, needRetraction); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupWindowAggregate.scala index 6198e67611b59..a9679a2995627 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonGroupWindowAggregate.scala @@ -75,7 +75,6 @@ class StreamPhysicalPythonGroupWindowAggregate( aggCalls.toArray, window, namedWindowProperties.toArray, - emitStrategy, generateUpdateBefore, needRetraction, InputProperty.DEFAULT, diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java index e3539a3282698..3a68e3f4792e4 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java @@ -44,7 +44,6 @@ public class JsonSerdeCoverageTest { "StreamExecLegacySink", "StreamExecPythonGroupAggregate", "StreamExecWindowTableFunction", - "StreamExecPythonGroupWindowAggregate", "StreamExecGroupTableAggregate", "StreamExecPythonGroupTableAggregate", "StreamExecPythonOverAggregate", diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java new file mode 100644 index 0000000000000..afcd4a86fe1a6 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization/deserialization for group window aggregate. */ +public class PythonGroupWindowAggregateJsonPlanTest extends TableTestBase { + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() { + util = streamTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + + String srcTableDdl = + "CREATE TABLE MyTable (\n" + + " a INT NOT NULL,\n" + + " b BIGINT,\n" + + " c VARCHAR,\n" + + " `rowtime` AS TO_TIMESTAMP(c),\n" + + " proctime as PROCTIME(),\n" + + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(srcTableDdl); + tEnv.createTemporarySystemFunction("pyFunc", new TestPythonAggregateFunction()); + } + + @Test + public void testEventTimeTumbleWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b BIGINT,\n" + + " window_start TIMESTAMP(3),\n" + + " window_end TIMESTAMP(3),\n" + + " c BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " b,\n" + + " TUMBLE_START(rowtime, INTERVAL '5' SECOND) as window_start,\n" + + " TUMBLE_END(rowtime, INTERVAL '5' SECOND) as window_end,\n" + + " pyFunc(a, a + 1)\n" + + "FROM MyTable\n" + + "GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)"); + } + + @Test + public void testProcTimeTumbleWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b BIGINT,\n" + + " window_end TIMESTAMP(3),\n" + + " c BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " b,\n" + + " TUMBLE_END(proctime, INTERVAL '15' MINUTE) as window_end,\n" + + " pyFunc(a, a + 1)\n" + + "FROM MyTable\n" + + "GROUP BY b, TUMBLE(proctime, INTERVAL '15' MINUTE)"); + } + + @Test + public void testEventTimeHopWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b BIGINT,\n" + + " c BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " b,\n" + + " pyFunc(a, a + 1)\n" + + "FROM MyTable\n" + + "GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)"); + } + + @Test + public void testProcTimeHopWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b BIGINT,\n" + + " c BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " b,\n" + + " pyFunc(a, a + 1)\n" + + "FROM MyTable\n" + + "GROUP BY b, HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)"); + } + + @Test + public void testEventTimeSessionWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b BIGINT,\n" + + " c BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " b,\n" + + " pyFunc(a, a + 1)\n" + + "FROM MyTable\n" + + "GROUP BY b, Session(rowtime, INTERVAL '10' SECOND)"); + } + + @Test + public void testProcTimeSessionWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b BIGINT,\n" + + " c BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " b,\n" + + " pyFunc(a, a + 1)\n" + + "FROM MyTable\n" + + "GROUP BY b, Session(proctime, INTERVAL '10' MINUTE)"); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out new file mode 100644 index 0000000000000..7d577f4c33332 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out @@ -0,0 +1,425 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT NOT NULL", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "a" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 1, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVGVzdFB5dGhvbkFnZ3JlZ2F0ZUZ1bmN0aW9uera6fuZvJ50CAAB4cgBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5wbGFubmVyLnJ1bnRpbWUudXRpbHMuSmF2YVVzZXJEZWZpbmVkQWdnRnVuY3Rpb25zJFdlaWdodGVkQXZnZ1Nq7DPVPgQCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA", + "bridging" : true + }, + "argList" : [ 2, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "window" : { + "kind" : "SLIDING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "timeField" : { + "fieldName" : "rowtime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT10S", + "slide" : "PT5S" + }, + "namedWindowProperties" : [ ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "c", + "schema.0.name" : "b", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out new file mode 100644 index 0000000000000..0cd0df2564db8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out @@ -0,0 +1,423 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT NOT NULL", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "a" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 1, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVGVzdFB5dGhvbkFnZ3JlZ2F0ZUZ1bmN0aW9uera6fuZvJ50CAAB4cgBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5wbGFubmVyLnJ1bnRpbWUudXRpbHMuSmF2YVVzZXJEZWZpbmVkQWdnRnVuY3Rpb25zJFdlaWdodGVkQXZnZ1Nq7DPVPgQCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA", + "bridging" : true + }, + "argList" : [ 2, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "window" : { + "kind" : "SESSION", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "timeField" : { + "fieldName" : "rowtime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "gap" : "PT10S" + }, + "namedWindowProperties" : [ ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, rowtime, 10000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "c", + "schema.0.name" : "b", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out new file mode 100644 index 0000000000000..d8f94604e35ca --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -0,0 +1,599 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT NOT NULL", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "a" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 1, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$3", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVGVzdFB5dGhvbkFnZ3JlZ2F0ZUZ1bmN0aW9uera6fuZvJ50CAAB4cgBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5wbGFubmVyLnJ1bnRpbWUudXRpbHMuSmF2YVVzZXJEZWZpbmVkQWdnRnVuY3Rpb25zJFdlaWdodGVkQXZnZ1Nq7DPVPgQCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA", + "bridging" : true + }, + "argList" : [ 2, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "window" : { + "kind" : "TUMBLING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "timeField" : { + "fieldName" : "rowtime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT5S" + }, + "namedWindowProperties" : [ { + "name" : "w$start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "w$end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "w$rowtime", + "property" : { + "kind" : "Rowtime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "w$proctime", + "property" : { + "kind" : "Proctime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$3" : "BIGINT" + }, { + "w$start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "w$end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "w$rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "w$proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, pyFunc(a, $f3) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "EXPR$3" : "BIGINT" + } ] + }, + "description" : "Calc(select=[b, w$start AS window_start, w$end AS window_end, EXPR$3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "schema.3.data-type" : "BIGINT", + "schema.2.data-type" : "TIMESTAMP(3)", + "schema.3.name" : "c", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.2.name" : "window_end", + "schema.1.name" : "window_start", + "schema.0.name" : "b", + "schema.1.data-type" : "TIMESTAMP(3)" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "EXPR$3" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_start, window_end, EXPR$3])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out new file mode 100644 index 0000000000000..ee10f78bad592 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out @@ -0,0 +1,464 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT NOT NULL", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVGVzdFB5dGhvbkFnZ3JlZ2F0ZUZ1bmN0aW9uera6fuZvJ50CAAB4cgBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5wbGFubmVyLnJ1bnRpbWUudXRpbHMuSmF2YVVzZXJEZWZpbmVkQWdnRnVuY3Rpb25zJFdlaWdodGVkQXZnZ1Nq7DPVPgQCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA", + "bridging" : true + }, + "argList" : [ 2, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "window" : { + "kind" : "SLIDING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "timeField" : { + "fieldName" : "proctime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT10M", + "slide" : "PT5M" + }, + "namedWindowProperties" : [ ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, proctime, 600000, 300000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "c", + "schema.0.name" : "b", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out new file mode 100644 index 0000000000000..21ddac2f953ad --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out @@ -0,0 +1,462 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT NOT NULL", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$1", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVGVzdFB5dGhvbkFnZ3JlZ2F0ZUZ1bmN0aW9uera6fuZvJ50CAAB4cgBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5wbGFubmVyLnJ1bnRpbWUudXRpbHMuSmF2YVVzZXJEZWZpbmVkQWdnRnVuY3Rpb25zJFdlaWdodGVkQXZnZ1Nq7DPVPgQCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA", + "bridging" : true + }, + "argList" : [ 2, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "window" : { + "kind" : "SESSION", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "timeField" : { + "fieldName" : "proctime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "gap" : "PT10M" + }, + "namedWindowProperties" : [ ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, proctime, 600000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.1.name" : "c", + "schema.0.name" : "b", + "schema.1.data-type" : "BIGINT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$1" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out new file mode 100644 index 0000000000000..0308259074a3b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out @@ -0,0 +1,593 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT NOT NULL", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT NOT NULL" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "a" : "INT NOT NULL" + }, { + "$f3" : "INT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "EXPR$2", + "aggFunction" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "functionKind" : "AGGREGATE", + "instance" : "rO0ABXNyAGRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVGVzdFB5dGhvbkFnZ3JlZ2F0ZUZ1bmN0aW9uera6fuZvJ50CAAB4cgBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5wbGFubmVyLnJ1bnRpbWUudXRpbHMuSmF2YVVzZXJEZWZpbmVkQWdnRnVuY3Rpb25zJFdlaWdodGVkQXZnZ1Nq7DPVPgQCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA", + "bridging" : true + }, + "argList" : [ 2, 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "window" : { + "kind" : "TUMBLING", + "alias" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "timeField" : { + "fieldName" : "proctime", + "fieldIndex" : 1, + "inputIndex" : 0, + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "isTimeWindow" : true, + "size" : "PT15M" + }, + "namedWindowProperties" : [ { + "name" : "w$start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } + } + }, { + "name" : "w$end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } + } + }, { + "name" : "w$proctime", + "property" : { + "kind" : "Proctime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } + } + } ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "EXPR$2" : "BIGINT" + }, { + "w$start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "w$end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "w$proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, proctime, 900000)], properties=[w$start, w$end, w$proctime], select=[b, pyFunc(a, $f3) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "EXPR$2" : "BIGINT" + } ] + }, + "description" : "Calc(select=[b, w$end AS window_end, EXPR$2])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "schema.2.data-type" : "BIGINT", + "connector" : "values", + "schema.0.data-type" : "BIGINT", + "schema.2.name" : "c", + "schema.1.name" : "window_end", + "schema.0.name" : "b", + "schema.1.data-type" : "TIMESTAMP(3)" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "b" : "BIGINT" + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "EXPR$2" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_end, EXPR$2])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file