Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTable"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableAsQuery"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableDistribution.SqlAlterMaterializedTableAddDistribution"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableDistribution.SqlAlterMaterializedTableModifyDistribution"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableDropDistribution"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableAsQuery"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableFreshness"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableOptions"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableRefreshMode"
Expand All @@ -62,6 +62,10 @@
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableResume"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSuspend"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableAddSchema"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropColumn"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropConstraint"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropPrimaryKey"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropWatermark"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableModifySchema"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMaterializedTable"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlDropMaterializedTable"
Expand All @@ -75,11 +79,11 @@
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableAdd"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDistribution.SqlAlterTableAddDistribution"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDistribution.SqlAlterTableModifyDistribution"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDropColumn"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDropConstraint"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDropDistribution"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDropPrimaryKey"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDropWatermark"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDrop.SqlAlterTableDropConstraint"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDrop.SqlAlterTableDropColumn"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDrop.SqlAlterTableDropPrimaryKey"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableDrop.SqlAlterTableDropWatermark"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableModify"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableOptions"
"org.apache.flink.sql.parser.ddl.table.SqlAlterTableRename"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ SqlAlterTable SqlAlterTable() :
startPos.plus(getPos()),
tableIdentifier,
new SqlNodeList(
Collections.singletonList(columnName),
List.of(columnName),
getPos()),
ifExists);
}
Expand Down Expand Up @@ -2041,6 +2041,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
SqlNodeList partSpec = SqlNodeList.EMPTY;
SqlNode freshness = null;
SqlNode asQuery = null;
SqlIdentifier constraintName;
AlterTableSchemaContext ctx = new AlterTableSchemaContext();
}
{
Expand Down Expand Up @@ -2184,11 +2185,51 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
ctx.watermark);
}
|
<DROP> <DISTRIBUTION> {
<DROP>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this rationalisation of the grammar. Can we update the docs to match it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to submit a separate PR for docs which should cover FLIP-550 (CREATE, ALTER, DROP)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(
{ SqlIdentifier columnName = null; }
columnName = CompoundIdentifier() {
return new SqlAlterMaterializedTableDropColumn(
startPos.plus(getPos()),
tableIdentifier,
new SqlNodeList(
List.of(columnName),
getPos()));
}
|
{ Pair<SqlNodeList, SqlNodeList> columnWithTypePair = null; }
columnWithTypePair = ParenthesizedCompoundIdentifierList() {
return new SqlAlterMaterializedTableDropColumn(
startPos.plus(getPos()),
tableIdentifier,
columnWithTypePair.getKey());
}
|
<DISTRIBUTION> {
return new SqlAlterMaterializedTableDropDistribution(
startPos.plus(getPos()),
tableIdentifier);
startPos.plus(getPos()),
tableIdentifier);
}
|
<PRIMARY> <KEY> {
return new SqlAlterMaterializedTableDropPrimaryKey(
startPos.plus(getPos()),
tableIdentifier);
}
|
<CONSTRAINT> constraintName = SimpleIdentifier() {
return new SqlAlterMaterializedTableDropConstraint(
startPos.plus(getPos()),
tableIdentifier,
constraintName);
}
|
<WATERMARK> {
return new SqlAlterMaterializedTableDropWatermark(
startPos.plus(getPos()),
tableIdentifier);
}
)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/**
Expand All @@ -38,7 +37,7 @@ public SqlAlterMaterializedTableDropDistribution(SqlParserPos pos, SqlIdentifier

@Override
public List<SqlNode> getOperandList() {
return Collections.emptyList();
return List.of(name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

/**
* Abstract class to describe statements which are used to alter schema for materialized tables. See
* examples in javadoc for {@link SqlAlterMaterializedTableAddSchema}.
* examples in javadoc for child classes.
*/
public abstract class SqlAlterMaterializedTableSchema extends SqlAlterMaterializedTable
implements ExtendedSqlNode {
Expand Down Expand Up @@ -181,4 +181,130 @@ protected String getAlterOperation() {
return "MODIFY";
}
}

