diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java index f5e6c56b1d2bb..82eb710316079 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java @@ -57,6 +57,9 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail if (!checkIntervalOperands(callBinding, 2)) { return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); } + if (callBinding.getOperandCount() == 5) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } // check time attribute return throwExceptionOrReturnFalse( checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); @@ -66,7 +69,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail public String getAllowedSignatures(SqlOperator op, String opName) { return opName + "(TABLE table_name, DESCRIPTOR(timecol), " - + "datetime interval, datetime interval[, datetime interval])"; + + "datetime interval, datetime interval)"; } } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java index 048a96342df9a..1d2a34fc1b082 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java @@ -57,6 +57,9 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail if (!checkIntervalOperands(callBinding, 2)) { return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); } + if (callBinding.getOperandCount() == 5) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } // check time attribute return throwExceptionOrReturnFalse( checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); @@ -66,7 +69,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail public String getAllowedSignatures(SqlOperator op, String opName) { return opName + "(TABLE table_name, DESCRIPTOR(timecol), " - + "datetime interval, datetime interval[, datetime interval])"; + + "datetime interval, datetime interval)"; } } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java index b0f3cb4004ba3..c3149fd777ec9 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java @@ -55,6 +55,9 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail if (!checkIntervalOperands(callBinding, 2)) { return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); } + if (callBinding.getOperandCount() == 4) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } // check time attribute return throwExceptionOrReturnFalse( checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); @@ -62,9 +65,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail @Override public String getAllowedSignatures(SqlOperator op, String opName) { - return opName - + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval" - + "[, datetime interval])"; + return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval)"; } } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala index d3f93c8be8742..640fade4a4e9e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala @@ -140,4 +140,59 @@ class WindowTableFunctionTest extends TableTestBase { util.verifyExplain(sql) } + @Test + def testInvalidTumbleParameters(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE(TUMBLE( + | TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE, INTERVAL '5' MINUTE)) + |""".stripMargin + + thrown.expectMessage("Supported form(s): " + + "TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval)") + thrown.expect(classOf[ValidationException]) + util.verifyExplain(sql) + } + + @Test + def testInvalidHopParameters(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | HOP( + | TABLE MyTable, + | DESCRIPTOR(rowtime), + | INTERVAL '1' MINUTE, + | INTERVAL '15' MINUTE, + | INTERVAL '5' MINUTE)) + |""".stripMargin + + thrown.expectMessage("Supported form(s): " + + "HOP(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)") + thrown.expect(classOf[ValidationException]) + util.verifyExplain(sql) + } + + @Test + def testInvalidCumulateParameters(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | CUMULATE( + | TABLE MyTable, + | DESCRIPTOR(rowtime), + | INTERVAL '1' MINUTE, + | INTERVAL '15' MINUTE, + | INTERVAL '5' MINUTE)) + |""".stripMargin + + thrown.expectMessage("Supported form(s): " + + "CUMULATE(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)") + thrown.expect(classOf[ValidationException]) + util.verifyExplain(sql) + } + }