93 changes: 71 additions & 22 deletions docs/content/overview/cdc-pipeline.md
@@ -1,62 +1,83 @@
# CDC Streaming ELT Framework

## What is CDC Streaming ELT Framework?

CDC Streaming ELT Framework is a stream data integration framework that aims to provide users with a more robust API. It allows users to configure their data synchronization logic through customized Flink operators and job submission tools. The framework prioritizes optimizing the task submission process and offers enhanced functionalities such as whole database synchronization, sharding, and schema change synchronization.

## What can CDC Streaming ELT Framework do?

![CDC Architecture](/_static/fig/architecture.png "CDC Architecture")

* ✅ End-to-end data integration framework
* ✅ API for data integration users to build jobs easily
* ✅ Multi-table support in Source / Sink
* ✅ Synchronization of entire databases
* ✅ Schema evolution capability


## Supported Connectors

| Connector | Database |
|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [doris-pipeline](../pipelines/doris-pipeline.md) | <li> [Doris](https://doris.apache.org/): 1.2.x, 2.x.x |
| [mysql-pipeline](../pipelines/mysql-pipeline.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 |
| [starrocks-pipeline](../pipelines/starrocks-pipeline.md) | <li> [StarRocks](https://www.starrocks.io/): 2.x, 3.x |

## Supported Flink Versions

The following table shows the version mapping between Flink<sup>®</sup> CDC Pipeline and Flink<sup>®</sup>:

| Flink<sup>®</sup> CDC Version | Flink<sup>®</sup> Version |
|:-----------------------------------:|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
| <font color="DarkCyan">3.0.*</font> | <font color="MediumVioletRed">1.14.\*</font>, <font color="MediumVioletRed">1.15.\*</font>, <font color="MediumVioletRed">1.16.\*</font>, <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font> |

## Core Concepts

![CDC Design](/_static/fig/design.png "CDC Design")

The data types flowing in the Flink CDC 3.0 framework are referred to as **Events**, which represent the change events generated by external systems.
Each event is marked with the **Table ID** of the table in which the change occurred. Events are categorized into `SchemaChangeEvent` and `DataChangeEvent`, representing changes in table structure and data respectively.

The **Data Source** connector captures the changes in external systems and converts them into events as the output of the synchronization task. It also provides a `MetadataAccessor` for the framework to read the metadata of the external systems.

The **Data Sink** connector receives the change events from the **Data Source** and applies them to the external systems. Additionally, a `MetadataApplier` is used to apply metadata changes from the source system to the target system.

Since events flow from the upstream to the downstream in a pipeline manner, the data synchronization task is referred to as a **Data Pipeline**. A **Data Pipeline** consists of a **Data Source**, **Route**, **Transform** and **Data Sink**. The transform can add extra content to events, and the router can remap the `Table ID`s corresponding to events.

Now let's introduce more details about the concepts you need to know when using the CDC Streaming ELT Framework.

### Table ID
When connecting to external systems, it is necessary to establish a mapping relationship with the storage objects of the external system. This is what the `Table ID` refers to.

To be compatible with most external systems, the `Table ID` is represented by a 3-tuple: (namespace, schemaName, table). Connectors need to establish the mapping between the Table ID and storage objects in external systems.
For instance, a table in MySQL/Doris is mapped to (null, database, table), and a topic in a message queue system such as Kafka is mapped to (null, null, topic).

### Data Source

Data Source is used to access metadata and read the changed data from external systems.
A Data Source can read data from multiple tables simultaneously.

To describe a data source, the following are required:

* Type: The type of the source, such as MySQL or Postgres.
* Name: The name of the source, which is user-defined (optional, with a default value provided).
* Other custom configurations for the source.

For example, we could use a `yaml` file to define a MySQL source:

```yaml
source:
  type: mysql
```
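
A fuller MySQL source definition might look like the sketch below; the connection option names shown (hostname, port, username, password, tables) are assumptions based on the mysql-pipeline connector and should be verified against its documentation:

```yaml
source:
  type: mysql
  name: mysql-source      # Optional, user-defined name
  hostname: localhost     # Assumed connection options; see mysql-pipeline.md
  port: 3306
  username: root
  password: "123456"
  tables: mydb.\.*        # Tables to capture; supports regular expressions
```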

### Data Sink
The Data Sink is used to apply schema changes and write change data to external systems. A Data Sink can write to multiple tables simultaneously.

To describe a data sink, the following are required:

* Type: The type of the sink, such as MySQL or PostgreSQL.
* Name: The name of the sink, which is user-defined (optional, with a default value provided).
* Other custom configurations for the sink.

For example, we can use this `yaml` file to define a Kafka sink:

```yaml
sink:
  type: kafka
  name: mysink-queue                 # Optional parameter for description purpose
  bootstrap-servers: localhost:9092
  auto-create-table: true            # Optional parameter for advanced functionalities
```

### Route

Route specifies the target table ID of each event.
The most typical scenario is the merging of sub-databases and sub-tables, routing multiple upstream source tables to the same sink table.

To describe a route, the following are required:

* source-table: Source table ID, supports regular expressions
* sink-table: Sink table ID, supports regular expressions
* replace-symbol: The placeholder used to replace the source table name in the sink table name (optional, default value is '<>')
* description: Routing rule description (optional, default value provided)

For example, to synchronize the table 'web_order' in the database 'mydb' to a Kafka topic 'ods_web_order', we can use this yaml file to define this route:

```yaml
route:
  source-table: mydb.default.web_order
  sink-table: ods_web_order
  description: sync table to one destination table with given prefix ods_
```

And if we want to synchronize the tables with prefix 'app_' in the database 'mydb' to Kafka topics with prefix 'ods_', we can use this yaml file to define this route:

```yaml
route:
  source-table: mydb.default.app_\.*
  sink-table: ods_<>
  replace-symbol: <>
  description: sync table to one destination table with given prefix ods_
```

In this example, the two table names app_table1 and app_table2 are replaced with the two topic names ods_table1 and ods_table2.
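
The replace symbol itself is configurable per route. The sketch below mirrors one of the route definitions added to the test pipeline in this pull request and uses the customized symbol c<> inside the sink table name:

```yaml
route:
  source-table: mydb.sharding.busi_cust_info
  sink-table: odsdb.default.dim_c<>
  replace-symbol: c<>
  description: sync table to different schema with customized replace symbol
```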

### Data Pipeline

Since events flow from the upstream to the downstream in a pipeline manner, the data synchronization task is also referred to as a Data Pipeline.

To describe a Data Pipeline, the following are required:

* Name: The name of the pipeline, which will be submitted to the Flink cluster as the job name.
* Other advanced capabilities such as automatic table creation, schema evolution, etc., will be implemented.

For example, we can use this yaml file to define a pipeline:

```yaml
pipeline:
  name: mysql-to-kafka-pipeline
```
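
Putting the sections above together, a single YAML file describes the whole job. The following is a minimal sketch that combines the earlier source, sink, route and pipeline snippets; the MySQL connection options are placeholders here, and the actual option names come from the mysql-pipeline connector documentation:

```yaml
source:
  type: mysql
  # connection options as described in mysql-pipeline.md

sink:
  type: kafka
  bootstrap-servers: localhost:9092

route:
  source-table: mydb.default.web_order
  sink-table: ods_web_order
  description: sync table to one destination table with given prefix ods_

pipeline:
  name: mysql-to-kafka-pipeline
```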
@@ -51,6 +51,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Route keys
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
private static final String ROUTE_DESCRIPTION_KEY = "description";

private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
@@ -140,11 +141,15 @@ private RouteDef toRouteDef(JsonNode routeNode) {
"Missing required field \"%s\" in route configuration",
ROUTE_SINK_TABLE_KEY)
.asText();
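// replace-symbol is optional; fall back to null when the route does not configure it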
String replaceSymbol =
Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL))
.map(JsonNode::asText)
.orElse(null);
String description =
Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
.map(JsonNode::asText)
.orElse(null);
return new RouteDef(sourceTable, sinkTable, description);
return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
}

private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
@@ -30,6 +30,7 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -168,15 +169,7 @@ void testInvalidTimeZone() throws Exception {
.put("bootstrap-servers", "localhost:9092")
.put("auto-create-table", "true")
.build())),
Arrays.asList(
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
"sync table to with given prefix ods_")),
getRouteDefList(),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
@@ -212,15 +205,7 @@ void testInvalidTimeZone() throws Exception {
.put("bootstrap-servers", "localhost:9092")
.put("auto-create-table", "true")
.build())),
Arrays.asList(
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
"sync table to with given prefix ods_")),
getRouteDefList(),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
@@ -230,6 +215,30 @@
.put("foo", "bar")
.build()));

private static List<RouteDef> getRouteDefList() {
return Arrays.asList(
new RouteDef(
"mydb.default.app_order_.*",
"odsdb.default.app_order",
null,
"sync all sharding tables to one"),
new RouteDef(
"mydb.default.web_order",
"odsdb.default.ods_web_order",
null,
"sync table to with given prefix ods_"),
new RouteDef(
"mydb.sharding.sharding_order",
"odsdb.default.<>",
null,
"sync table to different schema with original table name"),
new RouteDef(
"mydb.sharding.busi_cust_info",
"odsdb.default.dim_c<>",
"c<>",
"sync table to different schema with customized replace symbol"));
}

private final PipelineDef defWithOptional =
new PipelineDef(
new SourceDef(
Expand All @@ -254,7 +263,7 @@ void testInvalidTimeZone() throws Exception {
.build())),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*", "odsdb.default.app_order", null)),
"mydb.default.app_order_.*", "odsdb.default.app_order", null,null)),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
@@ -36,6 +36,13 @@ route:
  - source-table: mydb.default.web_order
    sink-table: odsdb.default.ods_web_order
    description: sync table to with given prefix ods_
  - source-table: mydb.sharding.sharding_order
    sink-table: odsdb.default.<>
    description: sync table to different schema with original table name
  - source-table: mydb.sharding.busi_cust_info
    sink-table: odsdb.default.dim_c<>
    replace-symbol: c<>
    description: sync table to different schema with customized replace symbol

transform:
  - source-table: mydb.app_order_.*
@@ -84,6 +84,17 @@ public static TableId parse(String tableId) {
throw new IllegalArgumentException("Invalid tableId: " + tableId);
}

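// Builds a TableId from the given parts, allowing namespace and schemaName to be null.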
public static TableId parse(String namespace, String schemaName, String tableName) {
if (namespace != null) {
return tableId(namespace, schemaName, tableName);
} else if (schemaName != null) {
return tableId(schemaName, tableName);
} else if (tableName != null) {
return tableId(tableName);
}
throw new IllegalArgumentException("Invalid tableName: " + tableName);
}

public String identifier() {
if (namespace == null || namespace.isEmpty()) {
if (schemaName == null || schemaName.isEmpty()) {