Skip to content

Commit b08780d

Browse files
committed
Use AbstractCreateMaterializedTableConverter
1 parent d651f0b commit b08780d

File tree

7 files changed

+400
-256
lines changed

7 files changed

+400
-256
lines changed

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,7 +1864,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isT
18641864
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
18651865
SqlNodeList propertyList = SqlNodeList.EMPTY;
18661866
SqlNode freshness = null;
1867-
SqlLiteral refreshMode = null;
1867+
SqlRefreshMode refreshMode = null;
18681868
SqlNode asQuery = null;
18691869
boolean isOrAlter = false;
18701870
}
@@ -1930,12 +1930,12 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isT
19301930
(
19311931
<FULL>
19321932
{
1933-
refreshMode = SqlRefreshMode.FULL.symbol(getPos());
1933+
refreshMode = SqlRefreshMode.FULL;
19341934
}
19351935
|
19361936
<CONTINUOUS>
19371937
{
1938-
refreshMode = SqlRefreshMode.CONTINUOUS.symbol(getPos());
1938+
refreshMode = SqlRefreshMode.CONTINUOUS;
19391939
}
19401940
)
19411941
]

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.calcite.sql.SqlIdentifier;
2626
import org.apache.calcite.sql.SqlIntervalLiteral;
2727
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlLiteral;
2928
import org.apache.calcite.sql.SqlNode;
3029
import org.apache.calcite.sql.SqlNodeList;
3130
import org.apache.calcite.sql.SqlOperator;
@@ -60,7 +59,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
6059

6160
private final @Nullable SqlIntervalLiteral freshness;
6261

63-
private final @Nullable SqlLiteral refreshMode;
62+
private final @Nullable SqlRefreshMode refreshMode;
6463

6564
private final SqlNode asQuery;
6665

