diff --git a/docs/content/overview/cdc-pipeline.md b/docs/content/overview/cdc-pipeline.md index 60b99d2e2e0..376fb269594 100644 --- a/docs/content/overview/cdc-pipeline.md +++ b/docs/content/overview/cdc-pipeline.md @@ -1,62 +1,83 @@ # CDC Streaming ELT Framework ## What is CDC Streaming ELT Framework -CDC Streaming ELT Framework is a stream data integration framework that aims to provide users with a more robust API. It allows users to configure their data synchronization logic through customized Flink operators and job submission tools. The framework prioritizes optimizing the task submission process and offers enhanced functionalities such as whole database synchronization, sharding, and schema change synchronization. + +CDC Streaming ELT Framework is a stream data integration framework that aims to provide users with a more robust API. It +allows users to configure their data synchronization logic through customized Flink operators and job submission tools. +The framework prioritizes optimizing the task submission process and offers enhanced functionalities such as whole +database synchronization, sharding, and schema change synchronization. ## What can CDC Streaming ELT Framework do? + ![CDC Architecture](/_static/fig/architecture.png "CDC Architecture") + * ✅ End-to-end data integration framework * ✅ API for data integration users to build jobs easily * ✅ Multi-table support in Source / Sink -* ✅ Synchronization of entire databases +* ✅ Synchronization of entire databases * ✅ Schema evolution capability - ## Supported Connectors | Connector | Database | |----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | [doris-pipeline](../pipelines/doris-pipeline.md) |
  • [Doris](https://doris.apache.org/): 1.2.x, 2.x.x | | [mysql-pipeline](../pipelines/mysql-pipeline.md) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | -| [starrocks-pipeline](../pipelines/starrocks-pipeline.md) |
  • [StarRocks](https://www.starrocks.io/): 2.x, 3.x | +| [starrocks-pipeline](../pipelines/starrocks-pipeline.md) |
  • [StarRocks](https://www.starrocks.io/): 2.x, 3.x |
## Supported Flink Versions
+
The following table shows the version mapping between Flink® CDC Pipeline and Flink®:
-| Flink® CDC Version | Flink® Version |
-|:-----------------------------------:|:------------------------------------------------------------:|
+| Flink® CDC Version | Flink® Version |
+|:-----------------------------------:|:-------------------------------------------------------------:|
| 3.0.* | 1.14.\*, 1.15.\*, 1.16.\*, 1.17.\*, 1.18.\* |
## Core Concepts
+
![CDC Design](/_static/fig/design.png "CDC Design")
-The data types flowing in the Flink CDC 3.0 framework are referred to as **Event**, which represent the change events generated by external systems.
-Each event is marked with a **Table ID** for which the change occurred. Events are categorized into `SchemaChangeEvent` and `DataChangeEvent`, representing changes in table structure and data respectively.
+The data types flowing in the Flink CDC 3.0 framework are referred to as **Event**, which represent the change events
+generated by external systems.
+Each event is marked with a **Table ID** for which the change occurred. Events are categorized into `SchemaChangeEvent`
+and `DataChangeEvent`, representing changes in table structure and data respectively.
-**Data Source** Connector captures the changes in external systems and converts them into events as the output of the synchronization task. It also provides a `MetadataAccessor` for the framework to read the metadata of the external systems.
+**Data Source** Connector captures the changes in external systems and converts them into events as the output of the
+synchronization task. It also provides a `MetadataAccessor` for the framework to read the metadata of the external
+systems.
-**Data Sink** connector receives the change events from **Data Source** and applies them to the external systems. Additionally, `MetadataApplier` is used to apply metadata changes from the source system to the target system.
+**Data Sink** connector receives the change events from **Data Source** and applies them to the external systems.
+Additionally, `MetadataApplier` is used to apply metadata changes from the source system to the target system.
-Since events flow from the upstream to the downstream in a pipeline manner, the data synchronization task is referred as a **Data Pipeline**. A **Data Pipeline** consists of a **Data Source**, **Route**, **Transform** and **Data Sink**. The transform can add extra content to events, and the router can remap the `Table ID`s corresponding to events.
+Since events flow from the upstream to the downstream in a pipeline manner, the data synchronization task is referred to
+as a **Data Pipeline**. A **Data Pipeline** consists of a **Data Source**, **Route**, **Transform** and **Data Sink**. The
+transform can add extra content to events, and the router can remap the `Table ID`s corresponding to events.
Now let's introduce more details about the concepts you need to know when using the CDC Streaming ELT Framework.
### Table ID
-When connecting to external systems, it is necessary to establish a mapping relationship with the storage objects of the external system. This is what `Table Id` refers to.
-To be compatible with most external systems, the `Table ID` is represented by a 3-tuple : (namespace, schemaName, table). Connectors need to establish the mapping between Table ID and storage objects in external systems.
-For instance, a table in MySQL/Doris is mapped to (null, database, table) and a topic in a message queue system such as Kafka is mapped to (null, null, topic).
+
+When connecting to external systems, it is necessary to establish a mapping relationship with the storage objects of the
+external system. This is what `Table Id` refers to.
+
+To be compatible with most external systems, the `Table ID` is represented by a 3-tuple: (namespace, schemaName,
+table). Connectors need to establish the mapping between Table ID and storage objects in external systems.
+For instance, a table in MySQL/Doris is mapped to (null, database, table) and a topic in a message queue system such as
+Kafka is mapped to (null, null, topic).
### Data Source
+
Data Source is used to access metadata and read the changed data from external systems.
A Data Source can read data from multiple tables simultaneously.
To describe a data source, the following are required:
+
* Type: The type of the source, such as MySQL, Postgres.
* Name: The name of the source, which is user-defined (optional, with a default value provided).
* Other custom configurations for the source.
For example, we could use `yaml` files to define a mysql source
+
```yaml
source:
  type: mysql
@@ -69,32 +90,42 @@ source:
 ```
### Data Sink
-The Data Sink is used to apply schema changes and write change data to external systems. A Data Sink can write to multiple tables simultaneously.
+
+The Data Sink is used to apply schema changes and write change data to external systems. A Data Sink can write to
+multiple tables simultaneously.
To describe a data sink, the following are required:
+
* Type: The type of the sink, such as MySQL or PostgreSQL.
* Name: The name of the sink, which is user-defined (optional, with a default value provided).
* Other custom configurations for the sink.
For example, we can use this `yaml` file to define a kafka sink:
+
```yaml
sink:
  type: kafka
-  name: mysink-queue # Optional parameter for description purpose
+  name: mysink-queue  # Optional parameter for description purpose
  bootstrap-servers: localhost:9092
-  auto-create-table: true # Optional parameter for advanced functionalities
+  auto-create-table: true  # Optional parameter for advanced functionalities
```
### Route
-
+Route specifies the target table ID of each event.
+The most typical scenario is the merge of sub-databases and sub-tables, routing multiple upstream source tables to the
+same sink table.
To describe a route, the following are required:
+
* source-table: Source table id, supports regular expressions
* sink-table: Sink table id, supports regular expressions
-* escription: Routing rule description(optional, default value provided)
+* replace-symbol: The placeholder used to replace the source table name in the sink table name, default value is '<>'
+* description: Routing rule description (optional, default value provided)
+
+For example, to synchronize the table 'web_order' in the database 'mydb' to a Kafka topic 'ods_web_order', we can use
+this yaml file to define this route:
-For example, if synchronize the table 'web_order' in the database 'mydb' to a Kafka topic 'ods_web_order', we can use this yaml file to define this route:
```yaml
route:
  source-table: mydb.default.web_order
@@ -102,14 +133,32 @@ route:
   description: sync table to one destination table with given prefix ods_
 ```
+And if we want to synchronize the tables with prefix 'app_' in the database 'mydb' to a Kafka topic with prefix 'ods_',
+we can use this yaml file to define this route:
+
+```yaml
+route:
+  source-table: mydb.default.app_\.*
+  sink-table: ods_<>
+  replace-symbol: <>
+  description: sync table to one destination table with given prefix ods_
+```
+
+Using this example, the two table names, app_table1 and app_table2, are replaced with two topic names, ods_table1 and
+ods_table2.
+
### Data Pipeline
-
+Since events flow from the upstream to the downstream in a pipeline manner, the data synchronization task is also
+referred to as a Data Pipeline.
To describe a Data Pipeline, the following are required:
+
* Name: The name of the pipeline, which will be submitted to the Flink cluster as the job name.
* Other advanced capabilities such as automatic table creation, schema evolution, etc., will be implemented.
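The `pipeline` block is only one part of a definition file; a complete job description also carries the source, sink, and route blocks described above. A sketch of how the pieces might fit together in a single file, modeled on the pipeline-definition-full.yaml test resource later in this diff (host names, object names, and table IDs are illustrative only):

```yaml
source:
  type: mysql
  name: mysql-source            # optional, user-defined
  # connector-specific options (hostname, port, credentials, ...) go here

sink:
  type: kafka
  name: mysink-queue            # optional, user-defined
  bootstrap-servers: localhost:9092

route:
  - source-table: mydb.default.web_order
    sink-table: odsdb.default.ods_web_order
    description: sync table to with given prefix ods_
  - source-table: mydb.sharding.busi_cust_info
    sink-table: odsdb.default.dim_c<>
    replace-symbol: c<>
    description: sync table to different schema with customized replace symbol

pipeline:
  name: mysql-to-kafka-pipeline
```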
For example, we can use this yaml file to define a pipeline: + ```yaml pipeline: name: mysql-to-kafka-pipeline diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java index ecf60f9cbe7..1873c5c2717 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -51,6 +51,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { // Route keys private static final String ROUTE_SOURCE_TABLE_KEY = "source-table"; private static final String ROUTE_SINK_TABLE_KEY = "sink-table"; + private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol"; private static final String ROUTE_DESCRIPTION_KEY = "description"; private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); @@ -140,11 +141,15 @@ private RouteDef toRouteDef(JsonNode routeNode) { "Missing required field \"%s\" in route configuration", ROUTE_SINK_TABLE_KEY) .asText(); + String replaceSymbol = + Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL)) + .map(JsonNode::asText) + .orElse(null); String description = Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY)) .map(JsonNode::asText) .orElse(null); - return new RouteDef(sourceTable, sinkTable, description); + return new RouteDef(sourceTable, sinkTable, replaceSymbol, description); } private Configuration toPipelineConfig(JsonNode pipelineConfigNode) { diff --git a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 045b1fe7a7e..ea54dd95222 100644 --- a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -30,6 +30,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; import static org.assertj.core.api.Assertions.assertThat; @@ -168,15 +169,7 @@ void testInvalidTimeZone() throws Exception { .put("bootstrap-servers", "localhost:9092") .put("auto-create-table", "true") .build())), - Arrays.asList( - new RouteDef( - "mydb.default.app_order_.*", - "odsdb.default.app_order", - "sync all sharding tables to one"), - new RouteDef( - "mydb.default.web_order", - "odsdb.default.ods_web_order", - "sync table to with given prefix ods_")), + getRouteDefList(), null, Configuration.fromMap( ImmutableMap.builder() @@ -212,15 +205,7 @@ void testInvalidTimeZone() throws Exception { .put("bootstrap-servers", "localhost:9092") .put("auto-create-table", "true") .build())), - Arrays.asList( - new RouteDef( - "mydb.default.app_order_.*", - "odsdb.default.app_order", - "sync all sharding tables to one"), - new RouteDef( - "mydb.default.web_order", - "odsdb.default.ods_web_order", - "sync table to with given prefix ods_")), + getRouteDefList(), null, Configuration.fromMap( ImmutableMap.builder() @@ -230,6 +215,30 @@ void testInvalidTimeZone() throws Exception { .put("foo", "bar") .build())); + private static List getRouteDefList() { + return Arrays.asList( + new RouteDef( + "mydb.default.app_order_.*", + "odsdb.default.app_order", + null, + "sync all 
sharding tables to one"),
+                new RouteDef(
+                        "mydb.default.web_order",
+                        "odsdb.default.ods_web_order",
+                        null,
+                        "sync table to with given prefix ods_"),
+                new RouteDef(
+                        "mydb.sharding.sharding_order",
+                        "odsdb.default.<>",
+                        null,
+                        "sync table to different schema with original table name"),
+                new RouteDef(
+                        "mydb.sharding.busi_cust_info",
+                        "odsdb.default.dim_c<>",
+                        "c<>",
+                        "sync table to different schema with customized replace symbol"));
+    }
+
    private final PipelineDef defWithOptional =
            new PipelineDef(
                    new SourceDef(
@@ -254,7 +263,7 @@ void testInvalidTimeZone() throws Exception {
                                    .build())),
                    Collections.singletonList(
                            new RouteDef(
-                                    "mydb.default.app_order_.*", "odsdb.default.app_order", null)),
+                                    "mydb.default.app_order_.*", "odsdb.default.app_order", null, null)),
                    null,
                    Configuration.fromMap(
                            ImmutableMap.builder()
diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
index a719b45fff5..cf4e9ba9429 100644
--- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
@@ -36,6 +36,13 @@ route:
  - source-table: mydb.default.web_order
    sink-table: odsdb.default.ods_web_order
    description: sync table to with given prefix ods_
+  - source-table: mydb.sharding.sharding_order
+    sink-table: odsdb.default.<>
+    description: sync table to different schema with original table name
+  - source-table: mydb.sharding.busi_cust_info
+    sink-table: odsdb.default.dim_c<>
+    replace-symbol: c<>
+    description: sync table to different schema with customized replace symbol
transform:
  - source-table: mydb.app_order_.*
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java
index 9d3b5d7ff85..3064e618afa 100644
--- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java
@@ -84,6 +84,17 @@ public static TableId parse(String tableId) {
        throw new IllegalArgumentException("Invalid tableId: " + tableId);
    }
+
+    public static TableId parse(String namespace, String schemaName, String tableName) {
+        if (namespace != null) {
+            return tableId(namespace, schemaName, tableName);
+        } else if (schemaName != null) {
+            return tableId(schemaName, tableName);
+        } else if (tableName != null) {
+            return tableId(tableName);
+        }
+        throw new IllegalArgumentException("Invalid tableName: " + tableName);
+    }
+
    public String identifier() {
        if (namespace == null || namespace.isEmpty()) {
            if (schemaName == null || schemaName.isEmpty()) {
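The new three-argument `parse` overload above simply falls through to the existing `tableId(...)` factory methods depending on which parts are non-null. A minimal usage sketch (the concrete names are illustrative; the printed identifiers are assumptions based on the `identifier()` method shown in the same file):

```java
import com.ververica.cdc.common.event.TableId;

class TableIdParseSketch {
    public static void main(String[] args) {
        // Full 3-tuple: (namespace, schemaName, tableName)
        TableId full = TableId.parse("odsdb", "default", "dim_busi_cust_info");

        // MySQL/Doris-style mapping described in the docs: (null, database, table)
        TableId mysqlStyle = TableId.parse(null, "mydb", "web_order");

        // Kafka-style mapping described in the docs: (null, null, topic)
        TableId kafkaStyle = TableId.parse(null, null, "ods_web_order");

        // identifier() renders only the non-null parts, e.g. "mydb.web_order"
        System.out.println(full.identifier());
        System.out.println(mysqlStyle.identifier());
        System.out.println(kafkaStyle.identifier());
    }
}
```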
diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/RouteDef.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/RouteDef.java
index 32b705ab4e7..7e0e3cd6956 100644
--- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/RouteDef.java
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/RouteDef.java
@@ -35,11 +35,24 @@ public class RouteDef {
    private final String sourceTable;
    private final String sinkTable;
+    @Nullable private final String replaceSymbol;
    @Nullable private final String description;

+    public RouteDef(
+            String sourceTable,
+            String sinkTable,
+            @Nullable String replaceSymbol,
+            @Nullable String description) {
+        this.sourceTable = sourceTable;
+        this.sinkTable = sinkTable;
+        this.replaceSymbol = replaceSymbol;
+        this.description = description;
+    }
+
    public RouteDef(String sourceTable, String sinkTable, @Nullable String description) {
        this.sourceTable = sourceTable;
        this.sinkTable = sinkTable;
+        this.replaceSymbol = "<>";
        this.description = description;
    }

@@ -51,6 +64,11 @@ public String getSinkTable() {
        return sinkTable;
    }

+    @Nullable
+    public String getReplaceSymbol() {
+        return replaceSymbol;
+    }
+
    public Optional<String> getDescription() {
        return Optional.ofNullable(description);
    }
@@ -62,6 +80,8 @@ public String toString() {
                + sourceTable
                + ", sinkTable="
                + sinkTable
+                + ", replaceSymbol="
+                + replaceSymbol
                + ", description='"
                + description
                + '\''
@@ -79,11 +99,12 @@ public boolean equals(Object o) {
        RouteDef routeDef = (RouteDef) o;
        return Objects.equals(sourceTable, routeDef.sourceTable)
                && Objects.equals(sinkTable, routeDef.sinkTable)
+                && Objects.equals(replaceSymbol, routeDef.replaceSymbol)
                && Objects.equals(description, routeDef.description);
    }

    @Override
    public int hashCode() {
-        return Objects.hash(sourceTable, sinkTable, description);
+        return Objects.hash(sourceTable, sinkTable, replaceSymbol, description);
    }
}
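For callers that do not care about the new field, the legacy three-argument constructor above now pins `replaceSymbol` to the literal `"<>"`. A small sketch comparing the two constructors (the table IDs and descriptions are illustrative values, not taken from the tests):

```java
import com.ververica.cdc.composer.definition.RouteDef;

class RouteDefSketch {
    public static void main(String[] args) {
        // Legacy form: replaceSymbol implicitly defaults to "<>"
        RouteDef legacy =
                new RouteDef("mydb.default.web_order", "odsdb.default.ods_web_order", "demo route");

        // New form: an explicit, customized replace symbol
        RouteDef custom =
                new RouteDef(
                        "mydb.sharding.busi_cust_info",
                        "odsdb.default.dim_c<>",
                        "c<>",
                        "demo route with custom symbol");

        System.out.println(legacy.getReplaceSymbol()); // "<>"
        System.out.println(custom.getReplaceSymbol()); // "c<>"
    }
}
```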
diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java
index 35d5797f272..dcc46407532 100644
--- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java
@@ -35,7 +35,9 @@ public DataStream<Event> translate(DataStream<Event> input, List<RouteDef> routes) {
        RouteFunction.Builder routeFunctionBuilder = RouteFunction.newBuilder();
        for (RouteDef route : routes) {
            routeFunctionBuilder.addRoute(
-                    route.getSourceTable(), TableId.parse(route.getSinkTable()));
+                    route.getSourceTable(),
+                    route.getReplaceSymbol(),
+                    TableId.parse(route.getSinkTable()));
        }
        return input.map(routeFunctionBuilder.build(), new EventTypeInfo()).name("Route");
    }
diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java
index 3d8d1c5b320..1e88feaf647 100644
--- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java
+++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java
@@ -17,7 +17,7 @@
package com.ververica.cdc.runtime.operators.route;

import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import com.ververica.cdc.common.event.AddColumnEvent;
@@ -40,8 +40,10 @@
/** A map function that applies user-defined routing logics. */
public class RouteFunction extends RichMapFunction<Event, Event> {
-    private final List<Tuple2<String, TableId>> routingRules;
-    private transient List<Tuple2<Selectors, TableId>> routes;
+    private final List<Tuple3<String, String, TableId>> routingRules;
+    private transient List<Tuple3<Selectors, String, TableId>> routes;
+
+    private static final String DEFAULT_TABLE_NAME_REPLACE_SYMBOL = "<>";

    public static Builder newBuilder() {
        return new Builder();
@@ -49,11 +51,19 @@ public static Builder newBuilder() {
    /** Builder of {@link RouteFunction}. */
    public static class Builder {
-        private final List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
+        private final List<Tuple3<String, String, TableId>> routingRules = new ArrayList<>();
+
+        public Builder addRoute(String tableInclusions, String replaceSymbol, TableId replaceBy) {
+            String tableNameReplaceSymbol =
+                    replaceSymbol == null || replaceSymbol.isEmpty()
+                            ? DEFAULT_TABLE_NAME_REPLACE_SYMBOL
+                            : replaceSymbol;
+            routingRules.add(Tuple3.of(tableInclusions, tableNameReplaceSymbol, replaceBy));
+            return this;
+        }

        public Builder addRoute(String tableInclusions, TableId replaceBy) {
-            routingRules.add(Tuple2.of(tableInclusions, replaceBy));
-            return this;
+            return addRoute(tableInclusions, null, replaceBy);
        }

        public RouteFunction build() {
@@ -61,7 +71,7 @@ public RouteFunction build() {
        }
    }

-    private RouteFunction(List<Tuple2<String, TableId>> routingRules) {
+    private RouteFunction(List<Tuple3<String, String, TableId>> routingRules) {
        this.routingRules = routingRules;
    }

@@ -72,12 +82,14 @@ public void open(Configuration parameters) throws Exception {
                        .map(
                                tuple2 -> {
                                    String tableInclusions = tuple2.f0;
-                                    TableId replaceBy = tuple2.f1;
+                                    String tableNameReplaceSymbol = tuple2.f1;
+                                    TableId replaceBy = tuple2.f2;
                                    Selectors selectors =
                                            new Selectors.SelectorsBuilder()
                                                    .includeTables(tableInclusions)
                                                    .build();
-                                    return new Tuple2<>(selectors, replaceBy);
+                                    return new Tuple3<>(
+                                            selectors, tableNameReplaceSymbol, replaceBy);
                                })
                        .collect(Collectors.toList());
    }
@@ -92,10 +104,25 @@ public Event map(Event event) throws Exception {
        ChangeEvent changeEvent = (ChangeEvent) event;
        TableId tableId = changeEvent.tableId();

-        for (Tuple2<Selectors, TableId> route : routes) {
+        for (Tuple3<Selectors, String, TableId> route : routes) {
            Selectors selectors = route.f0;
-            TableId replaceBy = route.f1;
+            String tableNameReplaceSymbol = route.f1;
+            TableId replaceBy = route.f2;
            if (selectors.isMatch(tableId)) {
+                // When the configured sink table name contains the replace symbol, keep the
+                // configured namespace and schema name and substitute the symbol with the
+                // table name of the incoming event.
+                if (replaceBy.getTableName().contains(tableNameReplaceSymbol)) {
+                    replaceBy =
+                            TableId.parse(
+                                    replaceBy.getNamespace(),
+                                    replaceBy.getSchemaName(),
+                                    replaceBy
+                                            .getTableName()
+                                            .replace(
+                                                    tableNameReplaceSymbol,
+                                                    tableId.getTableName()));
+                }
                return recreateChangeEvent(changeEvent, replaceBy);
            }
        }
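Put plainly, only the table-name part of the target `TableId` is templated; the symbol inside it is replaced by the matched source table's name. A self-contained sketch of that string-level behaviour, mirroring the `replace` call in the hunk above (the concrete names come from the route examples added to pipeline-definition-full.yaml):

```java
class ReplaceSymbolSketch {
    public static void main(String[] args) {
        // route: sink-table: odsdb.default.dim_c<>   replace-symbol: c<>
        String sinkTableName = "dim_c<>";
        String replaceSymbol = "c<>";
        String sourceTableName = "busi_cust_info"; // table name of the incoming event

        // Same substitution RouteFunction.map() applies to the sink TableId's table name
        String resolved = sinkTableName.replace(replaceSymbol, sourceTableName);
        System.out.println(resolved); // dim_busi_cust_info

        // With the default symbol "<>", e.g. sink-table: ods_<>
        System.out.println("ods_<>".replace("<>", "app_table1")); // ods_app_table1
    }
}
```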
diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java
index 78adc6f64d4..ba28d50eecb 100644
--- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java
+++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java
@@ -191,4 +191,49 @@ void testSchemaChangeEventRouting() throws Exception {
                .containsTypeMapping(typeMapping)
                .hasTableId(NEW_CUSTOMERS);
    }
+
+    @Test
+    void testSchemaChangeRouting() throws Exception {
+        String DEFAULT_TABLE_NAME_REPLACE_SYMBOL = "<>";
+        // This test only remaps namespace and schema; the table name remains unchanged
+        TableId route =
+                TableId.tableId(
+                        "my_new_company", "my_new_branch", DEFAULT_TABLE_NAME_REPLACE_SYMBOL);
+        TableId target = TableId.tableId("my_new_company", "my_new_branch", "customers");
+        RouteFunction router =
+                RouteFunction.newBuilder()
+                        .addRoute(
+                                "my_company.my_branch.\\.*",
+                                DEFAULT_TABLE_NAME_REPLACE_SYMBOL,
+                                route)
+                        .build();
+        router.open(new Configuration());
+        // CreateTableEvent
+        CreateTableEvent createTableEvent = new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA);
+        assertThat(router.map(createTableEvent)).asSchemaChangeEvent().hasTableId(target);
+    }
+
+    @Test
+    void testTableNameChangeRouting() throws Exception {
+        String DEFAULT_TABLE_NAME_REPLACE_SYMBOL = "<>";
+        // This test remaps namespace and schema and embeds the source table name in the target
+        TableId route =
+                TableId.tableId(
+                        "my_new_company",
+                        "my_new_branch",
+                        "db_target_" + DEFAULT_TABLE_NAME_REPLACE_SYMBOL + "_1");
+        TableId target =
+                TableId.tableId("my_new_company", "my_new_branch", "db_target_customers_1");
+        RouteFunction router =
+                RouteFunction.newBuilder()
+                        .addRoute(
+                                "my_company.my_branch.\\.*",
+                                DEFAULT_TABLE_NAME_REPLACE_SYMBOL,
+                                route)
+                        .build();
+        router.open(new Configuration());
+        // CreateTableEvent
+        CreateTableEvent createTableEvent = new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA);
+        assertThat(router.map(createTableEvent)).asSchemaChangeEvent().hasTableId(target);
+    }
}
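For completeness, the legacy two-argument `addRoute` now delegates with a `null` symbol, so existing callers fall back to the default `<>` placeholder. A rough usage sketch of the builder outside the tests (the inclusion patterns and table IDs are illustrative; `open()` takes a Flink `Configuration`, as in the tests above):

```java
import org.apache.flink.configuration.Configuration;

import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.route.RouteFunction;

class RouteBuilderSketch {
    public static void main(String[] args) throws Exception {
        RouteFunction router =
                RouteFunction.newBuilder()
                        // legacy call: no symbol given, "<>" is assumed internally
                        .addRoute("mydb.default.app_.*", TableId.parse("odsdb", "default", "ods_<>"))
                        // explicit symbol, as exercised by the new tests
                        .addRoute(
                                "mydb.sharding.busi_cust_info",
                                "c<>",
                                TableId.parse("odsdb", "default", "dim_c<>"))
                        .build();
        router.open(new Configuration());
        // Events mapped through 'router' now carry the rewritten TableIds.
    }
}
```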