Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions Dockerfile
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are Dockerfile & mysql-doris generic deployment configurations or just an example? Maybe somewhere like /example or /conf is better than putting them at root?

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#/*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */

# Image that layers a Flink CDC distribution and pipeline connectors on top of the Flink base image.
FROM flink

# Version of the Flink CDC artifacts produced by the local Maven build.
ARG FLINK_CDC_VERSION=3.2-SNAPSHOT
# Path (inside the build context) of the pipeline definition file to bake into the image.
# It has no default, so pass it with: --build-arg PIPELINE_DEFINITION_FILE=<path>
ARG PIPELINE_DEFINITION_FILE

# Use the key=value form; the space-separated ENV form is legacy syntax.
ENV FLINK_CDC_HOME=/opt/flink-cdc

# Create the CDC home and the Flink user library directory in a single layer.
RUN mkdir -p /opt/flink-cdc /opt/flink/usrlib

# Unpack the Flink CDC distribution into /opt/flink-cdc, give the dist jar a
# version-independent name, and clean up the temporary files.
COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/
RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \
    mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \
    mv /opt/flink-cdc/lib/flink-cdc-dist-${FLINK_CDC_VERSION}.jar /opt/flink-cdc/lib/flink-cdc-dist.jar && \
    rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz

# Copy the pipeline connector jars into the Flink user library directory.
COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar

# Copy the Flink CDC pipeline definition file into the conf directory. This is an example;
# users can replace it according to their needs. The trailing slash makes the destination
# unambiguously a directory.
COPY $PIPELINE_DEFINITION_FILE $FLINK_CDC_HOME/conf/
10 changes: 9 additions & 1 deletion flink-cdc-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ limitations under the License.
<artifactId>flink-cdc-cli</artifactId>

<properties>
<commons-cli.version>1.6.0</commons-cli.version>
<commons-cli.version>1.7.0</commons-cli.version>
<snakeyaml.version>2.6</snakeyaml.version>
</properties>

Expand All @@ -55,6 +55,14 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@

import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

Expand All @@ -39,17 +44,21 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List<Path> additionalJars;

private final CommandLine commandLine;

private PipelineComposer composer = null;

private final SavepointRestoreSettings savepointSettings;

public CliExecutor(
CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List<Path> additionalJars,
SavepointRestoreSettings savepointSettings) {
this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
Expand All @@ -59,22 +68,31 @@ public CliExecutor(
}

public PipelineExecution.ExecutionInfo run() throws Exception {
// Parse pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);

// Create composer
PipelineComposer composer = getComposer();

// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);

// Execute the pipeline
return execution.execute();
// Either deploy the Flink CDC job through a deployment executor, or run the job directly
boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
if (isDeploymentMode) {
ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory();
PipelineDeploymentExecutor composeExecutor =
composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
return composeExecutor.deploy(
commandLine,
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
additionalJars);
} else {
// Run the CDC job directly: start by parsing the pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute or submit the pipeline
return execution.execute();
}
}

private PipelineComposer getComposer() {
private PipelineComposer getComposer() throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars, savepointSettings);
Expand Down Expand Up @@ -102,6 +120,11 @@ public List<Path> getAdditionalJars() {
return additionalJars;
}

/** Returns the value of the {@code target} option read from the parsed command line. */
@VisibleForTesting
public String getDeploymentTarget() {
    final String deploymentTarget = commandLine.getOptionValue("target");
    return deploymentTarget;
}

/** Returns the savepoint restore settings held by this executor. */
public SavepointRestoreSettings getSavepointSettings() {
    return this.savepointSettings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
Expand Down Expand Up @@ -83,13 +81,9 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
"Missing pipeline definition file path in arguments. ");
}

// Take the first unparsed argument as the pipeline definition file
Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
if (!Files.exists(pipelineDefPath)) {
throw new FileNotFoundException(
String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath));
}

// Take the first unparsed argument as the pipeline definition file
LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);
// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);

