Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.RouteDef;
import com.ververica.cdc.composer.definition.SchemaRouteDef;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.composer.definition.SourceDef;

Expand Down Expand Up @@ -52,6 +53,10 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
private static final String ROUTE_DESCRIPTION_KEY = "description";
private static final String ROUTE_SOURCE_DATABASE = "source-database";
private static final String ROUTE_SINK_DATABASE = "sink-database";
private static final String ROUTE_TABLE_PREFIX = "table-prefix";
private static final String ROUTE_TABLE_SUFFIX = "table-suffix";

private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

Expand Down Expand Up @@ -79,8 +84,18 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi

// Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
List<SchemaRouteDef> schemaRouteDefs = new ArrayList<>();
Optional.ofNullable(root.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
.ifPresent(
node ->
node.forEach(
route -> {
if (route.hasNonNull(ROUTE_SOURCE_TABLE_KEY)) {
routeDefs.add(toRouteDef(route));
} else if (route.hasNonNull(ROUTE_SOURCE_DATABASE)) {
schemaRouteDefs.add(toschemaRouteDef(route));
}
}));

// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));
Expand All @@ -90,7 +105,8 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig);
return new PipelineDef(
sourceDef, sinkDef, routeDefs, schemaRouteDefs, null, pipelineConfig);
}

private SourceDef toSourceDef(JsonNode sourceNode) {
Expand Down Expand Up @@ -147,6 +163,31 @@ private RouteDef toRouteDef(JsonNode routeNode) {
return new RouteDef(sourceTable, sinkTable, description);
}

private SchemaRouteDef toschemaRouteDef(JsonNode route) {
String sourceDatabase =
checkNotNull(
route.get(ROUTE_SOURCE_DATABASE),
"Missing required field \"%s\" in mapper configuration",
ROUTE_SOURCE_DATABASE)
.asText();
String sinkDatabase =
checkNotNull(
route.get(ROUTE_SINK_DATABASE),
"Missing required field \"%s\" in mapper configuration",
ROUTE_SINK_DATABASE)
.asText();
String tablePrefix =
Optional.ofNullable(route.get(ROUTE_TABLE_PREFIX))
.map(JsonNode::asText)
.orElse(null);
String tableSuffix =
Optional.ofNullable(route.get(ROUTE_TABLE_SUFFIX))
.map(JsonNode::asText)
.orElse(null);

return new SchemaRouteDef(sourceDatabase, sinkDatabase, tablePrefix, tableSuffix);
}

private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
return new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.RouteDef;
import com.ververica.cdc.composer.definition.SchemaRouteDef;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.composer.definition.SourceDef;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -62,6 +63,15 @@ void testMinimizedDefinition() throws Exception {
assertThat(pipelineDef).isEqualTo(minimizedDef);
}

@Test
void testSchemaRoutesDefinition() throws Exception {
URL resource =
Resources.getResource("definitions/pipeline-definition-with-schemaRoute.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef).isEqualTo(defWithSchemaRoute);
}

@Test
void testOverridingGlobalConfig() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
Expand Down Expand Up @@ -268,4 +278,37 @@ void testInvalidTimeZone() throws Exception {
Collections.emptyList(),
null,
new Configuration());

private final PipelineDef defWithSchemaRoute =
new PipelineDef(
new SourceDef(
"mysql",
"MySQL Source",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("hostname", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "password")
.put("tables", "adb.*, app_db.\\.*")
.build())),
new SinkDef(
"doris",
"Doris Sink",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("fenodes", "localhost:8030")
.put("username", "admin")
.put("password", "password")
.build())),
Collections.singletonList(
new RouteDef("app_db.order.*", "ods_db.ods_orders", null)),
Collections.singletonList(new SchemaRouteDef("adb", "ods_db", "adb_", null)),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "1")
.put("enable-schema-evolution", "true")
.build()));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: admin
password: password
tables: adb.*, app_db.\.*

sink:
type: doris
name: Doris Sink
fenodes: localhost:8030
username: admin
password: password

route:
- source-table: app_db.order.*
sink-table: ods_db.ods_orders
- source-database: adb
sink-database: ods_db
table-prefix: adb_

