diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala index 0b8ada6334b5f..969473a295b87 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala @@ -19,16 +19,20 @@ package org.apache.flink.table.planner.runtime.harness import org.apache.flink.api.scala._ -import org.apache.flink.table.api.{EnvironmentSettings, _} +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl import org.apache.flink.table.api.config.ExecutionConfigOptions.{TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, TABLE_EXEC_MINIBATCH_ENABLED, TABLE_EXEC_MINIBATCH_SIZE} import org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY +import org.apache.flink.table.api.{EnvironmentSettings, _} +import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.CountNullNonNull +import org.apache.flink.table.runtime.typeutils.RowDataSerializer import org.apache.flink.table.runtime.util.RowDataHarnessAssertor import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord +import org.apache.flink.table.types.logical.LogicalType import org.apache.flink.types.Row import org.apache.flink.types.RowKind._ @@ -46,7 +50,7 @@ import scala.collection.mutable @RunWith(classOf[Parameterized]) class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode) - extends HarnessTestBase(mode) { + extends HarnessTestBase(mode) { @Before override def before(): Unit = { @@ -100,7 +104,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode testHarness.setStateTtlProcessingTime(1) // insertion - testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong)) + testHarness.processElement(binaryRecord(INSERT, "aaa", 1L: JLong)) expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong)) // insertion @@ -144,7 +148,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode expectedOutput.add(binaryRecord(INSERT, "eee", 6L: JLong)) // retract - testHarness.processElement(binaryRecord(INSERT,"aaa", 7L: JLong)) + testHarness.processElement(binaryRecord(INSERT, "aaa", 7L: JLong)) expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 9L: JLong)) expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 16L: JLong)) testHarness.processElement(binaryRecord(INSERT, "bbb", 3L: JLong)) @@ -160,28 +164,8 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode @Test def testAggregationWithDistinct(): Unit = { - val data = new mutable.MutableList[(String, String, Long)] - val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) - tEnv.createTemporaryView("T", t) - tEnv.createTemporarySystemFunction("CntNullNonNull", new 
CountNullNonNull)
-
-    val sql =
-      """
-        |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c)
-        |FROM T
-        |GROUP BY a
-      """.stripMargin
-    val t1 = tEnv.sqlQuery(sql)
-
-    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
-    val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate")
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType))
+    val (testHarness, outputTypes) = createAggregationWithDistinct
+    val assertor = new RowDataHarnessAssertor(outputTypes)
 
     testHarness.open()
 
@@ -191,7 +175,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     testHarness.setStateTtlProcessingTime(1)
 
     // insertion
-    testHarness.processElement(binaryRecord(INSERT,"aaa", "a1", 1L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "aaa", "a1", 1L: JLong))
     expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong, "1|0", 1L: JLong, 1L: JLong))
 
     // insertion
@@ -240,6 +224,41 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
     testHarness.close()
   }
 