/**
* Abstract class to describe statements which are used to drop schema components while altering
* schema of materialized tables. See examples in javadoc for child classes.
*/
public abstract static class SqlAlterMaterializedTableDropSchema
extends SqlAlterMaterializedTableSchema {
public SqlAlterMaterializedTableDropSchema(
SqlParserPos pos, SqlIdentifier materializedTableName) {
super(pos, materializedTableName, SqlNodeList.EMPTY, List.of(), null);
}

protected abstract void unparseDropOperation(SqlWriter writer, int leftPrec, int rightPrec);

@Override
protected String getAlterOperation() {
return "DROP";
}

@Override
public final void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparseAlterOperation(writer, leftPrec, rightPrec);
unparseDropOperation(writer, leftPrec, rightPrec);
}
}

/** ALTER MATERIALIZED TABLE [catalog_name.][db_name.]materialized_table_name DROP WATERMARK. */
public static class SqlAlterMaterializedTableDropWatermark
extends SqlAlterMaterializedTableDropSchema {
public SqlAlterMaterializedTableDropWatermark(
SqlParserPos pos, SqlIdentifier materializedTableName) {
super(pos, materializedTableName);
}

@Override
public List<SqlNode> getOperandList() {
return List.of(name);
}

@Override
public void unparseDropOperation(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("WATERMARK");
}
}

/**
* ALTER MATERIALIZED TABLE [catalog_name.][db_name.]materialized_table_name DROP PRIMARY KEY.
*/
public static class SqlAlterMaterializedTableDropPrimaryKey
extends SqlAlterMaterializedTableDropSchema {
public SqlAlterMaterializedTableDropPrimaryKey(
SqlParserPos pos, SqlIdentifier materializedTableName) {
super(pos, materializedTableName);
}

@Override
public void unparseDropOperation(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("PRIMARY KEY");
}
}

/**
* ALTER MATERIALIZED TABLE [catalog_name.][db_name.]materialized_table_name DROP CONSTRAINT
* constraint_name.
*/
public static class SqlAlterMaterializedTableDropConstraint
extends SqlAlterMaterializedTableDropSchema {
private final SqlIdentifier constraintName;

public SqlAlterMaterializedTableDropConstraint(
SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier constraintName) {
super(pos, tableName);
this.constraintName = constraintName;
}

public SqlIdentifier getConstraintName() {
return constraintName;
}

@Override
public void unparseDropOperation(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("CONSTRAINT");
constraintName.unparse(writer, leftPrec, rightPrec);
}
}

/**
* SqlNode to describe ALTER MATERIALIZED TABLE materialized_table_name DROP column clause.
*
* <p>Example: DDL like the below for drop column.
*
* <pre>{@code
* -- drop single column
* ALTER MATERIALIZED TABLE prod.db.sample DROP col1;
*
* -- drop multiple columns
* ALTER MATERIALIZED TABLE prod.db.sample DROP (col1, col2, col3);
* }</pre>
*/
public static class SqlAlterMaterializedTableDropColumn
extends SqlAlterMaterializedTableDropSchema {

private final SqlNodeList columnList;

public SqlAlterMaterializedTableDropColumn(
SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList) {
super(pos, tableName);
this.columnList = columnList;
}

public SqlNodeList getColumnList() {
return columnList;
}

@Override
public List<SqlNode> getOperandList() {
return List.of(name, columnList);
}

@Override
public void unparseDropOperation(SqlWriter writer, int leftPrec, int rightPrec) {
// unparse materialized table column
SqlUnparseUtils.unparseTableSchema(
columnList, List.of(), null, writer, leftPrec, rightPrec);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.List;

/** DROP MATERIALIZED TABLE DDL sql call. */
public class SqlDropMaterializedTable extends SqlDropObject {

Expand All @@ -38,6 +41,11 @@ public SqlDropMaterializedTable(
super(OPERATOR, pos, tableIdentifier, ifExists);
}

@Override
public List<SqlNode> getOperandList() {
return List.of(name);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DROP MATERIALIZED TABLE");
Expand Down
Loading