@@ -31,4 +31,9 @@ public interface DataSource {

/** Get the {@link MetadataAccessor} for accessing metadata from external systems. */
MetadataAccessor getMetadataAccessor();

/** Get the {@link SupportedMetadataColumn}s of the source. */
default SupportedMetadataColumn[] supportedMetadataColumns() {
return new SupportedMetadataColumn[0];
}
}
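For illustration only (the OpTsMetadataColumn referenced here is a hypothetical implementation, sketched right after the new SupportedMetadataColumn interface below), a connector's DataSource would opt in by overriding the new default method:

import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;

/** Hypothetical base class for a connector DataSource that exposes metadata columns. */
public abstract class ExampleMetadataAwareDataSource implements DataSource {

    @Override
    public SupportedMetadataColumn[] supportedMetadataColumns() {
        // Without this override, the default implementation advertises no metadata columns.
        // OpTsMetadataColumn is the hypothetical column implementation sketched below.
        return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
    }
}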
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.source;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.types.DataType;

import java.io.Serializable;
import java.util.Map;

/** A metadata column that the source supports reading from the meta field of a {@link DataChangeEvent}. */
@Experimental
public interface SupportedMetadataColumn extends Serializable {
/** Column name. */
String getName();

/** The data type of this column in Flink CDC. */
DataType getType();

/** The Java class of the value returned by {@link #read(Map)}. */
Class<?> getJavaClass();

/**
* Read the value of this metadata column from {@link DataChangeEvent#meta()}.
*
* @param metadata the metadata map returned from {@link DataChangeEvent#meta()}
* @return the value of this column, looked up by its name
*/
Object read(Map<String, String> metadata);
}
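As a usage sketch only (class and package names here are hypothetical, not part of this change), a connector that stores the operation timestamp under the "op_ts" key of DataChangeEvent#meta() might implement the new interface like this:

package org.apache.flink.cdc.connectors.example.source; // hypothetical package

import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Map;

/** Hypothetical metadata column exposing the operation timestamp ("op_ts") of a change event. */
public class OpTsMetadataColumn implements SupportedMetadataColumn {

    @Override
    public String getName() {
        return "op_ts";
    }

    @Override
    public DataType getType() {
        // Epoch timestamp as BIGINT; assumes DataTypes.BIGINT() from flink-cdc-common.
        return DataTypes.BIGINT();
    }

    @Override
    public Class<?> getJavaClass() {
        return Long.class;
    }

    @Override
    public Object read(Map<String, String> metadata) {
        // DataChangeEvent#meta() carries string values, so convert to the declared Java class.
        String value = metadata.get(getName());
        if (value == null) {
            throw new IllegalArgumentException("op_ts was not found in metadata: " + metadata);
        }
        return Long.parseLong(value);
    }
}

A transform expression can then project the column directly, e.g. op_ts AS opts, as exercised in the test changes below.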
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
@@ -99,9 +100,10 @@ public PipelineExecution compose(PipelineDef pipelineDef) {

// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataSource dataSource =
sourceTranslator.createDataSource(pipelineDef.getSource(), env, pipelineDefConfig);
DataStream<Event> stream =
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
sourceTranslator.translate(pipelineDef.getSource(), env, parallelism, dataSource);

// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,7 +112,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
stream,
pipelineDef.getTransforms(),
pipelineDef.getUdfs(),
pipelineDef.getModels());
pipelineDef.getModels(),
dataSource.supportedMetadataColumns());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -129,7 +132,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
pipelineDef.getTransforms(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
pipelineDef.getUdfs(),
pipelineDef.getModels());
pipelineDef.getModels(),
dataSource.supportedMetadataColumns());

// Build DataSink in advance as schema operator requires MetadataApplier
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
@@ -39,14 +39,29 @@
@Internal
public class DataSourceTranslator {

public DataSource createDataSource(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
// Search the data source factory
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
sourceDef.getType(), DataSourceFactory.class);
// Add source JAR to environment
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));
return dataSource;
}

public DataStreamSource<Event> translate(
SourceDef sourceDef,
StreamExecutionEnvironment env,
Configuration pipelineConfig,
int sourceParallelism) {
// Create data source
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);

int sourceParallelism,
DataSource dataSource) {
// Get source provider
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
@@ -78,24 +93,6 @@ public DataStreamSource<Event> translate(
}
}

private DataSource createDataSource(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
// Search the data source factory
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
sourceDef.getType(), DataSourceFactory.class);
// Add source JAR to environment
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));
return dataSource;
}

private String generateDefaultSourceName(SourceDef sourceDef) {
return String.format("Flink CDC Event Source: %s", sourceDef.getType());
}
@@ -19,6 +19,7 @@

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.composer.definition.ModelDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;
@@ -46,7 +47,8 @@ public DataStream<Event> translatePreTransform(
DataStream<Event> input,
List<TransformDef> transforms,
List<UdfDef> udfFunctions,
List<ModelDef> models) {
List<ModelDef> models,
SupportedMetadataColumn[] supportedMetadataColumns) {
if (transforms.isEmpty()) {
return input;
}
@@ -61,7 +63,8 @@
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
transform.getPostTransformConverter());
transform.getPostTransformConverter(),
supportedMetadataColumns);
}

preTransformFunctionBuilder.addUdfFunctions(
@@ -77,7 +80,8 @@ public DataStream<Event> translatePostTransform(
List<TransformDef> transforms,
String timezone,
List<UdfDef> udfFunctions,
List<ModelDef> models) {
List<ModelDef> models,
SupportedMetadataColumn[] supportedMetadataColumns) {
if (transforms.isEmpty()) {
return input;
}
@@ -93,7 +97,8 @@
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
transform.getPostTransformConverter());
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
}
postTransformFunctionBuilder.addTimezone(timezone);
@@ -351,13 +351,13 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=({op_ts=4})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=({op_ts=5})}");
}

@ParameterizedTest
@@ -383,7 +383,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
TransformDef transformDef =
new TransformDef(
"default_namespace.default_schema.table1",
"*,concat(col1,'0') as col12,__data_event_type__ as rk",
"*,concat(col1,'0') as col12,__data_event_type__ as rk,op_ts as opts",
"col1 <> '3'",
"col1",
"col12",
@@ -413,14 +413,14 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D, 4], after=[], op=DELETE, meta=({op_ts=4})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U, 5], after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}");
}

@ParameterizedTest
@@ -486,13 +486,13 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=({op_ts=4})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
}

@Test