[FLINK-37098] Fix selecting time attribute from a view #25952

Merged: 1 commit, Jan 17, 2025
@@ -90,7 +90,9 @@ public static String buildShowCreateViewRow(
"SHOW CREATE VIEW is only supported for views, but %s is a table. Please use SHOW CREATE TABLE instead.",
viewIdentifier.asSerializableString()));
}
if (view.getOrigin() instanceof QueryOperationCatalogView) {
final CatalogBaseTable origin = view.getOrigin();
if (origin instanceof QueryOperationCatalogView
&& !((QueryOperationCatalogView) origin).supportsShowCreateView()) {
throw new TableException(
"SHOW CREATE VIEW is not supported for views registered by Table API.");
}
@@ -99,7 +101,7 @@ public static String buildShowCreateViewRow(
.append(buildCreateFormattedPrefix("VIEW", isTemporary, viewIdentifier));
sb.append(extractFormattedColumnNames(view, PRINT_INDENT)).append("\n)\n");
extractComment(view).ifPresent(c -> sb.append(formatComment(c)).append("\n"));
sb.append("AS ").append(((CatalogView) view.getOrigin()).getExpandedQuery()).append("\n");
sb.append("AS ").append(((CatalogView) origin).getExpandedQuery()).append("\n");

return sb.toString();
}
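For context, a minimal sketch of how the guard above plays out at the SQL level. The table name, the datagen connector, and the column names are illustrative only; the sketch assumes a Flink runtime with the table planner on the classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowCreateViewSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE src (id BIGINT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts)"
                        + " WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE VIEW v AS SELECT id, ts FROM src");

        // A SQL-defined view keeps its original CatalogView inside the
        // QueryOperationCatalogView wrapper, so supportsShowCreateView() returns
        // true and the statement below prints the CREATE VIEW definition.
        tEnv.executeSql("SHOW CREATE VIEW v").print();

        // A view registered through the Table API, e.g.
        // tEnv.createTemporaryView("t", tEnv.from("src")), carries no original
        // CatalogView and still hits the TableException in the guard above.
    }
}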
@@ -217,7 +217,7 @@ protected TableEnvironmentImpl(
getParser()::parseSqlExpression,
isStreamingMode);
catalogManager.initSchemaResolver(
isStreamingMode, operationTreeBuilder.getResolverBuilder());
isStreamingMode, operationTreeBuilder.getResolverBuilder(), getParser());
this.operationCtx =
new ExecutableOperationContextImpl(
catalogManager,
@@ -48,9 +48,12 @@
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropModelEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

@@ -73,6 +76,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.lang.String.format;
@@ -104,6 +108,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
private @Nullable String currentDatabaseName;

private DefaultSchemaResolver schemaResolver;
private Parser parser;

// The name of the built-in catalog
private final String builtInCatalogName;
@@ -282,9 +287,12 @@ public void close() throws CatalogException {
* @see TableEnvironmentImpl#create(EnvironmentSettings)
*/
public void initSchemaResolver(
boolean isStreamingMode, ExpressionResolverBuilder expressionResolverBuilder) {
boolean isStreamingMode,
ExpressionResolverBuilder expressionResolverBuilder,
Parser parser) {
this.schemaResolver =
new DefaultSchemaResolver(isStreamingMode, typeFactory, expressionResolverBuilder);
this.parser = parser;
}

/** Returns a {@link SchemaResolver} for creating {@link ResolvedSchema} from {@link Schema}. */
@@ -1775,8 +1783,53 @@ public ResolvedCatalogView resolveCatalogView(CatalogView view) {
if (view instanceof ResolvedCatalogView) {
return (ResolvedCatalogView) view;
}

if (view instanceof QueryOperationCatalogView) {
final QueryOperation queryOperation =
((QueryOperationCatalogView) view).getQueryOperation();
return new ResolvedCatalogView(view, queryOperation.getResolvedSchema());
}

final ResolvedSchema resolvedSchema = view.getUnresolvedSchema().resolve(schemaResolver);
return new ResolvedCatalogView(view, resolvedSchema);
final List<Operation> parse;
try {
parse = parser.parse(view.getExpandedQuery());
} catch (Throwable e) {
// in case of a failure during parsing, let the lower layers fail
return new ResolvedCatalogView(view, resolvedSchema);
}
if (parse.size() != 1 || !(parse.get(0) instanceof QueryOperation)) {
// parsing a view should result in a single query operation
// if it is not what we expect, we let the lower layers fail
return new ResolvedCatalogView(view, resolvedSchema);
} else {
final QueryOperation operation = (QueryOperation) parse.get(0);
final ResolvedSchema querySchema = operation.getResolvedSchema();
if (querySchema.getColumns().size() != resolvedSchema.getColumns().size()) {
// in case the query does not match the number of expected columns, let the lower
// layers fail
return new ResolvedCatalogView(view, resolvedSchema);
}
final ResolvedSchema renamedQuerySchema =
new ResolvedSchema(
IntStream.range(0, resolvedSchema.getColumnCount())
.mapToObj(
i ->
querySchema
.getColumn(i)
.get()
.rename(
resolvedSchema
.getColumnNames()
.get(i)))
.collect(Collectors.toList()),
resolvedSchema.getWatermarkSpecs(),
resolvedSchema.getPrimaryKey().orElse(null));
return new ResolvedCatalogView(
// pass a view that has the query parsed and
// validated already
new QueryOperationCatalogView(operation, view), renamedQuerySchema);
}
}
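The heart of the change above: the view's expanded query is re-parsed, the query's resolved columns (whose data types may still carry a rowtime or proctime attribute) are kept, and they are renamed to the column names declared on the view. Below is a minimal sketch of that renaming step, assuming the new Column#rename added by this PR; the schemas and column names are made up for illustration.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;

public class ViewSchemaAlignmentSketch {
    public static void main(String[] args) {
        // Schema declared on the view: authoritative for column names.
        ResolvedSchema declared =
                ResolvedSchema.of(
                        Column.physical("user_id", DataTypes.BIGINT()),
                        Column.physical("event_time", DataTypes.TIMESTAMP(3)));

        // Schema resolved from the expanded query: authoritative for data types.
        ResolvedSchema fromQuery =
                ResolvedSchema.of(
                        Column.physical("id", DataTypes.BIGINT()),
                        Column.physical("ts", DataTypes.TIMESTAMP(3)));

        // Keep the query's columns but give them the view's declared names,
        // mirroring the IntStream logic in resolveCatalogView above.
        List<Column> aligned =
                IntStream.range(0, declared.getColumnCount())
                        .mapToObj(
                                i ->
                                        fromQuery
                                                .getColumn(i)
                                                .get()
                                                .rename(declared.getColumnNames().get(i)))
                        .collect(Collectors.toList());

        aligned.forEach(c -> System.out.println(c.asSummaryString()));
    }
}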

/**
@@ -24,6 +24,8 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.QueryOperation;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Optional;

@@ -32,9 +34,16 @@
public final class QueryOperationCatalogView implements CatalogView {

private final QueryOperation queryOperation;
private final @Nullable CatalogView originalView;

public QueryOperationCatalogView(QueryOperation queryOperation) {
this(queryOperation, null);
}

public QueryOperationCatalogView(
final QueryOperation queryOperation, final CatalogView originalView) {
this.queryOperation = queryOperation;
this.originalView = originalView;
}

public QueryOperation getQueryOperation() {
@@ -48,17 +57,23 @@ public Schema getUnresolvedSchema() {

@Override
public Map<String, String> getOptions() {
throw new TableException("A view backed by a query operation has no options.");
if (originalView == null) {
throw new TableException("A view backed by a query operation has no options.");
} else {
return originalView.getOptions();
}
}

@Override
public String getComment() {
return queryOperation.asSummaryString();
return Optional.ofNullable(originalView)
.map(CatalogView::getComment)
.orElseGet(queryOperation::asSummaryString);
}

@Override
public QueryOperationCatalogView copy() {
return new QueryOperationCatalogView(queryOperation);
return new QueryOperationCatalogView(queryOperation, originalView);
}

@Override
@@ -73,13 +88,26 @@ public Optional<String> getDetailedDescription() {

@Override
public String getOriginalQuery() {
throw new TableException(
"A view backed by a query operation has no serializable representation.");
if (originalView == null) {
throw new TableException(
"A view backed by a query operation has no serializable representation.");
} else {
return originalView.getOriginalQuery();
}
}

@Override
public String getExpandedQuery() {
throw new TableException(
"A view backed by a query operation has no serializable representation.");
if (originalView == null) {
throw new TableException(
"A view backed by a query operation has no serializable representation.");
} else {
return originalView.getExpandedQuery();
}
}

@Internal
public boolean supportsShowCreateView() {
return originalView != null;
}
}
@@ -33,6 +33,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.apache.flink.table.utils.ParserMock;

import org.junit.jupiter.api.Test;

@@ -460,7 +461,7 @@ private static CatalogManager catalogManager() {
ExpressionResolverMocks.forSqlExpression(
CatalogBaseTableResolutionTest::resolveSqlExpression);

catalogManager.initSchemaResolver(true, expressionResolverBuilder);
catalogManager.initSchemaResolver(true, expressionResolverBuilder, new ParserMock());

return catalogManager;
}
@@ -35,6 +35,7 @@
import org.apache.flink.table.catalog.listener.DropTableEvent;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.apache.flink.table.utils.ParserMock;

import org.junit.jupiter.api.Test;

@@ -133,7 +134,8 @@ void testTableModificationListener() throws Exception {
dropFuture,
dropTemporaryFuture));

catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver());
catalogManager.initSchemaResolver(
true, ExpressionResolverMocks.dummyResolver(), new ParserMock());
// Create a view
catalogManager.createTable(
CatalogView.of(Schema.newBuilder().build(), null, "", "", Collections.emptyMap()),
@@ -279,7 +281,8 @@ public void testModelModificationListener() throws Exception {
.build())
.build();

catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver());
catalogManager.initSchemaResolver(
true, ExpressionResolverMocks.dummyResolver(), new ParserMock());

HashMap<String, String> options =
new HashMap<String, String>() {
@@ -71,7 +71,8 @@ public static CatalogManager createCatalogManager(
builder.catalogStoreHolder(catalogStoreHolder);
}
final CatalogManager catalogManager = builder.build();
catalogManager.initSchemaResolver(true, ExpressionResolverMocks.dummyResolver());
catalogManager.initSchemaResolver(
true, ExpressionResolverMocks.dummyResolver(), new ParserMock());
return catalogManager;
}

@@ -27,13 +27,14 @@

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/** Mocks {@link Parser} for tests. */
public class ParserMock implements Parser {
@Override
public List<Operation> parse(String statement) {
return null;
return Collections.emptyList();
}

@Override
@@ -135,6 +135,9 @@ public String asSummaryString() {
/** Returns a copy of the column with a replaced {@link DataType}. */
public abstract Column copy(DataType newType);

/** Returns a copy of the column with a replaced name. */
public abstract Column rename(String newName);

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -202,6 +205,11 @@ public Optional<String> explainExtras() {
public Column copy(DataType newDataType) {
return new PhysicalColumn(name, newDataType, comment);
}

@Override
public Column rename(String newName) {
return new PhysicalColumn(newName, dataType, comment);
}
}

/** Representation of a computed column. */
@@ -252,6 +260,11 @@ public Column copy(DataType newDataType) {
return new ComputedColumn(name, newDataType, expression, comment);
}

@Override
public Column rename(String newName) {
return new ComputedColumn(newName, dataType, expression, comment);
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -344,6 +357,11 @@ public Column copy(DataType newDataType) {
return new MetadataColumn(name, newDataType, metadataKey, isVirtual, comment);
}

@Override
public Column rename(String newName) {
return new MetadataColumn(newName, dataType, metadataKey, isVirtual, comment);
}

@Override
public boolean equals(Object o) {
if (this == o) {
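A tiny usage sketch of the new rename contract introduced above: only the name changes, while the data type (and, for metadata columns, the metadata key and virtual flag) is preserved. The column names and types here are arbitrary examples.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;

public class ColumnRenameSketch {
    public static void main(String[] args) {
        // Physical column: rename keeps the data type.
        Column physical = Column.physical("ts", DataTypes.TIMESTAMP(3));
        System.out.println(physical.rename("event_time").asSummaryString());

        // Virtual metadata column: rename keeps the metadata key and virtual flag.
        Column metadata = Column.metadata("offset", DataTypes.BIGINT(), null, true);
        System.out.println(metadata.rename("source_offset").asSummaryString());
    }
}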
@@ -30,6 +30,7 @@
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;

@@ -77,6 +78,7 @@ public RelNode convertToRel(RelOptTable.ToRelContext context) {
final Context chain = Contexts.of(context, cluster.getPlanner().getContext());
final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema());

return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
return RelOptUtil.createCastRel(
relBuilder.queryOperation(catalogView.getQueryOperation()).build(), rowType, true);
}
}
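Taken together, these changes let a time attribute survive being selected (and renamed) through a SQL view, which is the FLINK-37098 scenario. A minimal end-to-end sketch follows; the datagen source, column names, and window size are illustrative and not taken from the PR.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimeAttributeThroughViewSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  id BIGINT,"
                        + "  ts TIMESTAMP(3),"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ('connector' = 'datagen', 'number-of-rows' = '100')");

        // The view selects and renames the rowtime column; with this fix the
        // renamed column keeps its time-attribute type during view resolution.
        tEnv.executeSql("CREATE VIEW renamed AS SELECT id, ts AS event_time FROM events");

        // Windowing on the view's column only works if the time attribute survived.
        tEnv.executeSql(
                        "SELECT window_start, window_end, COUNT(*) AS cnt"
                                + " FROM TABLE(TUMBLE(TABLE renamed, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))"
                                + " GROUP BY window_start, window_end")
                .print();
    }
}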