Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat chunjun website final #1024

Merged
merged 7 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ nohup.out

# chunjun plugins
chunjun-dist/

*/.cache
*/node_modules
/website/public
/website/public/page-data/
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Map;

/**
* @program: flinkx
* @program: chunjun
* @author: wuren
* @create: 2021/10/19
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package com.dtstack.chunjun.connector.hbase;

/**
* @program: flinkx
* @program: chunjun
* @author: wuren
* @create: 2021/10/15
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

/**
* @author wuren
* @program flinkx
* @program chunjun
* @create 2021/04/30
*/

Expand Down
30 changes: 14 additions & 16 deletions chunjun-core/src/main/java/com/dtstack/chunjun/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
import com.dtstack.chunjun.util.PropertiesUtil;
import com.dtstack.chunjun.util.TableUtil;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.execution.JobClient;
Expand All @@ -69,9 +73,6 @@
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.types.DataType;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -138,6 +139,7 @@ public static void main(String[] args) throws Exception {
* @param tableEnv
* @param job
* @param options
*
* @throws Exception
*/
private static void exeSqlJob(
Expand Down Expand Up @@ -171,6 +173,7 @@ private static void exeSqlJob(
* @param tableEnv
* @param job
* @param options
*
* @throws Exception
*/
private static void exeSyncJob(
Expand All @@ -179,7 +182,7 @@ private static void exeSyncJob(
String job,
Options options)
throws Exception {
SyncConf config = parseFlinkxConf(job, options);
SyncConf config = parseConf(job, options);
configStreamExecutionEnvironment(env, options, config);

SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
Expand Down Expand Up @@ -236,6 +239,7 @@ private static void exeSyncJob(
* @param tableEnv
* @param config
* @param sourceDataStream
*
* @return
*/
private static DataStream<RowData> syncStreamToTable(
Expand Down Expand Up @@ -273,17 +277,14 @@ private static DataStream<RowData> syncStreamToTable(
*
* @param job
* @param options
*
* @return
*/
public static SyncConf parseFlinkxConf(String job, Options options) {
public static SyncConf parseConf(String job, Options options) {
SyncConf config;
try {
config = SyncConf.parseJob(job);

if (StringUtils.isNotBlank(options.getFlinkxDistDir())) {
config.setPluginRoot(options.getFlinkxDistDir());
}

Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());

String savePointPath =
Expand All @@ -292,9 +293,6 @@ public static SyncConf parseFlinkxConf(String job, Options options) {
config.setSavePointPath(savePointPath);
}

if (StringUtils.isNotBlank(options.getRemoteFlinkxDistDir())) {
config.setRemotePluginPath(options.getRemoteFlinkxDistDir());
}
} catch (Exception e) {
throw new ChunJunRuntimeException(e);
}
Expand All @@ -306,7 +304,7 @@ public static SyncConf parseFlinkxConf(String job, Options options) {
*
* @param env StreamExecutionEnvironment
* @param options options
* @param config FlinkxConf
* @param config ChunJunConf
*/
private static void configStreamExecutionEnvironment(
StreamExecutionEnvironment env, Options options, SyncConf config) {
Expand All @@ -317,13 +315,13 @@ private static void configStreamExecutionEnvironment(
} else {
Preconditions.checkArgument(
ExecuteProcessHelper.checkRemoteSqlPluginPath(
options.getRemoteFlinkxDistDir(),
options.getRemoteChunJunDistDir(),
options.getMode(),
options.getPluginLoadMode()),
"Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required");
FactoryHelper factoryHelper = new FactoryHelper();
factoryHelper.setLocalPluginPath(options.getFlinkxDistDir());
factoryHelper.setRemotePluginPath(options.getRemoteFlinkxDistDir());
factoryHelper.setLocalPluginPath(options.getChunjunDistDir());
factoryHelper.setRemotePluginPath(options.getRemoteChunJunDistDir());
factoryHelper.setPluginLoadMode(options.getPluginLoadMode());
factoryHelper.setEnv(env);
factoryHelper.setExecutionMode(options.getMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void setExecuteDdlAble(boolean executeDdlAble) {

@Override
public String toString() {
return "FlinkxCommonConf{"
return "ChunJunCommonConf{"
+ "speedBytes="
+ speedBytes
+ ", errorRecord="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class Metrics {

public static final String MAX_VALUE = "maxValue";

public static final String METRIC_GROUP_KEY_FLINKX = "flinkx";
public static final String METRIC_GROUP_KEY_CHUNJUN = "chunjun";

public static final String METRIC_GROUP_VALUE_OUTPUT = "output";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public BaseMetric(RuntimeContext runtimeContext) {
runtimeContext
.getMetricGroup()
.addGroup(
Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT);
Metrics.METRIC_GROUP_KEY_CHUNJUN, Metrics.METRIC_GROUP_VALUE_OUTPUT);

chunjunDirtyMetricGroup =
chunjunMetricGroup.addGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import com.dtstack.chunjun.enums.ClusterMode;
import com.dtstack.chunjun.util.PropertiesUtil;

import org.apache.commons.lang.StringUtils;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,10 +59,6 @@ public class Options {
@OptionRequired(description = "Flink configuration directory")
private String flinkConfDir;

@Deprecated
@OptionRequired(description = "ChunJun dist dir")
private String flinkxDistDir;

@OptionRequired(description = "ChunJun dist dir")
private String chunjunDistDir;

Expand All @@ -80,10 +77,6 @@ public class Options {
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
private String pluginLoadMode = "shipfile";

@Deprecated
@OptionRequired(description = "remote ChunJun dist dir")
private String remoteFlinkxDistDir;

@OptionRequired(description = "remote ChunJun dist dir")
private String remoteChunJunDistDir;

Expand Down Expand Up @@ -145,17 +138,7 @@ public void setFlinkConfDir(String flinkConfDir) {
}

public String getChunjunDistDir() {
String flinkxDistDir = this.flinkxDistDir;
String chunjunDistDir = this.chunjunDistDir;
String distDir;

if (StringUtils.isNotBlank(flinkxDistDir)) {
LOG.warn("Option 'flinkxDistDir' is deprecated, please replace with 'chunjunDistDir'.");
distDir = flinkxDistDir;
} else {
distDir = chunjunDistDir;
}
return distDir;
return this.chunjunDistDir;
}

public void setChunjunDistDir(String chunjunDistDir) {
Expand Down Expand Up @@ -203,18 +186,7 @@ public void setPluginLoadMode(String pluginLoadMode) {
}

public String getRemoteChunJunDistDir() {
String remoteFlinkxDistDir = this.remoteFlinkxDistDir;
String remoteChunJunDistDir = this.remoteChunJunDistDir;
String remoteDir;

if (StringUtils.isNotBlank(remoteFlinkxDistDir)) {
LOG.warn(
"Option 'remoteFlinkxDistDir' is deprecated, please replace with 'remoteChunJunDistDir'.");
remoteDir = remoteFlinkxDistDir;
} else {
remoteDir = remoteChunJunDistDir;
}
return remoteDir;
return this.remoteChunJunDistDir;
}

public void setRemoteChunJunDistDir(String remoteChunJunDistDir) {
Expand Down Expand Up @@ -253,43 +225,6 @@ public void setJobType(String jobType) {
this.jobType = jobType;
}

public String getFlinkxDistDir() {
String flinkxDistDir = this.flinkxDistDir;
String chunjunDistDir = this.chunjunDistDir;
String distDir;

if (StringUtils.isNotBlank(flinkxDistDir)) {
LOG.warn("Option 'flinkxDistDir' is deprecated, please replace with 'chunjunDistDir'.");
distDir = flinkxDistDir;
} else {
distDir = chunjunDistDir;
}
return distDir;
}

public void setFlinkxDistDir(String flinkxDistDir) {
this.flinkxDistDir = flinkxDistDir;
}

public String getRemoteFlinkxDistDir() {
String remoteFlinkxDistDir = this.remoteFlinkxDistDir;
String remoteChunJunDistDir = this.remoteChunJunDistDir;
String remoteDir;

if (StringUtils.isNotBlank(remoteFlinkxDistDir)) {
LOG.warn(
"Option 'remoteFlinkxDistDir' is deprecated, please replace with 'remoteChunJunDistDir'.");
remoteDir = remoteFlinkxDistDir;
} else {
remoteDir = remoteChunJunDistDir;
}
return remoteDir;
}

public void setRemoteFlinkxDistDir(String remoteFlinkxDistDir) {
this.remoteFlinkxDistDir = remoteFlinkxDistDir;
}

@Override
public String toString() {
return new StringJoiner(", ", Options.class.getSimpleName() + "[", "]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected DataStreamSink<RowData> createOutput(
return createOutput(dataSet, outputFormat, this.getClass().getSimpleName().toLowerCase());
}

/** 初始化FlinkxCommonConf */
/** 初始化ChunJunCommonConf */
public void initCommonConf(ChunJunCommonConf commonConf) {
PropertiesUtil.initCommonConf(commonConf, this.syncConf);
commonConf.setCheckFormat(this.syncConf.getWriter().getBooleanVal("check", true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public static Set<URL> getDdlJarFileDirPath(
}

/**
* Obtain local and remote FlinkX plugin jar package path
* Obtain local and remote ChunJun plugin jar package path
*
* @param pluginPath
* @param suffix
Expand Down Expand Up @@ -414,7 +414,7 @@ public static List<String> setPipelineOptionsToEnvConfig(
jarList.addAll(urlList);

List<String> pipelineJars = new ArrayList();
LOG.info("Flinkx executionMode: " + executionMode);
LOG.info("ChunJun executionMode: " + executionMode);
if (ClusterMode.getByName(executionMode) == ClusterMode.kubernetesApplication) {
for (String jarUrl : jarList) {
String newJarUrl = jarUrl;
Expand All @@ -430,7 +430,7 @@ public static List<String> setPipelineOptionsToEnvConfig(
pipelineJars.addAll(jarList);
}

LOG.info("Flinkx reset pipeline.jars: " + pipelineJars);
LOG.info("ChunJun reset pipeline.jars: " + pipelineJars);
configuration.set(PipelineOptions.JARS, pipelineJars);

List<String> classpathList = configuration.get(PipelineOptions.CLASSPATHS);
Expand Down
2 changes: 1 addition & 1 deletion chunjun-formats/chunjun-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flinkx-protobuf</artifactId>
<artifactId>chunjun-protobuf</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void open() {

@Override
public void registerMetric(Accumulator accumulator, String name) {
name = Metrics.METRIC_GROUP_KEY_FLINKX + "_" + name;
name = Metrics.METRIC_GROUP_KEY_CHUNJUN + "_" + name;
ReporterScopedSettings reporterScopedSettings =
new ReporterScopedSettings(0, ',', Collections.emptySet());
FrontMetricGroup front =
Expand Down
Loading