@@ -74,7 +73,7 @@ public SqlCreateMaterializedTable(
7473
SqlNodeList partitionKeyList,
7574
SqlNodeList propertyList,
7675
@Nullable SqlIntervalLiteral freshness,
77-
@Nullable SqlLiteral refreshMode,
76+
@Nullable SqlRefreshMode refreshMode,
7877
SqlNode asQuery) {
7978
super(operator, pos, false, false);
8079
this.tableName = requireNonNull(tableName, "tableName should not be null");
@@ -140,7 +139,7 @@ public SqlIntervalLiteral getFreshness() {
140139
}
141140

142141
@Nullable
143-
public SqlLiteral getRefreshMode() {
142+
public SqlRefreshMode getRefreshMode() {
144143
return refreshMode;
145144
}
146145

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateOrAlterMaterializedTable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.calcite.sql.SqlIdentifier;
2626
import org.apache.calcite.sql.SqlIntervalLiteral;
2727
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlLiteral;
2928
import org.apache.calcite.sql.SqlNode;
3029
import org.apache.calcite.sql.SqlNodeList;
3130
import org.apache.calcite.sql.SqlOperator;
@@ -54,7 +53,7 @@ public SqlCreateOrAlterMaterializedTable(
5453
SqlNodeList partitionKeyList,
5554
SqlNodeList propertyList,
5655
@Nullable SqlIntervalLiteral freshness,
57-
@Nullable SqlLiteral refreshMode,
56+
@Nullable SqlRefreshMode refreshMode,
5857
SqlNode asQuery,
5958
boolean isOrAlter) {
6059
super(
@@ -147,7 +146,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
147146
writer.newlineAndIndent();
148147
writer.keyword("REFRESH_MODE");
149148
writer.keyword("=");
150-
getRefreshMode().unparse(writer, leftPrec, rightPrec);
149+
writer.keyword(getRefreshMode().name());
151150
}
152151

153152
writer.newlineAndIndent();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.operations.converters;
20+
21+
import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
22+
import org.apache.flink.table.api.Schema;
23+
import org.apache.flink.table.api.ValidationException;
24+
import org.apache.flink.table.catalog.CatalogMaterializedTable;
25+
import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
26+
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
27+
import org.apache.flink.table.catalog.IntervalFreshness;
28+
import org.apache.flink.table.catalog.ObjectIdentifier;
29+
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
30+
import org.apache.flink.table.catalog.ResolvedSchema;
31+
import org.apache.flink.table.catalog.TableDistribution;
32+
import org.apache.flink.table.catalog.UnresolvedIdentifier;
33+
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
34+
import org.apache.flink.table.planner.utils.MaterializedTableUtils;
35+
import org.apache.flink.table.planner.utils.OperationConverterUtils;
36+
import org.apache.flink.table.types.logical.LogicalType;
37+
import org.apache.flink.table.types.logical.LogicalTypeFamily;
38+
39+
import org.apache.calcite.sql.SqlNode;
40+
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Optional;
44+
import java.util.Set;
45+
import java.util.stream.Collectors;
46+
47+
import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
48+
import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
49+
50+
/**
51+
* Abstract class for converting {@link SqlCreateMaterializedTable} and it's children to create
52+
* materialized table operations.
53+
*/
54+
public abstract class AbstractCreateMaterializedTableConverter<T extends SqlCreateMaterializedTable>
55+
implements SqlNodeConverter<T> {
56+
/** Context of create table converters while merging source and derived items. */
57+
protected interface MergeContext {
58+
Schema getMergedSchema();
59+
60+
Map<String, String> getMergedTableOptions();
61+
62+
List<String> getMergedPartitionKeys();
63+
64+
Optional<TableDistribution> getMergedTableDistribution();
65+
66+
String getMergedOriginalQuery();
67+
68+
String getMergedExpandedQuery();
69+
70+
ResolvedSchema getMergedQuerySchema();
71+
}
72+
73+
protected abstract MergeContext getMergeContext(
74+
T sqlCreateMaterializedTable, ConvertContext context);
75+
76+
protected final Optional<TableDistribution> getDerivedTableDistribution(
77+
T sqlCreateMaterializedTable) {
78+
return Optional.ofNullable(sqlCreateMaterializedTable.getDistribution())
79+
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
80+
}
81+
82+
protected final List<String> getDerivedPartitionKeys(T sqlCreateMaterializedTable) {
83+
return OperationConverterUtils.getColumnNames(
84+
sqlCreateMaterializedTable.getPartitionKeyList());
85+
}
86+
87+
protected final Map<String, String> getDerivedTableOptions(T sqlCreateMaterializedTable) {
88+
return OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList());
89+
}
90+
91+
protected final IntervalFreshness getDerivedFreshness(T sqlCreateMaterializedTable) {
92+
return Optional.ofNullable(sqlCreateMaterializedTable.getFreshness())
93+
.map(MaterializedTableUtils::getMaterializedTableFreshness)
94+
.orElse(null);
95+
}
96+
97+
protected final ResolvedSchema getQueryResolvedSchema(
98+
T sqlCreateMaterializedTable, ConvertContext context) {
99+
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
100+
SqlNode validateQuery = context.getSqlValidator().validate(selectQuery);
101+
102+
PlannerQueryOperation queryOperation =
103+
new PlannerQueryOperation(
104+
context.toRelRoot(validateQuery).project(),
105+
() -> context.toQuotedSqlString(validateQuery));
106+
return queryOperation.getResolvedSchema();
107+
}
108+
109+
protected final LogicalRefreshMode getDerivedLogicalRefreshMode(T sqlCreateMaterializedTable) {
110+
return MaterializedTableUtils.deriveLogicalRefreshMode(
111+
sqlCreateMaterializedTable.getRefreshMode());
112+
}
113+
114+
protected final RefreshMode getDerivedRefreshMode(LogicalRefreshMode logicalRefreshMode) {
115+
return MaterializedTableUtils.fromLogicalRefreshModeToRefreshMode(logicalRefreshMode);
116+
}
117+
118+
protected final String getDerivedOriginalQuery(
119+
T sqlCreateMaterializedTable, ConvertContext context) {
120+
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
121+
return context.toQuotedSqlString(selectQuery);
122+
}
123+
124+
protected final String getDerivedExpandedQuery(
125+
T sqlCreateMaterializedTable, ConvertContext context) {
126+
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
127+
SqlNode validatedQuery = context.getSqlValidator().validate(selectQuery);
128+
return context.expandSqlIdentifiers(context.toQuotedSqlString(validatedQuery));
129+
}
130+
131+
protected final String getComment(T sqlCreateMaterializedTable) {
132+
return OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment());
133+
}
134+
135+
protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedTable(
136+
T sqlCreateMaterializedTable, ConvertContext context) {
137+
final MergeContext mergeContext = getMergeContext(sqlCreateMaterializedTable, context);
138+
final List<String> partitionKeys = mergeContext.getMergedPartitionKeys();
139+
final Schema schema = mergeContext.getMergedSchema();
140+
final ResolvedSchema querySchema = mergeContext.getMergedQuerySchema();
141+
final Map<String, String> tableOptions = mergeContext.getMergedTableOptions();
142+
verifyPartitioningColumnsExist(querySchema, partitionKeys, tableOptions);
143+
144+
final TableDistribution distribution =
145+
mergeContext.getMergedTableDistribution().orElse(null);
146+
final String comment = getComment(sqlCreateMaterializedTable);
147+
148+
final String originalQuery = mergeContext.getMergedOriginalQuery();
149+
final String expandedQuery = mergeContext.getMergedExpandedQuery();
150+
151+
final IntervalFreshness intervalFreshness = getDerivedFreshness(sqlCreateMaterializedTable);
152+
153+
final LogicalRefreshMode logicalRefreshMode =
154+
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable);
155+
156+
final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
157+
158+
return context.getCatalogManager()
159+
.resolveCatalogMaterializedTable(
160+
CatalogMaterializedTable.newBuilder()
161+
.schema(schema)
162+
.comment(comment)
163+
.distribution(distribution)
164+
.partitionKeys(partitionKeys)
165+
.options(tableOptions)
166+
.originalQuery(originalQuery)
167+
.expandedQuery(expandedQuery)
168+
.freshness(intervalFreshness)
169+
.logicalRefreshMode(logicalRefreshMode)
170+
.refreshMode(refreshMode)
171+
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
172+
.build());
173+
}
174+
175+
protected final ObjectIdentifier getIdentifier(
176+
SqlCreateMaterializedTable node, ConvertContext context) {
177+
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName());
178+
return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
179+
}
180+
181+
private void verifyPartitioningColumnsExist(
182+
ResolvedSchema schema, List<String> partitionKeys, Map<String, String> tableOptions) {
183+
final Set<String> partitionFieldOptions =
184+
tableOptions.keySet().stream()
185+
.filter(k -> k.startsWith(PARTITION_FIELDS))
186+
.collect(Collectors.toSet());
187+
188+
for (String partitionKey : partitionKeys) {
189+
if (schema.getColumn(partitionKey).isEmpty()) {
190+
throw new ValidationException(
191+
String.format(
192+
"Partition column '%s' not defined in the query schema. Available columns: [%s].",
193+
partitionKey,
194+
schema.getColumnNames().stream()
195+
.collect(Collectors.joining("', '", "'", "'"))));
196+
}
197+
}
198+
199+
// verify partition key used by materialized table partition option
200+
// partition.fields.#.date-formatter whether exist
201+
for (String partitionOption : partitionFieldOptions) {
202+
String partitionKey =
203+
partitionOption.substring(
204+
PARTITION_FIELDS.length() + 1,
205+
partitionOption.length() - (DATE_FORMATTER.length() + 1));
206+
// partition key used in option partition.fields.#.date-formatter must be existed
207+
if (!partitionKeys.contains(partitionKey)) {
208+
throw new ValidationException(
209+
String.format(
210+
"Column '%s' referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].",
211+
partitionKey,
212+
partitionOption,
213+
partitionKeys.stream()
214+
.collect(Collectors.joining("', '", "'", "'"))));
215+
}
216+
217+
// partition key used in option partition.fields.#.date-formatter must be string type
218+
LogicalType partitionKeyType =
219+
schema.getColumn(partitionKey).get().getDataType().getLogicalType();
220+
if (!partitionKeyType
221+
.getTypeRoot()
222+
.getFamilies()
223+
.contains(LogicalTypeFamily.CHARACTER_STRING)) {
224+
throw new ValidationException(
225+
String.format(
226+
"Materialized table option '%s' only supports referring to char, varchar and string type partition column. Column %s type is %s.",
227+
partitionOption, partitionKey, partitionKeyType.asSummaryString()));
228+
}
229+
}
230+
}
231+
}

0 commit comments

Comments
 (0)