Expand All @@ -111,6 +105,7 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {

// Build executor
return new CliExecutor(
commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();

/** Command-line option {@code -t} / {@code --target}; takes the deployment target as its argument. */
public static final Option TARGET =
        Option.builder("t")
                .hasArg()
                .longOpt("target")
                .desc(
                        "The deployment target for the execution. "
                                + "This can take one of the following values "
                                + "local/remote/yarn-session/yarn-application/"
                                + "kubernetes-session/kubernetes-application")
                .build();

public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
Expand Down Expand Up @@ -91,6 +101,8 @@ public static Options initializeOptions() {
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
.addOption(USE_MINI_CLUSTER)
.addOption(TARGET)
.addOption(USE_MINI_CLUSTER)
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
package org.apache.flink.cdc.cli.utils;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.client.deployment.executors.RemoteExecutor;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;

/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {

Expand Down Expand Up @@ -62,4 +68,11 @@ private static Map<String, String> flattenConfigMap(

return flattenedMap;
}

/**
 * Tells whether the command line asks for a deployment, as opposed to running the job directly.
 *
 * @param commandLine parsed command line from which the {@code TARGET} option is read
 * @return {@code true} when a target is given and it is neither {@link LocalExecutor#NAME} nor
 *     {@link RemoteExecutor#NAME} (compared case-insensitively); {@code false} otherwise,
 *     including when no target is given
 */
public static boolean isDeploymentMode(CommandLine commandLine) {
    final String deploymentTarget = commandLine.getOptionValue(TARGET);
    if (deploymentTarget == null) {
        // No target given: not a deployment.
        return false;
    }
    final boolean isLocal = deploymentTarget.equalsIgnoreCase(LocalExecutor.NAME);
    final boolean isRemote = deploymentTarget.equalsIgnoreCase(RemoteExecutor.NAME);
    return !(isLocal || isRemote);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ void testSavePointConfiguration() throws Exception {
assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
}

@Test
void testDeploymentTargetConfiguration() throws Exception {
    // The value passed with "-t" must be reported back as the deployment target.
    final String expectedTarget = "kubernetes-application";
    final String[] cliArgs = {
        pipelineDef(), "--flink-home", flinkHome(), "-t", expectedTarget, "-n"
    };
    CliExecutor executor = createExecutor(cliArgs);
    assertThat(executor.getDeploymentTarget()).isEqualTo(expectedTarget);
}

@Test
void testAdditionalJar() throws Exception {
String aJar = "/foo/jar/a.jar";
Expand Down Expand Up @@ -177,6 +190,10 @@ private String globalPipelineConfig() throws Exception {
+ " was triggered.\n"
+ " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n"
+ " (for example hdfs:///flink/savepoint-1537\n"
+ " -t,--target <arg> The deployment target for the execution. This\n"
+ " can take one of the following values\n"
+ " local/remote/yarn-session/yarn-application/ku\n"
+ " bernetes-session/kubernetes-application\n"
+ " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";

private static class NoOpComposer implements PipelineComposer {
Expand Down
7 changes: 6 additions & 1 deletion flink-cdc-composer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,19 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer;

import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

/**
* Executor that deploys a Flink CDC job to a deployment target.
*
* <p>Each implementation handles one kind of target.
*/
public interface PipelineDeploymentExecutor {

/**
* Deploys the Flink CDC job.
*
* @param commandLine parsed command-line arguments of the CLI invocation
* @param flinkConfig Flink configuration to use for the deployment
* @param additionalJars paths of additional JARs; presumably the JARs to be shipped together
*     with the pipeline (TODO confirm against implementations)
* @return execution information for the deployed job
* @throws Exception declared for implementations to report deployment failures
*/
PipelineExecution.ExecutionInfo deploy(
CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer.flink.deployment;

import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;

import org.apache.commons.cli.CommandLine;

/** Factory that creates the {@link PipelineDeploymentExecutor} for the requested deployment target. */
public class ComposeDeploymentFactory {

    /** Deployment target served by {@link K8SApplicationDeploymentExecutor}. */
    private static final String KUBERNETES_APPLICATION_TARGET = "kubernetes-application";

    /**
     * Creates the deployment executor for the {@code target} option of the given command line.
     *
     * @param commandLine parsed command line from which the {@code target} option is read
     * @return the executor for the requested target
     * @throws IllegalArgumentException if the target is missing or is not a supported target
     */
    public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) {
        String target = commandLine.getOptionValue("target");
        // Constant-first comparison: a missing target (null) reaches the
        // IllegalArgumentException below instead of causing a NullPointerException.
        if (KUBERNETES_APPLICATION_TARGET.equalsIgnoreCase(target)) {
            return new K8SApplicationDeploymentExecutor();
        }
        throw new IllegalArgumentException(
                String.format("Deployment target %s is not supported", target));
    }
}
Loading