+  private def createAggregationWithDistinct()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+    val data = new mutable.MutableList[(String, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("T", t)
+    tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
+
+    val sql =
+      """
+        |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c)
+        |FROM T
+        |GROUP BY a
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+    val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate")
+    val outputTypes = Array(
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType)
+
+    (testHarness, outputTypes)
+  }
+
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputType) = createAggregationWithDistinct
+    testHarness.setup(new RowDataSerializer(outputType: _*))
+    // simulate a failover after a failed task open (e.g., stuck during initialization);
+    // expect no exception to be thrown
+    testHarness.close()
+  }
 }
 
 object GroupAggregateHarnessTest {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
index 191ba5f7f1a14..d7dc473963059 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.state.StateBackend
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, PartitionTransformation}
 import 
org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness +import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, OneInputStreamOperatorTestHarness} import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.JLong import org.apache.flink.table.planner.runtime.utils.StreamingTestBase @@ -86,6 +86,19 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData]] } + def createHarnessTesterForNoState( + ds: DataStream[_], + prefixOperatorName: String) + : OneInputStreamOperatorTestHarness[RowData, RowData] = { + val transformation = extractExpectedTransformation( + ds.javaStream.getTransformation, + prefixOperatorName) + val processOperator = transformation.getOperator + .asInstanceOf[OneInputStreamOperator[Any, Any]] + new OneInputStreamOperatorTestHarness(processOperator) + .asInstanceOf[OneInputStreamOperatorTestHarness[RowData, RowData]] + } + private def extractExpectedTransformation( t: Transformation[_], prefixOperatorName: String): OneInputTransformation[_, _] = { @@ -96,6 +109,8 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { } else { extractExpectedTransformation(one.getInputs.get(0), prefixOperatorName) } + case p: PartitionTransformation[_] => + extractExpectedTransformation(p.getInputs.get(0), prefixOperatorName) case _ => throw new Exception( s"Can not find the expected $prefixOperatorName transformation") } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala index 191a89545ac8f..7c1f566f02a9f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala @@ -20,14 +20,17 @@ package org.apache.flink.table.planner.runtime.harness import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl +import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.runtime.typeutils.RowDataSerializer import org.apache.flink.table.runtime.util.RowDataHarnessAssertor import org.apache.flink.table.runtime.util.StreamRecordUtils.{binaryrow, row} +import org.apache.flink.table.types.logical.LogicalType import org.apache.flink.types.Row import org.junit.runner.RunWith @@ -52,34 +55,8 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m @Test def testProcTimeBoundedRowsOver(): Unit = { - - val data = new mutable.MutableList[(Long, String, Long)] - val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime) - tEnv.registerTable("T", t) - - val sql = - """ - |SELECT currtime, b, c, 
-        |  min(c) OVER
-        |    (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
-        |  max(c) OVER
-        |    (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
-        |FROM T
-      """.stripMargin
-    val t1 = tEnv.sqlQuery(sql)
-
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
-    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType))
-
+    val (testHarness, outputType) = createProcTimeBoundedRowsOver
+    val assertor = new RowDataHarnessAssertor(outputType)
     testHarness.open()
 
     // register cleanup timer with 3001
@@ -161,6 +138,36 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
     testHarness.close()
   }
 
+  private def createProcTimeBoundedRowsOver()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+    val data = new mutable.MutableList[(Long, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime.proctime)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT currtime, b, c,
+        |  min(c) OVER
+        |    (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+        |  max(c) OVER
+        |    (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val outputType = Array(
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.BIGINT().getLogicalType)
+    (testHarness, outputType)
+  }
+
   /**
    * NOTE: all elements at the same proc timestamp have the same value per key
    */
@@ -940,4 +947,12 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputType) = createProcTimeBoundedRowsOver
+    testHarness.setup(new RowDataSerializer(outputType: _*))
+    // simulate a failover after a failed task open; expect no exception to be thrown
+    testHarness.close()
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index 32b2688a466f9..d9dcf7fbc5745 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -18,23 +18,27 @@
 package org.apache.flink.table.planner.runtime.harness
 
-import java.lang.{Integer => JInt}
-import java.util.concurrent.ConcurrentLinkedQueue
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
-import org.apache.flink.table.api.EnvironmentSettings
+import org.apache.flink.table.api.{EnvironmentSettings, _}
+import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.utils.{Top3WithMapView, Top3WithRetractInput}
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.{deleteRecord, insertRecord}
+import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.types.Row
+
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
+import java.lang.{Integer => JInt}
 import java.time.Duration
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.mutable
 
@@ -117,22 +121,8 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
 
   @Test
   def testTableAggregateWithRetractInput(): Unit = {
-    val top3 = new Top3WithRetractInput
-    tEnv.registerFunction("top3", top3)
-    val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
-    val resultTable = source
-      .groupBy('a)
-      .select('b.sum as 'b)
-      .flatAggregate(top3('b) as ('b1, 'b2))
-      .select('b1, 'b2)
-
-    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
-    val testHarness = createHarnessTester(
-      resultTable.toRetractStream[Row], "GroupTableAggregate")
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.INT().getLogicalType,
-        DataTypes.INT().getLogicalType))
+    val (testHarness, outputTypes) = createTableAggregateWithRetract
+    val assertor = new RowDataHarnessAssertor(outputTypes)
 
     testHarness.open()
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -170,4 +160,32 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  private def createTableAggregateWithRetract()
+    : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
+    val top3 = new Top3WithRetractInput
+    tEnv.registerFunction("top3", top3)
+    val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+    val resultTable = source
+      .groupBy('a)
+      .select('b.sum as 'b)
+      .flatAggregate(top3('b) as ('b1, 'b2))
+      .select('b1, 'b2)
+
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+    val testHarness = createHarnessTester(
+      resultTable.toRetractStream[Row], "GroupTableAggregate")
+    val outputTypes = Array(
+      DataTypes.INT().getLogicalType,
+      DataTypes.INT().getLogicalType)
+    (testHarness, outputTypes)
+  }
+
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputTypes) = createTableAggregateWithRetract
+    testHarness.setup(new RowDataSerializer(outputTypes: _*))
+    // simulate a failover after a failed task open; expect no exception to be thrown
+    testHarness.close()
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index 0bb81a78e8790..6587af3cc5038 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala @@ -19,18 +19,21 @@ package org.apache.flink.table.planner.runtime.harness import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.data.{RowData, TimestampData} import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.planner.runtime.utils.TestData +import org.apache.flink.table.runtime.typeutils.RowDataSerializer import org.apache.flink.table.runtime.util.RowDataHarnessAssertor import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord import org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills - +import org.apache.flink.table.types.logical.LogicalType import org.apache.flink.types.Row import org.apache.flink.types.RowKind.INSERT @@ -84,6 +87,32 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI */ @Test def testProcessingTimeTumbleWindow(): Unit = { + val (testHarness, outputTypes) = createProcessingTimeTumbleWindowOperator + val assertor = new RowDataHarnessAssertor(outputTypes) + + testHarness.open() + ingestData(testHarness) + val expected = new ConcurrentLinkedQueue[Object]() + expected.add(record("a", 4L, 5.0D, 2L, + localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05"))) + expected.add(record("a", 1L, null, 1L, + localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10"))) + expected.add(record("b", 2L, 6.0D, 2L, + localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10"))) + expected.add(record("b", 1L, 4.0D, 1L, + localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20"))) + expected.add(record("b", 1L, 3.0D, 1L, + localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35"))) + expected.add(record(null, 1L, 7.0D, 0L, + localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35"))) + + assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput) + + testHarness.close() + } + + private def createProcessingTimeTumbleWindowOperator() + : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = { val sql = """ |SELECT @@ -100,34 +129,15 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI val t1 = tEnv.sqlQuery(sql) val testHarness = createHarnessTester(t1.toAppendStream[Row], "WindowAggregate") // window aggregate put window properties at the end of aggs - val assertor = new RowDataHarnessAssertor( + val outputTypes = Array( DataTypes.STRING().getLogicalType, DataTypes.BIGINT().getLogicalType, DataTypes.DOUBLE().getLogicalType, DataTypes.BIGINT().getLogicalType, DataTypes.TIMESTAMP_LTZ(3).getLogicalType, - DataTypes.TIMESTAMP_LTZ(3).getLogicalType)) - - testHarness.open() - ingestData(testHarness) - val expected = new ConcurrentLinkedQueue[Object]() - expected.add(record("a", 
4L, 5.0D, 2L,
-      localMills("1970-01-01T00:00:00"), localMills("1970-01-01T00:00:05")))
-    expected.add(record("a", 1L, null, 1L,
-      localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
-    expected.add(record("b", 2L, 6.0D, 2L,
-      localMills("1970-01-01T00:00:05"), localMills("1970-01-01T00:00:10")))
-    expected.add(record("b", 1L, 4.0D, 1L,
-      localMills("1970-01-01T00:00:15"), localMills("1970-01-01T00:00:20")))
-    expected.add(record("b", 1L, 3.0D, 1L,
-      localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
-    expected.add(record(null, 1L, 7.0D, 0L,
-      localMills("1970-01-01T00:00:30"), localMills("1970-01-01T00:00:35")))
-
-    assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput)
-
-    testHarness.close()
+      DataTypes.TIMESTAMP_LTZ(3).getLogicalType)
+    (testHarness, outputTypes)
   }
 
   /**
@@ -266,6 +276,80 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
     testHarness.close()
   }
 
+  @Test
+  def testCloseWithoutOpen(): Unit = {
+    val (testHarness, outputTypes) = createProcessingTimeTumbleWindowOperator
+    testHarness.setup(new RowDataSerializer(outputTypes: _*))
+    // simulate a failover after a failed task open; expect no exception to be thrown
+    testHarness.close()
+  }
+
+  /**
+   * Processing-time windows don't support two-phase aggregation, so add a single two-phase test.
+   */
+  @Test
+  def testTwoPhaseWindowAggregateCloseWithoutOpen(): Unit = {
+    val timestampDataId = TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE T2 (
+         | `ts` STRING,
+         | `int` INT,
+         | `double` DOUBLE,
+         | `float` FLOAT,
+         | `bigdec` DECIMAL(10, 2),
+         | `string` STRING,
+         | `name` STRING,
+         | `rowtime` AS
+         |   TO_TIMESTAMP(`ts`),
+         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '${timestampDataId}',
+         |  'failing-source' = 'false'
+         |)
+         |""".stripMargin)
+
+    tEnv.getConfig.getConfiguration.setString(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
+
+    val sql =
+      """
+        |SELECT
+        |  `name`,
+        |  window_start,
+        |  window_end,
+        |  COUNT(*),
+        |  MAX(`double`),
+        |  COUNT(DISTINCT `string`)
+        |FROM TABLE(
+        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
+        |GROUP BY `name`, window_start, window_end
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+    val stream: DataStream[Row] = t1.toAppendStream[Row]
+
+    val testHarness = createHarnessTesterForNoState(stream, "LocalWindowAggregate")
+    // window aggregate put window properties at the end of aggs
+    val outputTypes = Array(
+      DataTypes.STRING().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.DOUBLE().getLogicalType,
+      DataTypes.BIGINT().getLogicalType,
+      DataTypes.TIMESTAMP_LTZ(3).getLogicalType,
+      DataTypes.TIMESTAMP_LTZ(3).getLogicalType)
+    testHarness.setup(new RowDataSerializer(outputTypes: _*))
+
+    // simulate a failover after a failed task open; expect no exception to be thrown
+    testHarness.close()
+
+    val testHarness1 = createHarnessTester(stream, "GlobalWindowAggregate")
+    testHarness1.setup(new RowDataSerializer(outputTypes: _*))
+
+    // simulate a failover after a failed task open; expect no exception to be thrown
+    testHarness1.close()
+  }
+
   /**
    * Ingests testing data, the input schema is [name, double, string, proctime].
   * We follow the test data in [[TestData.windowDataWithTimestamp]] to have the same produced
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index b044db7b430d6..63f184a423053 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -143,7 +143,10 @@ public void close() throws Exception {
         super.close();
         collector = null;
         functionsClosed = true;
-        windowBuffer.close();
+        // windowBuffer may still be null if the task failed before open() completed
+        if (windowBuffer != null) {
+            windowBuffer.close();
+        }
 
     @Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
index 4d10646e5a28d..eba6e326d5931 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java
@@ -131,7 +131,8 @@ public void processElement(StreamRecord element) throws Exception {
 
     @Override
     public void finishBundle() throws Exception {
-        if (!bundle.isEmpty()) {
+        // bundle may still be null if the task failed before open() completed
+        if (bundle != null && !bundle.isEmpty()) {
             numOfElements = 0;
             function.finishBundle(bundle, collector);
             bundle.clear();