pipeline:
name: source-database-sync-pipe
parallelism: 1
enable-schema-evolution: true
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ververica.cdc.common.types.LocalZonedTimestampType;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class PipelineDef {
private final SourceDef source;
private final SinkDef sink;
private final List<RouteDef> routes;
private final List<SchemaRouteDef> schemeRoutes;
private final List<TransformDef> transforms;
private final Configuration config;

Expand All @@ -63,6 +65,22 @@ public PipelineDef(
this.source = source;
this.sink = sink;
this.routes = routes;
this.schemeRoutes = new ArrayList<>();
this.transforms = transforms;
this.config = evaluatePipelineTimeZone(config);
}

public PipelineDef(
SourceDef source,
SinkDef sink,
List<RouteDef> routes,
List<SchemaRouteDef> schemeRoutes,
List<TransformDef> transforms,
Configuration config) {
this.source = source;
this.sink = sink;
this.routes = routes;
this.schemeRoutes = schemeRoutes;
this.transforms = transforms;
this.config = evaluatePipelineTimeZone(config);
}
Expand All @@ -79,6 +97,10 @@ public List<RouteDef> getRoute() {
return routes;
}

public List<SchemaRouteDef> getSchemeRoute() {
return schemeRoutes;
}

public List<TransformDef> getTransforms() {
return transforms;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.composer.definition;

import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Optional;

/**
* Definition of a router.
*
* <p>A mapper definition contains:
*
* <ul>
* <li>sourceDataBase: sourceDataBase.
* <li>sinkDataBase: sinkDataBase.
* <li>tablePrefix: tablePrefix.
* <li>tableSuffix: tableSuffix.
* </ul>
*/
public class SchemaRouteDef {
private final String sourceDatabase;
private final String sinkDatabase;
@Nullable private final String tablePrefix;
@Nullable private final String tableSuffix;

public SchemaRouteDef(
String sourceDatabase,
String sinkDatabase,
@Nullable String tablePrefix,
@Nullable String tableSuffix) {
this.sourceDatabase = sourceDatabase;
this.sinkDatabase = sinkDatabase;
this.tablePrefix = tablePrefix;
this.tableSuffix = tableSuffix;
}

public String getSourceDatabase() {
return sourceDatabase;
}

public String getSinkDatabase() {
return sinkDatabase;
}

public Optional<String> getTablePrefix() {
return Optional.ofNullable(tablePrefix);
}

public Optional<String> getTableSuffix() {
return Optional.ofNullable(tableSuffix);
}

@Override
public String toString() {
return "MapperDef{"
+ "sourceDatabase='"
+ sourceDatabase
+ '\''
+ ", sinkDatabase='"
+ sinkDatabase
+ '\''
+ ", tablePrefix='"
+ tablePrefix
+ '\''
+ ", tableSuffix='"
+ tableSuffix
+ '\''
+ '}';
}

@Override
public int hashCode() {
return Objects.hash(sourceDatabase, sinkDatabase, tablePrefix, tableSuffix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {

// Route
RouteTranslator routeTranslator = new RouteTranslator();
stream = routeTranslator.translate(stream, pipelineDef.getRoute());
stream =
routeTranslator.translate(
stream, pipelineDef.getRoute(), pipelineDef.getSchemeRoute());

// Create sink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.composer.definition.RouteDef;
import com.ververica.cdc.composer.definition.SchemaRouteDef;
import com.ververica.cdc.runtime.operators.route.RouteFunction;
import com.ververica.cdc.runtime.typeutils.EventTypeInfo;

import java.util.List;

/** Translator for router. */
public class RouteTranslator {
public DataStream<Event> translate(DataStream<Event> input, List<RouteDef> routes) {
if (routes.isEmpty()) {
public DataStream<Event> translate(
DataStream<Event> input, List<RouteDef> routes, List<SchemaRouteDef> schemeRoutes) {
if (routes.isEmpty() && schemeRoutes.isEmpty()) {
return input;
}
RouteFunction.Builder routeFunctionBuilder = RouteFunction.newBuilder();
for (RouteDef route : routes) {
routeFunctionBuilder.addRoute(
route.getSourceTable(), TableId.parse(route.getSinkTable()));
}
for (SchemaRouteDef schemaRoute : schemeRoutes) {
routeFunctionBuilder.addSchemaRoute(
schemaRoute.getSourceDatabase(),
schemaRoute.getSinkDatabase(),
schemaRoute.getTablePrefix().orElse(null),
schemaRoute.getTableSuffix().orElse(null));
}
return input.map(routeFunctionBuilder.build(), new EventTypeInfo()).name("Route");
}
}
Loading