Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.PreValidateReWriter.appendPartitionAndNullsProjects
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
import org.apache.flink.util.Preconditions.checkArgument

import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
Expand All @@ -33,7 +34,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlSelect, SqlUtil}
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.util.Static.RESOURCE

import java.util
Expand All @@ -50,24 +51,26 @@ class PreValidateReWriter(
call match {
case r: RichSqlInsert
if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null => r.getSource match {
case select: SqlSelect =>
appendPartitionAndNullsProjects(r, validator, typeFactory, select, r.getStaticPartitions)
case values: SqlCall if values.getKind == SqlKind.VALUES =>
val newSource = appendPartitionAndNullsProjects(r, validator, typeFactory, values,
r.getStaticPartitions)
case call: SqlCall =>
val newSource = appendPartitionAndNullsProjects(
r, validator, typeFactory, call, r.getStaticPartitions)
r.setOperand(2, newSource)
case source =>
throw new ValidationException(
s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only support "
+ s"SELECT and VALUES clause for now, '$source' is not supported yet.")
case source => throw new ValidationException(notSupported(source))
}
case _ =>
}
}
}

object PreValidateReWriter {

//~ Tools ------------------------------------------------------------------

private def notSupported(source: SqlNode): String = {
s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only support " +
s"SELECT, VALUES, SET_QUERY AND ORDER BY clause for now, '$source' is not supported yet."
}

/**
* Append the static partitions and unspecified columns to the data source projection list.
* The columns are appended to the corresponding positions.
Expand Down Expand Up @@ -108,7 +111,6 @@ object PreValidateReWriter {
typeFactory: RelDataTypeFactory,
source: SqlCall,
partitions: SqlNodeList): SqlCall = {
assert(source.getKind == SqlKind.SELECT || source.getKind == SqlKind.VALUES)
val calciteCatalogReader = validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val table = calciteCatalogReader.getTable(names)
Expand Down Expand Up @@ -185,11 +187,49 @@ object PreValidateReWriter {
}
}

source match {
case select: SqlSelect =>
rewriteSelect(validator, select, targetRowType, assignedFields, targetPosition)
case values: SqlCall if values.getKind == SqlKind.VALUES =>
rewriteValues(values, targetRowType, assignedFields, targetPosition)
rewriteSqlCall(validator, source, targetRowType, assignedFields, targetPosition)
}

private def rewriteSqlCall(
validator: FlinkCalciteSqlValidator,
call: SqlCall,
targetRowType: RelDataType,
assignedFields: util.LinkedHashMap[Integer, SqlNode],
targetPosition: util.List[Int]): SqlCall = {

def rewrite(node: SqlNode): SqlCall = {
checkArgument(node.isInstanceOf[SqlCall], node)
rewriteSqlCall(
validator,
node.asInstanceOf[SqlCall],
targetRowType,
assignedFields,
targetPosition)
}

call.getKind match {
case SqlKind.SELECT =>
rewriteSelect(
validator, call.asInstanceOf[SqlSelect], targetRowType, assignedFields, targetPosition)
case SqlKind.VALUES =>
rewriteValues(call, targetRowType, assignedFields, targetPosition)
case kind if SqlKind.SET_QUERY.contains(kind) =>
call.getOperandList.zipWithIndex.foreach {
case (operand, index) => call.setOperand(index, rewrite(operand))
}
call
case SqlKind.ORDER_BY =>
val operands = call.getOperandList
new SqlOrderBy(
call.getParserPosition,
rewrite(operands.get(0)),
operands.get(1).asInstanceOf[SqlNodeList],
operands.get(2),
operands.get(3))
// Not support:
// case SqlKind.WITH =>
// case SqlKind.EXPLICIT_TABLE =>
case _ => throw new ValidationException(notSupported(call))
}
}

Expand Down
Loading