Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -232,28 +232,52 @@ public <T, ACC> void registerFunction(

@Override
public <T> Table fromDataStream(DataStream<T> dataStream) {
return fromDataStreamInternal(dataStream, null, null);
return fromStreamInternal(dataStream, null, null, ChangelogMode.insertOnly());
}

@Override
public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema) {
Preconditions.checkNotNull(schema, "Schema must not be null.");
return fromDataStreamInternal(dataStream, schema, null);
return fromStreamInternal(dataStream, schema, null, ChangelogMode.insertOnly());
}

@Override
public Table fromChangelogStream(DataStream<Row> dataStream) {
return fromStreamInternal(dataStream, null, null, ChangelogMode.all());
}

@Override
public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema) {
Preconditions.checkNotNull(schema, "Schema must not be null.");
return fromStreamInternal(dataStream, schema, null, ChangelogMode.all());
}

@Override
public Table fromChangelogStream(
DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode) {
Preconditions.checkNotNull(schema, "Schema must not be null.");
return fromStreamInternal(dataStream, schema, null, changelogMode);
}

@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
createTemporaryView(path, fromDataStreamInternal(dataStream, null, path));
createTemporaryView(
path, fromStreamInternal(dataStream, null, path, ChangelogMode.insertOnly()));
}

@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema) {
createTemporaryView(path, fromDataStreamInternal(dataStream, schema, path));
createTemporaryView(
path, fromStreamInternal(dataStream, schema, path, ChangelogMode.insertOnly()));
}

private <T> Table fromDataStreamInternal(
DataStream<T> dataStream, @Nullable Schema schema, @Nullable String viewPath) {
private <T> Table fromStreamInternal(
DataStream<T> dataStream,
@Nullable Schema schema,
@Nullable String viewPath,
ChangelogMode changelogMode) {
Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");
final CatalogManager catalogManager = getCatalogManager();
final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
Expand Down Expand Up @@ -281,7 +305,7 @@ private <T> Table fromDataStreamInternal(
dataStream,
schemaTranslationResult.getPhysicalDataType(),
schemaTranslationResult.isTopLevelRecord(),
ChangelogMode.insertOnly(),
changelogMode,
resolvedSchema);

final List<String> projections = schemaTranslationResult.getProjections();
Expand Down Expand Up @@ -324,28 +348,69 @@ public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass) {
public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType) {
Preconditions.checkNotNull(table, "Table must not be null.");
Preconditions.checkNotNull(targetDataType, "Target data type must not be null.");
final CatalogManager catalogManager = getCatalogManager();
final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();

final ExternalSchemaTranslator.OutputResult schemaTranslationResult =
ExternalSchemaTranslator.fromInternal(
catalogManager.getDataTypeFactory(),
getCatalogManager().getDataTypeFactory(),
table.getResolvedSchema(),
targetDataType);

final List<String> projections = schemaTranslationResult.getProjections();
final QueryOperation projectOperation;
if (projections == null) {
projectOperation = table.getQueryOperation();
} else {
projectOperation =
operationTreeBuilder.project(
projections.stream()
.map(ApiExpressionUtils::unresolvedRef)
.collect(Collectors.toList()),
table.getQueryOperation());
}
return toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly());
}

@Override
public DataStream<Row> toChangelogStream(Table table) {
Preconditions.checkNotNull(table, "Table must not be null.");

final ExternalSchemaTranslator.OutputResult schemaTranslationResult =
ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), null);

return toStreamInternal(table, schemaTranslationResult, null);
}

@Override
public DataStream<Row> toChangelogStream(Table table, Schema targetSchema) {
Preconditions.checkNotNull(table, "Table must not be null.");
Preconditions.checkNotNull(targetSchema, "Target schema must not be null.");

final ExternalSchemaTranslator.OutputResult schemaTranslationResult =
ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), targetSchema);

return toStreamInternal(table, schemaTranslationResult, null);
}

@Override
public DataStream<Row> toChangelogStream(
Table table, Schema targetSchema, ChangelogMode changelogMode) {
Preconditions.checkNotNull(table, "Table must not be null.");
Preconditions.checkNotNull(targetSchema, "Target schema must not be null.");
Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");

final ExternalSchemaTranslator.OutputResult schemaTranslationResult =
ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), targetSchema);

return toStreamInternal(table, schemaTranslationResult, changelogMode);
}

private <T> DataStream<T> toStreamInternal(
Table table,
ExternalSchemaTranslator.OutputResult schemaTranslationResult,
@Nullable ChangelogMode changelogMode) {
final CatalogManager catalogManager = getCatalogManager();
final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();

final QueryOperation projectOperation =
schemaTranslationResult
.getProjections()
.map(
projections ->
operationTreeBuilder.project(
projections.stream()
.map(ApiExpressionUtils::unresolvedRef)
.collect(Collectors.toList()),
table.getQueryOperation()))
.orElseGet(table::getQueryOperation);

final ResolvedSchema resolvedSchema =
schemaResolver.resolve(schemaTranslationResult.getSchema());
Expand All @@ -361,13 +426,15 @@ public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDat
objectIdentifier,
projectOperation,
resolvedSchema,
ChangelogMode.insertOnly(),
schemaTranslationResult.getPhysicalDataType());
changelogMode,
schemaTranslationResult
.getPhysicalDataType()
.orElseGet(resolvedSchema::toPhysicalRowDataType));

return toDataStreamInternal(table, modifyOperation);
return toStreamInternal(table, modifyOperation);
}

private <T> DataStream<T> toDataStreamInternal(Table table, ModifyOperation modifyOperation) {
private <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation) {
final List<Transformation<?>> transformations =
planner.translate(Collections.singletonList(modifyOperation));

Expand Down Expand Up @@ -441,7 +508,7 @@ public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo
table.getQueryOperation(),
TypeConversions.fromLegacyInfoToDataType(typeInfo),
OutputConversionModifyOperation.UpdateMode.APPEND);
return toDataStreamInternal(table, modifyOperation);
return toStreamInternal(table, modifyOperation);
}

@Override
Expand All @@ -458,7 +525,7 @@ public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(
table.getQueryOperation(),
wrapWithChangeFlag(typeInfo),
OutputConversionModifyOperation.UpdateMode.RETRACT);
return toDataStreamInternal(table, modifyOperation);
return toStreamInternal(table, modifyOperation);
}

@Override
Expand Down
Loading