Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/themes/book
Submodule book updated 114 files
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

License headers, JavaDocs and comments should be kept.

package org.apache.flink.cdc.cli.parser;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
Expand All @@ -35,6 +19,7 @@

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -43,7 +28,6 @@
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

// Parent node keys
Expand All @@ -52,6 +36,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
private static final String PIPELINE_KEY = "pipeline";
private static final String MODEL_KEY = "models";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

route, transform and user-defined-function are not in plural form. Maybe use model for consistency?


// Source / sink keys
private static final String TYPE_KEY = "type";
Expand All @@ -76,15 +61,17 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String UDF_FUNCTION_NAME_KEY = "name";
private static final String UDF_CLASSPATH_KEY = "classpath";

public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";
// Model related keys
private static final String MODEL_NAME_KEY = "name";
private static final String MODEL_HOST_KEY = "host";
private static final String MODEL_API_KEY = "key";

public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";
public static final String TRANSFORM_PARTITION_KEY_KEY = "partition-keys";

public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";

private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

/** Parse the specified pipeline definition file. */
@Override
public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
throws Exception {
Expand All @@ -99,64 +86,61 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineCon

private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));

// Sink is required
SinkDef sinkDef =
toSinkDef(
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));

// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
.ifPresent(
node ->
node.forEach(
transform -> transformDefs.add(toTransformDef(transform))));

// Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));

// UDFs are optional
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));

// Pipeline configs are optional
List<ModelDef> modelDefs = new ArrayList<>();
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).remove(MODEL_KEY);

Accessing and removing MODEL_KEY could be done here at once.

if (modelsNode != null) {
modelDefs = parseModels(modelsNode);
}

Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, pipelineConfig);
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
}

private SourceDef toSourceDef(JsonNode sourceNode) {
Map<String, String> sourceMap =
mapper.convertValue(sourceNode, new TypeReference<Map<String, String>>() {});

// "type" field is required
String type =
checkNotNull(
sourceMap.remove(TYPE_KEY),
"Missing required field \"%s\" in source configuration",
TYPE_KEY);

// "name" field is optional
String name = sourceMap.remove(NAME_KEY);

return new SourceDef(type, name, Configuration.fromMap(sourceMap));
Expand All @@ -183,14 +167,12 @@ private SinkDef toSinkDef(JsonNode sinkNode) {
Map<String, String> sinkMap =
mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});

// "type" field is required
String type =
checkNotNull(
sinkMap.remove(TYPE_KEY),
"Missing required field \"%s\" in sink configuration",
TYPE_KEY);

// "name" field is optional
String name = sinkMap.remove(NAME_KEY);

return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
Expand Down Expand Up @@ -237,6 +219,29 @@ private UdfDef toUdfDef(JsonNode udfNode) {
return new UdfDef(functionName, classpath);
}

private ModelDef toModelDef(JsonNode modelNode) {
String name =
checkNotNull(
modelNode.get(MODEL_NAME_KEY),
"Missing required field \"%s\" in model configuration",
MODEL_NAME_KEY)
.asText();
String host =
checkNotNull(
modelNode.get(MODEL_HOST_KEY),
"Missing required field \"%s\" in model configuration",
MODEL_HOST_KEY)
.asText();
String apiKey =
checkNotNull(
modelNode.get(MODEL_API_KEY),
"Missing required field \"%s\" in model configuration",
MODEL_API_KEY)
.asText();

return new ModelDef(name, host, apiKey);
}

private TransformDef toTransformDef(JsonNode transformNode) {
String sourceTable =
checkNotNull(
Expand All @@ -248,7 +253,6 @@ private TransformDef toTransformDef(JsonNode transformNode) {
Optional.ofNullable(transformNode.get(TRANSFORM_PROJECTION_KEY))
.map(JsonNode::asText)
.orElse(null);
// When the star is in the first place, a backslash needs to be added for escape.
if (!StringUtils.isNullOrWhitespaceOnly(projection) && projection.contains("\\*")) {
projection = projection.replace("\\*", "*");
}
Expand Down Expand Up @@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
return new Configuration();
}
Map<String, String> pipelineConfigMap =
mapper.convertValue(
pipelineConfigNode, new TypeReference<Map<String, String>>() {});
Map<String, String> pipelineConfigMap = new HashMap<>();
pipelineConfigNode
.fields()
.forEachRemaining(
entry -> {
String key = entry.getKey();
JsonNode value = entry.getValue();
if (!key.equals(MODEL_KEY)) {
pipelineConfigMap.put(key, value.asText());
}
});
Comment on lines +295 to +304
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Models configuration could be parsed and removed in YamlPipelineDefinitionParser#parse method in advance, so there's no need to iterate configuration maps again here.

return Configuration.fromMap(pipelineConfigMap);
}

private List<ModelDef> parseModels(JsonNode modelsNode) {
List<ModelDef> modelDefs = new ArrayList<>();
if (modelsNode != null && modelsNode.isArray()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can throw an exception if modelsNode isn't a list. Or such code may silently fail:

pipeline:
  models:
    name: ...

While one may want to write this:

pipeline:
  models:
    - name: ...

for (JsonNode modelNode : modelsNode) {
modelDefs.add(toModelDef(modelNode));
}
}
return modelDefs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ void testUdfDefinition() throws Exception {
null,
"add new uniq_id for each row")),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
Expand Down Expand Up @@ -355,6 +356,7 @@ void testParsingFullDefinitionFromString() throws Exception {
null,
"add new uniq_id for each row")),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
Expand Down Expand Up @@ -393,6 +395,7 @@ void testParsingFullDefinitionFromString() throws Exception {
null)),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "4")
Expand All @@ -405,6 +408,7 @@ void testParsingFullDefinitionFromString() throws Exception {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));

private final PipelineDef fullDefWithRouteRepSym =
Expand Down Expand Up @@ -463,6 +467,7 @@ void testParsingFullDefinitionFromString() throws Exception {
null,
"add new uniq_id for each row")),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
Expand Down Expand Up @@ -492,6 +497,7 @@ void testParsingFullDefinitionFromString() throws Exception {
new UdfDef(
"format",
"org.apache.flink.cdc.udf.examples.java.FormatFunctionClass")),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
Expand Down
Loading