diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java index ecf60f9cbe7..9120cd2d48d 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -24,6 +24,7 @@ import com.ververica.cdc.common.configuration.Configuration; import com.ververica.cdc.composer.definition.PipelineDef; import com.ververica.cdc.composer.definition.RouteDef; +import com.ververica.cdc.composer.definition.SchemaRouteDef; import com.ververica.cdc.composer.definition.SinkDef; import com.ververica.cdc.composer.definition.SourceDef; @@ -52,6 +53,10 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private static final String ROUTE_SOURCE_TABLE_KEY = "source-table"; private static final String ROUTE_SINK_TABLE_KEY = "sink-table"; private static final String ROUTE_DESCRIPTION_KEY = "description"; + private static final String ROUTE_SOURCE_DATABASE = "source-database"; + private static final String ROUTE_SINK_DATABASE = "sink-database"; + private static final String ROUTE_TABLE_PREFIX = "table-prefix"; + private static final String ROUTE_TABLE_SUFFIX = "table-suffix"; private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); @@ -79,8 +84,18 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi // Routes are optional List routeDefs = new ArrayList<>(); + List schemaRouteDefs = new ArrayList<>(); Optional.ofNullable(root.get(ROUTE_KEY)) - .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route)))); + .ifPresent( + node -> + node.forEach( + route -> { + if (route.hasNonNull(ROUTE_SOURCE_TABLE_KEY)) { + routeDefs.add(toRouteDef(route)); + } else if (route.hasNonNull(ROUTE_SOURCE_DATABASE)) { + 
schemaRouteDefs.add(toschemaRouteDef(route)); + } + })); // Pipeline configs are optional Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY)); @@ -90,7 +105,8 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi pipelineConfig.addAll(globalPipelineConfig); pipelineConfig.addAll(userPipelineConfig); - return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig); + return new PipelineDef( + sourceDef, sinkDef, routeDefs, schemaRouteDefs, null, pipelineConfig); } private SourceDef toSourceDef(JsonNode sourceNode) { @@ -147,6 +163,31 @@ private RouteDef toRouteDef(JsonNode routeNode) { return new RouteDef(sourceTable, sinkTable, description); } + private SchemaRouteDef toschemaRouteDef(JsonNode route) { + String sourceDatabase = + checkNotNull( + route.get(ROUTE_SOURCE_DATABASE), + "Missing required field \"%s\" in mapper configuration", + ROUTE_SOURCE_DATABASE) + .asText(); + String sinkDatabase = + checkNotNull( + route.get(ROUTE_SINK_DATABASE), + "Missing required field \"%s\" in mapper configuration", + ROUTE_SINK_DATABASE) + .asText(); + String tablePrefix = + Optional.ofNullable(route.get(ROUTE_TABLE_PREFIX)) + .map(JsonNode::asText) + .orElse(null); + String tableSuffix = + Optional.ofNullable(route.get(ROUTE_TABLE_SUFFIX)) + .map(JsonNode::asText) + .orElse(null); + + return new SchemaRouteDef(sourceDatabase, sinkDatabase, tablePrefix, tableSuffix); + } + private Configuration toPipelineConfig(JsonNode pipelineConfigNode) { if (pipelineConfigNode == null || pipelineConfigNode.isNull()) { return new Configuration(); diff --git a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 045b1fe7a7e..12058cf3b30 100644 --- a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ 
b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -22,6 +22,7 @@ import com.ververica.cdc.common.configuration.Configuration; import com.ververica.cdc.composer.definition.PipelineDef; import com.ververica.cdc.composer.definition.RouteDef; +import com.ververica.cdc.composer.definition.SchemaRouteDef; import com.ververica.cdc.composer.definition.SinkDef; import com.ververica.cdc.composer.definition.SourceDef; import org.junit.jupiter.api.Test; @@ -62,6 +63,15 @@ void testMinimizedDefinition() throws Exception { assertThat(pipelineDef).isEqualTo(minimizedDef); } + @Test + void testSchemaRoutesDefinition() throws Exception { + URL resource = + Resources.getResource("definitions/pipeline-definition-with-schemaRoute.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + assertThat(pipelineDef).isEqualTo(defWithSchemaRoute); + } + @Test void testOverridingGlobalConfig() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml"); @@ -268,4 +278,37 @@ void testInvalidTimeZone() throws Exception { Collections.emptyList(), null, new Configuration()); + + private final PipelineDef defWithSchemaRoute = + new PipelineDef( + new SourceDef( + "mysql", + "MySQL Source", + Configuration.fromMap( + ImmutableMap.builder() + .put("hostname", "localhost") + .put("port", "3306") + .put("username", "admin") + .put("password", "password") + .put("tables", "adb.*, app_db.\\.*") + .build())), + new SinkDef( + "doris", + "Doris Sink", + Configuration.fromMap( + ImmutableMap.builder() + .put("fenodes", "localhost:8030") + .put("username", "admin") + .put("password", "password") + .build())), + Collections.singletonList( + new RouteDef("app_db.order.*", "ods_db.ods_orders", null)), + Collections.singletonList(new SchemaRouteDef("adb", "ods_db", "adb_", null)), + null, + 
Configuration.fromMap( + ImmutableMap.builder() + .put("name", "source-database-sync-pipe") + .put("parallelism", "1") + .put("enable-schema-evolution", "true") + .build())); } diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-schemaRoute.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-schemaRoute.yaml new file mode 100644 index 00000000000..215cc5e67b4 --- /dev/null +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-schemaRoute.yaml @@ -0,0 +1,41 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +source: + type: mysql + name: MySQL Source + hostname: localhost + port: 3306 + username: admin + password: password + tables: adb.*, app_db.\.* + +sink: + type: doris + name: Doris Sink + fenodes: localhost:8030 + username: admin + password: password + +route: + - source-table: app_db.order.* + sink-table: ods_db.ods_orders + - source-database: adb + sink-database: ods_db + table-prefix: adb_ + +pipeline: + name: source-database-sync-pipe + parallelism: 1 + enable-schema-evolution: true \ No newline at end of file diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java index 6c65ae05d1e..29b2e254702 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java @@ -21,6 +21,7 @@ import com.ververica.cdc.common.types.LocalZonedTimestampType; import java.time.ZoneId; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.TimeZone; @@ -51,6 +52,7 @@ public class PipelineDef { private final SourceDef source; private final SinkDef sink; private final List routes; + private final List schemeRoutes; private final List transforms; private final Configuration config; @@ -63,6 +65,22 @@ public PipelineDef( this.source = source; this.sink = sink; this.routes = routes; + this.schemeRoutes = new ArrayList<>(); + this.transforms = transforms; + this.config = evaluatePipelineTimeZone(config); + } + + public PipelineDef( + SourceDef source, + SinkDef sink, + List routes, + List schemeRoutes, + List transforms, + Configuration config) { + this.source = source; + this.sink = sink; + this.routes = routes; + this.schemeRoutes = schemeRoutes; this.transforms = transforms; this.config = 
evaluatePipelineTimeZone(config); } @@ -79,6 +97,10 @@ public List getRoute() { return routes; } + public List getSchemeRoute() { + return schemeRoutes; + } + public List getTransforms() { return transforms; } diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/SchemaRouteDef.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/SchemaRouteDef.java new file mode 100644 index 00000000000..8dd0c5a8366 --- /dev/null +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/SchemaRouteDef.java @@ -0,0 +1,91 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.composer.definition; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +/** + * Definition of a router. + * + *

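<p>A schema route definition contains: + * + * <ul> + *
  <li>sourceDatabase: schema (database) name a source table must belong to for the route to apply. + *
  <li>sinkDatabase: schema (database) name that replaces it in the routed table id. + *
  <li>tablePrefix: optional text prepended to the table name; may be null. + *
  <li>tableSuffix: optional text appended to the table name; may be null. + * </ul>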
+ */ +public class SchemaRouteDef { + private final String sourceDatabase; + private final String sinkDatabase; + @Nullable private final String tablePrefix; + @Nullable private final String tableSuffix; + + public SchemaRouteDef( + String sourceDatabase, + String sinkDatabase, + @Nullable String tablePrefix, + @Nullable String tableSuffix) { + this.sourceDatabase = sourceDatabase; + this.sinkDatabase = sinkDatabase; + this.tablePrefix = tablePrefix; + this.tableSuffix = tableSuffix; + } + + public String getSourceDatabase() { + return sourceDatabase; + } + + public String getSinkDatabase() { + return sinkDatabase; + } + + public Optional getTablePrefix() { + return Optional.ofNullable(tablePrefix); + } + + public Optional getTableSuffix() { + return Optional.ofNullable(tableSuffix); + } + + @Override + public String toString() { + return "MapperDef{" + + "sourceDatabase='" + + sourceDatabase + + '\'' + + ", sinkDatabase='" + + sinkDatabase + + '\'' + + ", tablePrefix='" + + tablePrefix + + '\'' + + ", tableSuffix='" + + tableSuffix + + '\'' + + '}'; + } + + @Override + public int hashCode() { + return Objects.hash(sourceDatabase, sinkDatabase, tablePrefix, tableSuffix); + } +} diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java index 0e27055513c..54789177263 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java @@ -102,7 +102,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Route RouteTranslator routeTranslator = new RouteTranslator(); - stream = routeTranslator.translate(stream, pipelineDef.getRoute()); + stream = + routeTranslator.translate( + stream, pipelineDef.getRoute(), pipelineDef.getSchemeRoute()); // Create sink in advance as schema operator 
requires MetadataApplier DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig()); diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java index 35d5797f272..57a95fb6869 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/RouteTranslator.java @@ -21,6 +21,7 @@ import com.ververica.cdc.common.event.Event; import com.ververica.cdc.common.event.TableId; import com.ververica.cdc.composer.definition.RouteDef; +import com.ververica.cdc.composer.definition.SchemaRouteDef; import com.ververica.cdc.runtime.operators.route.RouteFunction; import com.ververica.cdc.runtime.typeutils.EventTypeInfo; @@ -28,8 +29,9 @@ /** Translator for router. */ public class RouteTranslator { - public DataStream translate(DataStream input, List routes) { - if (routes.isEmpty()) { + public DataStream translate( + DataStream input, List routes, List schemeRoutes) { + if (routes.isEmpty() && schemeRoutes.isEmpty()) { return input; } RouteFunction.Builder routeFunctionBuilder = RouteFunction.newBuilder(); @@ -37,6 +39,13 @@ public DataStream translate(DataStream input, List route routeFunctionBuilder.addRoute( route.getSourceTable(), TableId.parse(route.getSinkTable())); } + for (SchemaRouteDef schemaRoute : schemeRoutes) { + routeFunctionBuilder.addSchemaRoute( + schemaRoute.getSourceDatabase(), + schemaRoute.getSinkDatabase(), + schemaRoute.getTablePrefix().orElse(null), + schemaRoute.getTableSuffix().orElse(null)); + } return input.map(routeFunctionBuilder.build(), new EventTypeInfo()).name("Route"); } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java 
b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java index 3d8d1c5b320..a33b1fb303c 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java @@ -18,6 +18,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; import com.ververica.cdc.common.event.AddColumnEvent; @@ -34,6 +35,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static com.ververica.cdc.common.utils.Preconditions.checkState; @@ -41,6 +43,7 @@ /** A map function that applies user-defined routing logics. */ public class RouteFunction extends RichMapFunction { private final List> routingRules; + private final List> schemaRoutes; private transient List> routes; public static Builder newBuilder() { @@ -50,19 +53,32 @@ public static Builder newBuilder() { /** Builder of {@link RouteFunction}. 
*/ public static class Builder { private final List> routingRules = new ArrayList<>(); + private final List> schemaRoutes = new ArrayList<>(); public Builder addRoute(String tableInclusions, TableId replaceBy) { routingRules.add(Tuple2.of(tableInclusions, replaceBy)); return this; } + public Builder addSchemaRoute( + String sourceDatabase, + String sinkDatabase, + String tablePrefix, + String tableSuffix) { + schemaRoutes.add(Tuple4.of(sourceDatabase, sinkDatabase, tablePrefix, tableSuffix)); + return this; + } + public RouteFunction build() { - return new RouteFunction(routingRules); + return new RouteFunction(routingRules, schemaRoutes); } } - private RouteFunction(List> routingRules) { + private RouteFunction( + List> routingRules, + List> schemaRoutes) { this.routingRules = routingRules; + this.schemaRoutes = schemaRoutes; } @Override @@ -91,7 +107,7 @@ public Event map(Event event) throws Exception { event.getClass().getCanonicalName())); ChangeEvent changeEvent = (ChangeEvent) event; TableId tableId = changeEvent.tableId(); - + // priority table route for (Tuple2 route : routes) { Selectors selectors = route.f0; TableId replaceBy = route.f1; @@ -99,6 +115,28 @@ public Event map(Event event) throws Exception { return recreateChangeEvent(changeEvent, replaceBy); } } + + // then schema route + for (Tuple4 schemaRoute : schemaRoutes) { + if (schemaRoute.f0.equals(tableId.getSchemaName())) { + // database mapping, table add prefix and suffix mapping + String replaceIdentifier; + String replaceTableName = + Optional.ofNullable(schemaRoute.f2).orElse("") + + tableId.getTableName() + + Optional.ofNullable(schemaRoute.f3).orElse(""); + if (tableId.getNamespace() != null) { + replaceIdentifier = + String.join( + ".", tableId.getNamespace(), schemaRoute.f1, replaceTableName); + } else { + replaceIdentifier = String.join(".", schemaRoute.f1, replaceTableName); + } + + return recreateChangeEvent(changeEvent, TableId.parse(replaceIdentifier)); + } + } + return event; } 
diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java index 78adc6f64d4..29b73bc8057 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java @@ -49,6 +49,12 @@ class RouteFunctionTest { TableId.tableId("my_company", "my_branch", "customers"); private static final TableId NEW_CUSTOMERS = TableId.tableId("my_new_company", "my_new_branch", "customers"); + + private static final TableId ORDERS = TableId.tableId("my_source", "my_app", "orders"); + + private static final TableId NEW_ORDERS = + TableId.tableId("my_source", "ods_db", "my_new_orders"); + private static final Schema CUSTOMERS_SCHEMA = Schema.newBuilder() .physicalColumn("id", DataTypes.INT()) @@ -62,6 +68,7 @@ void testDataChangeEventRouting() throws Exception { RouteFunction router = RouteFunction.newBuilder() .addRoute("my_company.\\.+.customers", NEW_CUSTOMERS) + .addSchemaRoute("my_app", "ods_db", "my_new_", null) .build(); router.open(new Configuration()); BinaryRecordDataGenerator recordDataGenerator = @@ -82,6 +89,21 @@ void testDataChangeEventRouting() throws Exception { .withSchema(CUSTOMERS_SCHEMA) .hasFields(1, new BinaryStringData("Alice"), 12345678L); + // Insert + DataChangeEvent insertEventWithSchemaRouting = + DataChangeEvent.insertEvent( + ORDERS, + recordDataGenerator.generate( + new Object[] {1, new BinaryStringData("Alice"), 12345678L})); + assertThat(router.map(insertEventWithSchemaRouting)) + .asDataChangeEvent() + .hasTableId(NEW_ORDERS) + .hasOperationType(OperationType.INSERT) + .withAfterRecordData() + .hasArity(3) + .withSchema(CUSTOMERS_SCHEMA) + .hasFields(1, new BinaryStringData("Alice"), 12345678L); + // Update DataChangeEvent updateEvent = DataChangeEvent.updateEvent(