Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36805][cdc-common] Add ConfigShade interface to support encryption of sensitive configuration items and provide a base64 encoding implementation #3829

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
Expand Down Expand Up @@ -114,7 +115,6 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineCon

private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {

// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
// it's not of plain data types and must be removed before calling toPipelineConfig.
List<UdfDef> udfDefs = new ArrayList<>();
Expand All @@ -135,6 +135,14 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);

// Decrypt configurations if specified
pipelineDefJsonNode = ConfigShadeUtils.decryptConfig(pipelineDefJsonNode, pipelineConfig);

// Source is required
SourceDef sourceDef =
toSourceDef(
Expand Down Expand Up @@ -165,11 +173,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.shade.utils.ConfigShadeUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
Expand All @@ -32,6 +33,7 @@
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.net.URL;
Expand All @@ -57,6 +59,10 @@
/** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */
class YamlPipelineDefinitionParserTest {

private static final String USERNAME = "flinkcdc";

private static final String PASSWORD = "flinkcdc_password";

@Test
void testParsingFullDefinition() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
Expand All @@ -81,6 +87,25 @@ void testMinimizedDefinition() throws Exception {
assertThat(pipelineDef).isEqualTo(minimizedDef);
}

@Test
public void testDecryptOptions() {
String encryptUsername = "ZmxpbmtjZGM=";
String encryptPassword = "ZmxpbmtjZGNfcGFzc3dvcmQ=";
String decryptUsername = ConfigShadeUtils.decryptOption("base64", encryptUsername);
String decryptPassword = ConfigShadeUtils.decryptOption("base64", encryptPassword);
Assertions.assertEquals(decryptUsername, USERNAME);
Assertions.assertEquals(decryptPassword, PASSWORD);
}

@Test
void testParsingBase64EncodedDefinition() throws Exception {
URL resource =
Resources.getResource("definitions/pipeline-definition-with-base64-encoded.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef).isEqualTo(decryptedDefWithBase64Encode);
}

@Test
void testOverridingGlobalConfig() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
Expand Down Expand Up @@ -368,6 +393,41 @@ private void testSchemaEvolutionTypesParsing(
.put("schema-operator.rpc-timeout", "1 h")
.build()));

private final PipelineDef decryptedDefWithBase64Encode =
new PipelineDef(
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "password1")
.put("tables", "replication.cluster")
.build())),
new SinkDef(
"doris",
"sink-queue",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("fenodes", "localhost:8035")
.put("username", "root")
.put("password", "password2")
.build())),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.put("shade.identifier", "base64")
.put("shade.sensitive.keywords", "password;username")
.build()));

@Test
void testParsingFullDefinitionFromString() throws Exception {
String pipelineDefText =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source:
type: mysql
name: source-database
host: localhost
port: 3306
username: YWRtaW4=
password: cGFzc3dvcmQx
tables: replication.cluster

sink:
type: doris
name: sink-queue
fenodes: localhost:8035
username: cm9vdA==
password: cGFzc3dvcmQy

pipeline:
name: source-database-sync-pipe
parallelism: 4
schema.change.behavior: evolve
schema-operator.rpc-timeout: 1 h
shade.identifier: base64
shade.sensitive.keywords: password;username
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.cdc.common.configuration.description.ListElement;

import java.time.Duration;
import java.util.List;

import static org.apache.flink.cdc.common.configuration.description.TextElement.text;

Expand Down Expand Up @@ -100,5 +101,20 @@ public class PipelineOptions {
.withDescription(
"The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes.");

public static final ConfigOption<String> SHADE_IDENTIFIER_OPTION =
ConfigOptions.key("shade.identifier")
.stringType()
.defaultValue("default")
.withDescription(
"The identifier of the encryption method for decryption. Defaults to \"default\", indicating no encryption");

public static final ConfigOption<List<String>> SHADE_SENSITIVE_KEYWORDS =
ConfigOptions.key("shade.sensitive.keywords")
.stringType()
.asList()
.defaultValues("password")
.withDescription(
"A semicolon-separated list of keywords of the configuration items to be decrypted.");

private PipelineOptions() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.shade;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.Configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The interface that provides the ability to decrypt {@link
* org.apache.flink.cdc.composer.definition}.
*/
@PublicEvolving
public interface ConfigShade {
Logger LOG = LoggerFactory.getLogger(ConfigShade.class);

/**
* Initializes the custom instance using the pipeline configuration.
*
* <p>This method can be useful when decryption requires an external file (e.g. a key file)
* defined in the pipeline configs.
*/
default void initialize(Configuration pipelineConfig) throws Exception {}

/**
* The unique identifier of the current interface, used it to select the correct {@link
* ConfigShade}.
*/
String getIdentifier();

/**
* Decrypt the content.
*
* @param content The content to decrypt
*/
String decrypt(String content);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.shade.impl;

import org.apache.flink.cdc.common.shade.ConfigShade;

import java.util.Base64;

/** Base64 ConfigShade. */
public class Base64ConfigShade implements ConfigShade {

private static final Base64.Decoder DECODER = Base64.getDecoder();

private static final String IDENTIFIER = "base64";

@Override
public String getIdentifier() {
return IDENTIFIER;
}

@Override
public String decrypt(String content) {
return new String(DECODER.decode(content));
}
}
Loading
Loading