diff --git a/.gitignore b/.gitignore
index 9d1d39654a7f5a..a53f275cb54962 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,8 @@
 log/
 fe_plugins/*/target
 fe_plugins/output
 fe/mocked
+fe/*/target
+dependency-reduced-pom.xml

 #ignore eclipse project file & idea project file
@@ -43,6 +45,8 @@ be/src/gen_cpp/*.cc
 be/src/gen_cpp/*.cpp
 be/src/gen_cpp/*.h
 be/src/gen_cpp/opcode
+be/ut_build_ASAN/
+be/tags

 #ignore vscode project file
 .vscode
diff --git a/build.sh b/build.sh
index 89903813cf3146..fc287aa08086b9 100755
--- a/build.sh
+++ b/build.sh
@@ -49,7 +49,8 @@ usage() {
 Usage: $0
 Optional options:
     --be               build Backend
-    --fe               build Frontend
+    --fe               build Frontend and Spark DPP application
+    --spark-dpp        build Spark DPP application
     --clean            clean and build target
     --with-mysql       enable MySQL support(default)
     --without-mysql    disable MySQL support
@@ -57,12 +58,13 @@ Usage: $0
     --without-lzo      disable LZO compress support

 Eg.
-    $0                                      build Backend and Frontend without clean
+    $0                                      build all
     $0 --be                                 build Backend without clean
     $0 --be --without-mysql                 build Backend with MySQL disable
     $0 --be --without-mysql --without-lzo   build Backend with both MySQL and LZO disable
-    $0 --fe --clean                         clean and build Frontend
-    $0 --fe --be --clean                    clean and build both Frontend and Backend
+    $0 --fe --clean                         clean and build Frontend and Spark DPP application
+    $0 --fe --be --clean                    clean and build Frontend, Spark DPP application and Backend
+    $0 --spark-dpp                          build Spark DPP application alone
 "
     exit 1
 }
@@ -73,6 +75,7 @@ OPTS=$(getopt \
     -o 'h' \
     -l 'be' \
     -l 'fe' \
+    -l 'spark-dpp' \
     -l 'clean' \
     -l 'with-mysql' \
     -l 'without-mysql' \
@@ -89,26 +92,30 @@ eval set -- "$OPTS"

 BUILD_BE=
 BUILD_FE=
+BUILD_SPARK_DPP=
 CLEAN=
 RUN_UT=
 WITH_MYSQL=ON
 WITH_LZO=ON
 HELP=0
 if [ $# == 1 ] ; then
-    # defuat
+    # default
     BUILD_BE=1
     BUILD_FE=1
+    BUILD_SPARK_DPP=1
     CLEAN=0
     RUN_UT=0
 else
     BUILD_BE=0
     BUILD_FE=0
+    BUILD_SPARK_DPP=0
     CLEAN=0
     RUN_UT=0
     while true; do
         case "$1" in
             --be) BUILD_BE=1 ; shift ;;
             --fe) BUILD_FE=1 ; shift ;;
+            --spark-dpp) BUILD_SPARK_DPP=1 ; shift ;;
             --clean) CLEAN=1 ; shift ;;
             --ut) RUN_UT=1 ; shift ;;
             --with-mysql) WITH_MYSQL=ON; shift ;;
@@ -128,18 +135,19 @@ if [[ ${HELP} -eq 1 ]]; then
     exit
 fi

-if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 ]; then
-    echo "--clean can not be specified without --fe or --be"
+if [ ${CLEAN} -eq 1 -a ${BUILD_BE} -eq 0 -a ${BUILD_FE} -eq 0 -a ${BUILD_SPARK_DPP} -eq 0 ]; then
+    echo "--clean can not be specified without --fe or --be or --spark-dpp"
     exit 1
 fi

 echo "Get params:
-    BUILD_BE -- $BUILD_BE
-    BUILD_FE -- $BUILD_FE
-    CLEAN -- $CLEAN
-    RUN_UT -- $RUN_UT
-    WITH_MYSQL -- $WITH_MYSQL
-    WITH_LZO -- $WITH_LZO
+    BUILD_BE            -- $BUILD_BE
+    BUILD_FE            -- $BUILD_FE
+    BUILD_SPARK_DPP     -- $BUILD_SPARK_DPP
+    CLEAN               -- $CLEAN
+    RUN_UT              -- $RUN_UT
+    WITH_MYSQL          -- $WITH_MYSQL
+    WITH_LZO            -- $WITH_LZO
 "

 # Clean and build generated code
@@ -175,14 +183,25 @@ cd ${DORIS_HOME}/docs
 ./build_help_zip.sh
 cd ${DORIS_HOME}

+# Assemble FE modules
+FE_MODULES=
+if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
+    if [ ${BUILD_SPARK_DPP} -eq 1 ]; then
+        FE_MODULES="fe-common,spark-dpp"
+    fi
+    if [ ${BUILD_FE} -eq 1 ]; then
+        FE_MODULES="fe-common,spark-dpp,fe-core"
+    fi
+fi
+
 # Clean and build Frontend
-if [ ${BUILD_FE} -eq 1 ] ; then
-    echo "Build Frontend"
+if [ ${FE_MODULES}x != ""x ]; then
+    echo "Build Frontend Modules: $FE_MODULES"
     cd ${DORIS_HOME}/fe
     if [ ${CLEAN} -eq 1 ]; then
         ${MVN_CMD} clean
     fi
-    ${MVN_CMD} package -DskipTests
+    ${MVN_CMD} package -pl ${FE_MODULES} -DskipTests
     cd ${DORIS_HOME}
 fi

@@ -190,19 +209,29 @@ fi

 DORIS_OUTPUT=${DORIS_HOME}/output/
 mkdir -p ${DORIS_OUTPUT}

-#Copy Frontend and Backend
-if [ ${BUILD_FE} -eq 1 ]; then
-    install -d ${DORIS_OUTPUT}/fe/bin ${DORIS_OUTPUT}/fe/conf \
-               ${DORIS_OUTPUT}/fe/webroot/ ${DORIS_OUTPUT}/fe/lib/
-
-    cp -r -p ${DORIS_HOME}/bin/*_fe.sh ${DORIS_OUTPUT}/fe/bin/
-    cp -r -p ${DORIS_HOME}/conf/fe.conf ${DORIS_OUTPUT}/fe/conf/
-    rm -rf ${DORIS_OUTPUT}/fe/lib/*
-    cp -r -p ${DORIS_HOME}/fe/fe-core/target/lib/* ${DORIS_OUTPUT}/fe/lib/
-    cp -r -p ${DORIS_HOME}/fe/fe-core/target/palo-fe.jar ${DORIS_OUTPUT}/fe/lib/
-    cp -r -p ${DORIS_HOME}/docs/build/help-resource.zip ${DORIS_OUTPUT}/fe/lib/
-    cp -r -p ${DORIS_HOME}/webroot/* ${DORIS_OUTPUT}/fe/webroot/
+# Copy Frontend and Backend
+if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
+    if [ ${BUILD_FE} -eq 1 ]; then
+        install -d ${DORIS_OUTPUT}/fe/bin ${DORIS_OUTPUT}/fe/conf \
+                   ${DORIS_OUTPUT}/fe/webroot/ ${DORIS_OUTPUT}/fe/lib/ \
+                   ${DORIS_OUTPUT}/fe/spark-dpp/
+
+        cp -r -p ${DORIS_HOME}/bin/*_fe.sh ${DORIS_OUTPUT}/fe/bin/
+        cp -r -p ${DORIS_HOME}/conf/fe.conf ${DORIS_OUTPUT}/fe/conf/
+        rm -rf ${DORIS_OUTPUT}/fe/lib/*
+        cp -r -p ${DORIS_HOME}/fe/fe-core/target/lib/* ${DORIS_OUTPUT}/fe/lib/
+        cp -r -p ${DORIS_HOME}/fe/fe-core/target/palo-fe.jar ${DORIS_OUTPUT}/fe/lib/
+        cp -r -p ${DORIS_HOME}/docs/build/help-resource.zip ${DORIS_OUTPUT}/fe/lib/
+        cp -r -p ${DORIS_HOME}/webroot/* ${DORIS_OUTPUT}/fe/webroot/
+        cp -r -p ${DORIS_HOME}/fe/spark-dpp/target/spark-dpp-*-jar-with-dependencies.jar ${DORIS_OUTPUT}/fe/spark-dpp/
+
+    elif [ ${BUILD_SPARK_DPP} -eq 1 ]; then
+        install -d ${DORIS_OUTPUT}/fe/spark-dpp/
+        rm -rf ${DORIS_OUTPUT}/fe/spark-dpp/*
+        cp -r -p ${DORIS_HOME}/fe/spark-dpp/target/spark-dpp-*-jar-with-dependencies.jar ${DORIS_OUTPUT}/fe/spark-dpp/
+    fi
 fi
+
 if [ ${BUILD_BE} -eq 1 ]; then
     install -d ${DORIS_OUTPUT}/be/bin \
         ${DORIS_OUTPUT}/be/conf \
diff --git a/fe/README b/fe/README
new file mode 100644
index 00000000000000..4b7e81ad1aa289
--- /dev/null
+++ b/fe/README
@@ -0,0 +1,14 @@
+# fe-common
+
+This module stores common classes shared by the other modules.
+
+# spark-dpp
+
+This module is the Spark DPP program, used by the Spark Load function.
+Depends: fe-common
+
+# fe-core
+
+This module is the main process module of FE.
+Depends: fe-common, spark-dpp + diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml new file mode 100644 index 00000000000000..225182c8c62961 --- /dev/null +++ b/fe/fe-common/pom.xml @@ -0,0 +1,68 @@ + + + + + + 4.0.0 + + + org.apache + doris-fe + 3.4.0 + ../pom.xml + + + fe-common + 1.0.0 + jar + + + ${basedir}/../../ + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.1.0 + + checkstyle.xml + UTF-8 + true + true + false + **/jmockit/**/* + + + + validate + validate + + check + + + + + + + diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/AutoType.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/AutoType.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/AutoType.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/AutoType.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/ConstructorReflection.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/ConstructorReflection.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/ConstructorReflection.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/ConstructorReflection.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/Deencapsulation.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/Deencapsulation.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/Deencapsulation.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/Deencapsulation.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/FieldReflection.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/FieldReflection.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/FieldReflection.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/FieldReflection.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/GeneratedClasses.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/GeneratedClasses.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/GeneratedClasses.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/GeneratedClasses.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/MethodReflection.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/MethodReflection.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/MethodReflection.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/MethodReflection.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/ParameterReflection.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/ParameterReflection.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/ParameterReflection.java rename to fe/fe-common/src/main/java/org/apache/doris/common/jmockit/ParameterReflection.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/jmockit/ThrowOfCheckedException.java b/fe/fe-common/src/main/java/org/apache/doris/common/jmockit/ThrowOfCheckedException.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/common/jmockit/ThrowOfCheckedException.java rename to 
fe/fe-common/src/main/java/org/apache/doris/common/jmockit/ThrowOfCheckedException.java diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index a35d2b67136d72..d3af8612bba4d3 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -30,7 +30,7 @@ under the License. ../pom.xml - doris-fe-core + fe-core 3.4.0 jar @@ -40,6 +40,11 @@ under the License. + + org.apache + spark-dpp + + cglib diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index cef44ea19608be..874d3d1945d3c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -514,7 +514,7 @@ public class Config extends ConfigBase { * Default spark dpp version */ @ConfField - public static String spark_dpp_version = "1_0_0"; + public static String spark_dpp_version = "1.0.0"; /** * Default spark load timeout */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 5063e9b1050070..428e88c3cae693 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -53,6 +53,7 @@ import org.apache.spark.launcher.SparkAppHandle.State; import org.apache.spark.launcher.SparkLauncher; +import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; @@ -78,10 +79,14 @@ public class SparkEtlJobHandler { class SparkAppListener implements Listener { @Override - public void stateChanged(SparkAppHandle sparkAppHandle) {} + public void stateChanged(SparkAppHandle sparkAppHandle) { + LOG.info("get spark state changed: {}, app id: {}", sparkAppHandle.getState(), sparkAppHandle.getAppId()); + } @Override - public void infoChanged(SparkAppHandle sparkAppHandle) {} + public void infoChanged(SparkAppHandle sparkAppHandle) { + LOG.info("get spark info changed: {}, app id: {}", sparkAppHandle.getState(), sparkAppHandle.getAppId()); + } } public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource, @@ -134,7 +139,9 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo .setMainClass(SparkEtlJob.class.getCanonicalName()) .setAppName(String.format(ETL_JOB_NAME, loadLabel)) .setSparkHome(sparkHome) - .addAppArgs(jobConfigHdfsPath); + .addAppArgs(jobConfigHdfsPath) + .redirectError() + .redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log")); // spark configs for (Map.Entry entry : resource.getSparkConfigs().entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java index 9073c151b36105..18f8504886bbf8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -60,14 +60,14 @@ public class SparkRepository { public static final String REPOSITORY_DIR = "__spark_repository__"; public static final String PREFIX_ARCHIVE = "__archive_"; public static final String PREFIX_LIB = "__lib_"; - public static final String SPARK_DPP = "spark-dpp"; + public static final String SPARK_DPP_JAR = "spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies.jar"; + public static final String SPARK_DPP = 
"spark-dpp-" + Config.spark_dpp_version + "-jar-with-dependencies"; public static final String SPARK_2X = "spark-2x"; - public static final String SUFFIX = ".zip"; private static final String PATH_DELIMITER = "/"; private static final String FILE_NAME_SEPARATOR = "_"; - private static final String DPP_RESOURCE = "/spark-dpp/spark-dpp.jar"; + private static final String DPP_RESOURCE_DIR = "/spark-dpp/"; private static final String SPARK_RESOURCE = "/jars/spark-2x.zip"; private String remoteRepositoryPath; @@ -85,7 +85,7 @@ public SparkRepository(String remoteRepositoryPath, BrokerDesc brokerDesc) { this.brokerDesc = brokerDesc; this.currentDppVersion = Config.spark_dpp_version; this.currentArchive = new SparkArchive(getRemoteArchivePath(currentDppVersion), currentDppVersion); - this.localDppPath = PaloFe.DORIS_HOME_DIR + DPP_RESOURCE; + this.localDppPath = PaloFe.DORIS_HOME_DIR + DPP_RESOURCE_DIR + SPARK_DPP_JAR; if (!Strings.isNullOrEmpty(Config.spark_resource_path)) { this.localSpark2xPath = Config.spark_resource_path; } else { @@ -98,7 +98,7 @@ public void prepare() throws LoadException { } private void initRepository() throws LoadException { - LOG.info("start to init remote repository"); + LOG.info("start to init remote repositoryi. local dpp: {}", this.localDppPath); boolean needUpload = false; boolean needReplace = false; CHECK: { @@ -222,7 +222,11 @@ private void getLibraries(String remoteArchivePath, List libraries if (!fileName.startsWith(PREFIX_LIB)) { continue; } - String[] lib_arg = unWrap(PREFIX_LIB, SUFFIX, fileName).split(FILE_NAME_SEPARATOR); + + // fileName should like: + // __lib_md5sum_spark-dpp-1.0.0-jar-with-dependencies.jar + // __lib_md5sum_spark-2x.zip + String[] lib_arg = unwrap(PREFIX_LIB, fileName).split(FILE_NAME_SEPARATOR); if (lib_arg.length != 2) { continue; } @@ -232,16 +236,12 @@ private void getLibraries(String remoteArchivePath, List libraries } String type = lib_arg[1]; SparkLibrary.LibType libType = null; - switch (type) { - case SPARK_DPP: - libType = SparkLibrary.LibType.DPP; - break; - case SPARK_2X: - libType = SparkLibrary.LibType.SPARK2X; - break; - default: - Preconditions.checkState(false, "wrong library type: " + type); - break; + if (type.equals(SPARK_DPP)) { + libType = SparkLibrary.LibType.DPP; + } else if (type.equals(SPARK_2X)) { + libType = SparkLibrary.LibType.SPARK2X; + } else { + throw new LoadException("Invalid library type: " + type); } SparkLibrary remoteFile = new SparkLibrary(fileStatus.path, md5sum, libType, fileStatus.size); libraries.add(remoteFile); @@ -259,7 +259,7 @@ public String getMd5String(String filePath) throws LoadException { LOG.debug("get md5sum from file {}, md5sum={}", filePath, md5sum); return md5sum; } catch (FileNotFoundException e) { - throw new LoadException("file " + filePath + "dose not exist"); + throw new LoadException("file " + filePath + " does not exist"); } catch (IOException e) { throw new LoadException("failed to get md5sum from file " + filePath); } @@ -302,8 +302,11 @@ private static String getFileName(String delimiter, String path) { return path.substring(path.lastIndexOf(delimiter) + 1); } - private static String unWrap(String prefix, String suffix, String fileName) { - return fileName.substring(prefix.length(), fileName.length() - suffix.length()); + // input: __lib_md5sum_spark-dpp-1.0.0-jar-with-dependencies.jar + // output: md5sum_spark-dpp-1.0.0-jar-with-dependencies + private static String unwrap(String prefix, String fileName) { + int pos = fileName.lastIndexOf("."); + return 
fileName.substring(prefix.length(), pos); } private static String joinPrefix(String prefix, String fileName) { diff --git a/fe/pom.xml b/fe/pom.xml index 68e167c8b6a563..6ca26cba402280 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -29,6 +29,8 @@ under the License. pom + fe-common + spark-dpp fe-core @@ -108,6 +110,18 @@ under the License. + + org.apache + fe-common + 1.0.0 + + + + org.apache + spark-dpp + 1.0.0 + + cglib @@ -573,6 +587,62 @@ under the License. 2.4.5 provided + + + org.apache.hadoop + hadoop-common + 2.6.5 + provided + + + + org.apache.parquet + parquet-column + 1.10.1 + provided + + + + org.apache.parquet + parquet-hadoop + 1.10.1 + provided + + + + org.apache.parquet + parquet-common + 1.10.1 + provided + + + + commons-collections + commons-collections + 3.2.1 + compile + + + + org.scala-lang + scala-library + 2.12.10 + provided + + + + com.esotericsoftware + kryo-shaded + 4.0.2 + compile + + + + org.apache.spark + spark-catalyst_2.12 + 2.4.5 + provided + diff --git a/fe/spark-dpp/pom.xml b/fe/spark-dpp/pom.xml new file mode 100644 index 00000000000000..394534880056f8 --- /dev/null +++ b/fe/spark-dpp/pom.xml @@ -0,0 +1,430 @@ + + + + + + 4.0.0 + + + org.apache + doris-fe + 3.4.0 + ../pom.xml + + + spark-dpp + 1.0.0 + jar + + + ${basedir}/../../ + 1 + + + + + org.apache + fe-common + + + + + commons-codec + commons-codec + provided + + + + + org.apache.commons + commons-lang3 + provided + + + + + com.google.code.gson + gson + + + + + org.jmockit + jmockit + test + + + + + joda-time + joda-time + provided + + + + + junit + junit + test + + + + + log4j + log4j + provided + + + + org.roaringbitmap + RoaringBitmap + + + + + + org.apache.spark + spark-core_2.12 + provided + + + + + org.apache.spark + spark-sql_2.12 + provided + + + + org.apache.hadoop + hadoop-common + provided + + + + org.apache.parquet + parquet-column + provided + + + + org.apache.parquet + parquet-hadoop + provided + + + + org.apache.parquet + parquet-common + provided + + + + commons-collections + commons-collections + provided + + + + org.scala-lang + scala-library + provided + + + + com.esotericsoftware + kryo-shaded + provided + + + + org.apache.spark + spark-catalyst_2.12 + 2.4.5 + provided + + + + com.google.guava + guava + + + + + spark-dpp-${version} + + + + + maven-surefire-plugin + 2.22.2 + + set larger, eg, 3, to reduce the time or running FE unit tests<--> + ${fe_ut_parallel} + not reuse forked jvm, so that each unit test will run in separate jvm. 
to avoid singleton confict<--> + false + + -javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.1 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + false + false + true + ${skip.plugin} + + + + + + + maven-assembly-plugin + + + + org.apache.doris.load.loadv2.etl.SparkEtlJob + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.7 + + + 1024m + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.1.0 + + checkstyle.xml + UTF-8 + true + true + false + + + + validate + validate + + check + + + + + + + + maven-clean-plugin + 3.1.0 + + + auto-clean + initialize + + clean + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + com.google.code.findbugs:* + org.slf4j:* + + + + + org.roaringbitmap + org.apache.doris.shaded.org.roaringbitmap + com.google.guava + org.apache.doris.shaded.com.google.guava + + + + + + package + + shade + + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + net.sourceforge.czt.dev + cup-maven-plugin + [1.6,) + + generate + + + + + + + + + de.jflex + maven-jflex-plugin + [1.4.3,) + + generate + + + + + + + + + org.codehaus.mojo + exec-maven-plugin + [1.6,) + + exec + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + [1.7,) + + add-source + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [3.1.1,) + + copy-dependencies + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + compile + testCompile + + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + [2.6,) + + resources + testResources + + + + + + + + + + + + + + + + + diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/common/Codec.java b/fe/spark-dpp/src/main/java/org/apache/doris/common/Codec.java new file mode 100644 index 00000000000000..abe369f8fcba61 --- /dev/null +++ b/fe/spark-dpp/src/main/java/org/apache/doris/common/Codec.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class Codec { + + // not support encode negative value now + public static void encodeVarint64(long source, DataOutput out) throws IOException { + assert source >= 0; + short B = 128; + + while (source > B) { + out.write((int) (source & (B - 1) | B)); + source = source >> 7; + } + out.write((int) (source & (B - 1))); + } + + // not support decode negative value now + public static long decodeVarint64(DataInput in) throws IOException { + long result = 0; + int shift = 0; + short B = 128; + + while (true) { + int oneByte = in.readUnsignedByte(); + boolean isEnd = (oneByte & B) == 0; + result = result | ((long) (oneByte & B - 1) << (shift * 7)); + if (isEnd) { + break; + } + shift++; + } + + return result; + } +} + + diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/common/SparkDppException.java b/fe/spark-dpp/src/main/java/org/apache/doris/common/SparkDppException.java new file mode 100644 index 00000000000000..66547461deadad --- /dev/null +++ b/fe/spark-dpp/src/main/java/org/apache/doris/common/SparkDppException.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import com.google.common.base.Strings; + +// Exception for Spark DPP process +public class SparkDppException extends Exception { + public SparkDppException(String msg, Throwable cause) { + super(Strings.nullToEmpty(msg), cause); + } + + public SparkDppException(Throwable cause) { + super(cause); + } + + public SparkDppException(String msg, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(Strings.nullToEmpty(msg), cause, enableSuppression, writableStackTrace); + } + + public SparkDppException(String msg) { + super(Strings.nullToEmpty(msg)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BitmapValue.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/BitmapValue.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BitmapValue.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/BitmapValue.java index 08da142bd3f395..d294f3120b1166 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BitmapValue.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/BitmapValue.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.load.loadv2; +package org.apache.doris.load.loadv2.dpp; import org.roaringbitmap.Util; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java index c31ed1b071f337..0b91cb003555f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.common.UserException; +import org.apache.doris.common.SparkDppException; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import org.joda.time.DateTime; @@ -26,7 +26,7 @@ // Parser to validate value for different type public abstract class ColumnParser implements Serializable { - public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws UserException { + public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException { String columnType = etlColumn.columnType; if (columnType.equalsIgnoreCase("TINYINT")) { return new TinyIntParser(); @@ -52,7 +52,7 @@ public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws UserE || columnType.equalsIgnoreCase("HLL")) { return new StringParser(etlColumn); } else { - throw new UserException("unsupported type:" + columnType); + throw new SparkDppException("unsupported type:" + columnType); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java similarity index 88% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java index f0528ec40cddc1..d434c2f6133915 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java @@ -27,7 +27,7 @@ public class DorisKryoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { - kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class); - kryo.register(org.apache.doris.load.loadv2.BitmapValue.class); + kryo.register(Roaring64Map.class); + kryo.register(BitmapValue.class); } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java similarity index 93% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java index d0882ac9fd2f93..78ef43b52253a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java @@ -17,7 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.common.UserException; +import org.apache.doris.common.SparkDppException; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import org.apache.spark.sql.types.DataType; @@ -69,7 +69,7 @@ public static Class getClassFromDataType(DataType dataType) { return null; } - public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws UserException { + public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws SparkDppException { switch (column.columnType) { case "BOOLEAN": return Boolean.class; @@ -83,7 +83,7 @@ public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws Use case "BIGINT": return Long.class; case "LARGEINT": - throw new UserException("LARGEINT is not supported now"); + throw new SparkDppException("LARGEINT is not supported now"); case "FLOAT": return Float.class; case "DOUBLE": @@ -213,14 +213,14 @@ public static StructType createDstTableSchema(List colum return dstSchema; } - public static List parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { + public static List parseColumnsFromPath(String filePath, List columnsFromPath) throws SparkDppException { if (columnsFromPath == null || columnsFromPath.isEmpty()) { return Collections.emptyList(); } String[] strings = filePath.split("/"); if (strings.length < 2) { System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } String[] columns = new String[columnsFromPath.size()]; int size = 0; @@ -231,12 +231,12 @@ public static List parseColumnsFromPath(String filePath, List co } if (str == null || !str.contains("=")) { System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } String[] pair = str.split("=", 2); if (pair.length != 2) { System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + 
columnsFromPath + ", filePath: " + filePath); } int index = columnsFromPath.indexOf(pair[0]); if (index == -1) { @@ -250,7 +250,7 @@ public static List parseColumnsFromPath(String filePath, List co } if (size != columnsFromPath.size()) { System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + throw new SparkDppException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } return Lists.newArrayList(columns); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/Hll.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/Hll.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/Hll.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/Hll.java index 89eafb134307af..c95a3a413551d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/Hll.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/Hll.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.load.loadv2; +package org.apache.doris.load.loadv2.dpp; import org.apache.commons.codec.binary.StringUtils; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilder.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/Roaring64Map.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/Roaring64Map.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/Roaring64Map.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/Roaring64Map.java index 8398e959552343..56ad1a7a9c1cad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/Roaring64Map.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/Roaring64Map.java @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.load.loadv2; +package org.apache.doris.load.loadv2.dpp; + +import org.apache.doris.common.Codec; +import org.apache.doris.load.loadv2.dpp.BitmapValue; import org.roaringbitmap.BitmapDataProvider; import org.roaringbitmap.BitmapDataProviderSupplier; @@ -43,10 +46,6 @@ import java.util.SortedMap; import java.util.TreeMap; -import static org.apache.doris.common.util.Util.decodeVarint64; -import static org.apache.doris.common.util.Util.encodeVarint64; -import static org.apache.doris.load.loadv2.BitmapValue.BITMAP32; -import static org.apache.doris.load.loadv2.BitmapValue.BITMAP64; /** * @@ -1316,13 +1315,13 @@ public void serialize(DataOutput out) throws IOException { return; } if (is32BitsEnough()) { - out.write(BITMAP32); + out.write(BitmapValue.BITMAP32); highToBitmap.get(0).serialize(out); return; } - out.write(BITMAP64); - encodeVarint64(highToBitmap.size(), out); + out.write(BitmapValue.BITMAP64); + Codec.encodeVarint64(highToBitmap.size(), out); for (Map.Entry entry : highToBitmap.entrySet()) { out.writeInt(entry.getKey().intValue()); @@ -1347,8 +1346,8 @@ public void deserialize(DataInput in, int bitmapType) throws IOException { highToBitmap = new TreeMap<>(); long nbHighs = 1; - if (bitmapType == BITMAP64) { - nbHighs = decodeVarint64(in); + if (bitmapType == BitmapValue.BITMAP64) { + nbHighs = Codec.decodeVarint64(in); } for (int i = 0; i < nbHighs; i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeBuilder.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/RollupTreeNode.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java index 3a1002638b32af..898bd9fe6c748e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java @@ -17,8 +17,7 @@ package org.apache.doris.load.loadv2.dpp; -import org.apache.doris.catalog.FunctionSet; -import org.apache.doris.common.UserException; +import org.apache.doris.common.SparkDppException; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import com.google.common.base.Strings; @@ -73,6 +72,7 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.spark.util.SerializableConfiguration; import scala.Tuple2; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -104,6 +104,11 @@ public final class SparkDpp implements java.io.Serializable { private Map bucketKeyMap = new HashMap<>(); // accumulator to collect invalid rows private StringAccumulator invalidRows = new StringAccumulator(); + // save the hadoop configuration from spark session. 
+ // because hadoop configuration is not serializable, + // we need to wrap it so that we can use it in executor. + private SerializableConfiguration serializableHadoopConf; + public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) { this.spark = spark; @@ -117,9 +122,10 @@ public void init() { fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc"); fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc"); spark.sparkContext().register(invalidRows, "InvalidRowsAccumulator"); + this.serializableHadoopConf = new SerializableConfiguration(spark.sparkContext().hadoopConfiguration()); } - private Dataset processRDDAggAndRepartition(Dataset dataframe, EtlJobConfig.EtlIndex currentIndexMeta) throws UserException { + private Dataset processRDDAggAndRepartition(Dataset dataframe, EtlJobConfig.EtlIndex currentIndexMeta) throws SparkDppException { final boolean isDuplicateTable = !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "AGGREGATE") && !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "UNIQUE"); @@ -184,7 +190,7 @@ private Dataset processRDDAggAndRepartition(Dataset dataframe, EtlJobC private void writePartitionedAndSortedDataframeToParquet(Dataset dataframe, String pathPattern, long tableId, - EtlJobConfig.EtlIndex indexMeta) throws UserException { + EtlJobConfig.EtlIndex indexMeta) throws SparkDppException { StructType outputSchema = dataframe.schema(); StructType dstSchema = DataTypes.createStructType( Arrays.asList(outputSchema.fields()).stream() @@ -195,7 +201,7 @@ private void writePartitionedAndSortedDataframeToParquet(Dataset dataframe, @Override public void call(Iterator t) throws Exception { // write the data to dst file - Configuration conf = new Configuration(); + Configuration conf = new Configuration(serializableHadoopConf.value()); FileSystem fs = FileSystem.get(URI.create(etlJobConfig.outputPath), conf); String lastBucketKey = null; ParquetWriter parquetWriter = null; @@ -278,7 +284,7 @@ public void call(Iterator t) throws Exception { private void processRollupTree(RollupTreeNode rootNode, Dataset rootDataframe, long tableId, EtlJobConfig.EtlTable tableMeta, - EtlJobConfig.EtlIndex baseIndex) throws UserException { + EtlJobConfig.EtlIndex baseIndex) throws SparkDppException { Queue nodeQueue = new LinkedList<>(); nodeQueue.offer(rootNode); int currentLevel = 0; @@ -351,7 +357,7 @@ private Dataset repartitionDataframeByBucketId(SparkSession spark, Dataset< List valueColumnNames, StructType dstTableSchema, EtlJobConfig.EtlIndex baseIndex, - List validPartitionIds) throws UserException { + List validPartitionIds) throws SparkDppException { List distributeColumns = partitionInfo.distributionColumnRefs; Partitioner partitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndex, partitionRangeKeys); Set validPartitionIndex = new HashSet<>(); @@ -440,7 +446,7 @@ public Iterator> call(Row row) { private Dataset convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex baseIndex, Dataset srcDataframe, StructType dstTableSchema, - EtlJobConfig.EtlFileGroup fileGroup) throws UserException { + EtlJobConfig.EtlFileGroup fileGroup) throws SparkDppException { Dataset dataframe = srcDataframe; StructType srcSchema = dataframe.schema(); Set srcColumnNames = new HashSet<>(); @@ -471,7 +477,7 @@ private Dataset convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex bas } else if (column.isAllowNull) { dataframe = dataframe.withColumn(dstField.name(), functions.lit(null)); } else { - throw new UserException("Reason: no data for column:" + 
dstField.name()); + throw new SparkDppException("Reason: no data for column:" + dstField.name()); } } if (column.columnType.equalsIgnoreCase("DATE")) { @@ -492,7 +498,7 @@ private Dataset convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex bas // 2. process the mapping columns for (String mappingColumn : mappingColumns) { String mappingDescription = columnMappings.get(mappingColumn).toDescription(); - if (mappingDescription.toLowerCase().contains(FunctionSet.HLL_HASH)) { + if (mappingDescription.toLowerCase().contains("hll_hash")) { continue; } // here should cast data type to dst column type @@ -517,7 +523,7 @@ private Dataset loadDataFromPath(SparkSession spark, EtlJobConfig.EtlFileGroup fileGroup, String fileUrl, EtlJobConfig.EtlIndex baseIndex, - List columns) throws UserException { + List columns) throws SparkDppException { List columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath); List dataSrcColumns = fileGroup.fileFieldNames; if (dataSrcColumns == null) { @@ -630,7 +636,7 @@ private StructType createScrSchema(List srcColumns) { // partition keys will be parsed into double from json // so need to convert it to partition columns' type - private Object convertPartitionKey(Object srcValue, Class dstClass) throws UserException { + private Object convertPartitionKey(Object srcValue, Class dstClass) throws SparkDppException { if (dstClass.equals(Float.class) || dstClass.equals(Double.class)) { return null; } @@ -655,7 +661,7 @@ private Object convertPartitionKey(Object srcValue, Class dstClass) throws UserE } } else { LOG.warn("unsupport partition key:" + srcValue); - throw new UserException("unsupport partition key:" + srcValue); + throw new SparkDppException("unsupport partition key:" + srcValue); } } @@ -685,7 +691,7 @@ private java.sql.Date convertToJavaDate(int originDate) { } private List createPartitionRangeKeys( - EtlJobConfig.EtlPartitionInfo partitionInfo, List partitionKeySchema) throws UserException { + EtlJobConfig.EtlPartitionInfo partitionInfo, List partitionKeySchema) throws SparkDppException { List partitionRangeKeys = new ArrayList<>(); for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) { DorisRangePartitioner.PartitionRangeKey partitionRangeKey = new DorisRangePartitioner.PartitionRangeKey(); @@ -716,14 +722,13 @@ private Dataset loadDataFromFilePaths(SparkSession spark, List filePaths, EtlJobConfig.EtlFileGroup fileGroup, StructType dstTableSchema) - throws UserException, IOException, URISyntaxException { + throws SparkDppException, IOException, URISyntaxException { Dataset fileGroupDataframe = null; for (String filePath : filePaths) { fileNumberAcc.add(1); try { - Configuration conf = new Configuration(); URI uri = new URI(filePath); - FileSystem fs = FileSystem.get(uri, conf); + FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value()); FileStatus fileStatus = fs.getFileStatus(new Path(filePath)); fileSizeAcc.add(fileStatus.getLen()); } catch (Exception e) { @@ -732,7 +737,7 @@ private Dataset loadDataFromFilePaths(SparkSession spark, } if (fileGroup.columnSeparator == null) { LOG.warn("invalid null column separator!"); - throw new UserException("Reason: invalid null column separator!"); + throw new SparkDppException("Reason: invalid null column separator!"); } Dataset dataframe = null; @@ -751,7 +756,7 @@ private Dataset loadDataFromHiveTable(SparkSession spark, String hiveDbTableName, EtlJobConfig.EtlIndex baseIndex, EtlJobConfig.EtlFileGroup fileGroup, - StructType dstTableSchema) throws 
UserException { + StructType dstTableSchema) throws SparkDppException { // select base index columns from hive table StringBuilder sql = new StringBuilder(); sql.append("select "); @@ -876,9 +881,8 @@ public void doDpp() throws Exception { DppResult dppResult = process(); String outputPath = etlJobConfig.getOutputPath(); String resultFilePath = outputPath + "/" + DPP_RESULT_FILE; - Configuration conf = new Configuration(); URI uri = new URI(outputPath); - FileSystem fs = FileSystem.get(uri, conf); + FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value()); Path filePath = new Path(resultFilePath); FSDataOutputStream outputStream = fs.create(filePath); Gson gson = new Gson(); @@ -886,4 +890,5 @@ public void doDpp() throws Exception { outputStream.write('\n'); outputStream.close(); } -} \ No newline at end of file +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java index adfca9f3b43d58..bd6f0db71cadf5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java @@ -18,9 +18,7 @@ package org.apache.doris.load.loadv2.dpp; import org.apache.commons.lang3.StringUtils; -import org.apache.doris.common.UserException; -import org.apache.doris.load.loadv2.BitmapValue; -import org.apache.doris.load.loadv2.Hll; +import org.apache.doris.common.SparkDppException; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import org.apache.spark.Partitioner; import org.apache.spark.api.java.function.Function2; @@ -55,7 +53,7 @@ Object finalize(Object value) { }; // TODO(wb) support more datatype:decimal,date,datetime - public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws UserException { + public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws SparkDppException { String aggType = StringUtils.lowerCase(column.aggregationType); String columnType = StringUtils.lowerCase(column.columnType); switch (aggType) { @@ -78,7 +76,7 @@ public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) case "largeint": return new LargeIntMaxAggregator(); default: - throw new UserException(String.format("unsupported max aggregator for column type:%s", columnType)); + throw new SparkDppException(String.format("unsupported max aggregator for column type:%s", columnType)); } case "min": switch (columnType) { @@ -95,7 +93,7 @@ public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) case "largeint": return new LargeIntMinAggregator(); default: - throw new UserException(String.format("unsupported min aggregator for column type:%s", columnType)); + throw new SparkDppException(String.format("unsupported min aggregator for column type:%s", columnType)); } case "sum": switch (columnType) { @@ -114,14 +112,14 @@ public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) case "largeint": return new LargeIntSumAggregator(); default: - throw new UserException(String.format("unsupported sum aggregator for column type:%s", columnType)); + throw new SparkDppException(String.format("unsupported sum aggregator for column type:%s", columnType)); } case 
"replace_if_not_null": return new ReplaceIfNotNullAggregator(); case "replace": return new ReplaceAggregator(); default: - throw new UserException(String.format("unsupported aggregate type %s", aggType)); + throw new SparkDppException(String.format("unsupported aggregate type %s", aggType)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java index 6e6756c6ef0a09..9dec697939dba5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java @@ -17,10 +17,10 @@ package org.apache.doris.load.loadv2.etl; -import org.apache.doris.persist.gson.GsonUtils; - import com.google.common.collect.Lists; import com.google.common.collect.ImmutableMap; +import com.google.gson.ExclusionStrategy; +import com.google.gson.FieldAttributes; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; @@ -217,7 +217,7 @@ public static String getTabletMetaStr(String filePath) throws Exception { public String configToJson() { GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.addDeserializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy()); + gsonBuilder.addDeserializationExclusionStrategy(new HiddenAnnotationExclusionStrategy()); Gson gson = gsonBuilder.create(); return gson.toJson(this); } @@ -599,4 +599,15 @@ public String toString() { '}'; } } + + public static class HiddenAnnotationExclusionStrategy implements ExclusionStrategy { + public boolean shouldSkipField(FieldAttributes f) { + return f.getAnnotation(SerializedName.class) == null; + } + + @Override + public boolean shouldSkipClass(Class clazz) { + return false; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java similarity index 100% rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java rename to fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BitmapValueTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/BitmapValueTest.java similarity index 98% rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BitmapValueTest.java rename to fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/BitmapValueTest.java index f0d890f78d0448..d9c71e818c5e2d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BitmapValueTest.java +++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/BitmapValueTest.java @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.load.loadv2; +package org.apache.doris.load.loadv2.dpp; + +import org.apache.doris.load.loadv2.dpp.BitmapValue; +import org.apache.doris.common.Codec; import org.junit.Assert; import org.junit.Test; @@ -27,8 +30,6 @@ import java.io.DataOutputStream; import java.io.IOException; -import static org.apache.doris.common.util.Util.decodeVarint64; -import static org.apache.doris.common.util.Util.encodeVarint64; import static org.junit.Assert.assertEquals; public class BitmapValueTest { @@ -39,8 +40,8 @@ public void testVarint64IntEncode() throws IOException { for (long value : sourceValue) { ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream(); DataOutput output = new DataOutputStream(byteArrayOutput); - encodeVarint64(value, output); - assertEquals(value, decodeVarint64(new DataInputStream(new ByteArrayInputStream(byteArrayOutput.toByteArray())))); + Codec.encodeVarint64(value, output); + assertEquals(value, Codec.decodeVarint64(new DataInputStream(new ByteArrayInputStream(byteArrayOutput.toByteArray())))); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java rename to fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java rename to fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/HllTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/HllTest.java similarity index 86% rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/HllTest.java rename to fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/HllTest.java index 958e96f1af26c9..3c742c9843cdd6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/HllTest.java +++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/HllTest.java @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.load.loadv2; +package org.apache.doris.load.loadv2.dpp; + +import org.apache.doris.load.loadv2.dpp.Hll; import org.junit.Assert; import org.junit.Test; @@ -27,16 +29,14 @@ import java.io.DataOutputStream; import java.io.IOException; -import static org.apache.doris.load.loadv2.Hll.*; - public class HllTest { @Test public void testFindFirstNonZeroBitPosition() { - Assert.assertTrue(getLongTailZeroNum(0) == 0); - Assert.assertTrue(getLongTailZeroNum(1) == 0); - Assert.assertTrue(getLongTailZeroNum(1l << 30) == 30); - Assert.assertTrue(getLongTailZeroNum(1l << 62) == 62); + Assert.assertTrue(Hll.getLongTailZeroNum(0) == 0); + Assert.assertTrue(Hll.getLongTailZeroNum(1) == 0); + Assert.assertTrue(Hll.getLongTailZeroNum(1l << 30) == 30); + Assert.assertTrue(Hll.getLongTailZeroNum(1l << 62) == 62); } @Test @@ -44,7 +44,7 @@ public void HllBasicTest() throws IOException { // test empty Hll emptyHll = new Hll(); - Assert.assertTrue(emptyHll.getType() == HLL_DATA_EMPTY); + Assert.assertTrue(emptyHll.getType() == Hll.HLL_DATA_EMPTY); Assert.assertTrue(emptyHll.estimateCardinality() == 0); ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream(); @@ -53,15 +53,15 @@ public void HllBasicTest() throws IOException { DataInputStream emptyInputStream = new DataInputStream(new ByteArrayInputStream(emptyOutputStream.toByteArray())); Hll deserializedEmptyHll = new Hll(); deserializedEmptyHll.deserialize(emptyInputStream); - Assert.assertTrue(deserializedEmptyHll.getType() == HLL_DATA_EMPTY); + Assert.assertTrue(deserializedEmptyHll.getType() == Hll.HLL_DATA_EMPTY); // test explicit Hll explicitHll = new Hll(); - for (int i = 0; i < HLL_EXPLICLIT_INT64_NUM; i++) { + for (int i = 0; i < Hll.HLL_EXPLICLIT_INT64_NUM; i++) { explicitHll.updateWithHash(i); } - Assert.assertTrue(explicitHll.getType() == HLL_DATA_EXPLICIT); - Assert.assertTrue(explicitHll.estimateCardinality() == HLL_EXPLICLIT_INT64_NUM); + Assert.assertTrue(explicitHll.getType() == Hll.HLL_DATA_EXPLICIT); + Assert.assertTrue(explicitHll.estimateCardinality() == Hll.HLL_EXPLICLIT_INT64_NUM); ByteArrayOutputStream explicitOutputStream = new ByteArrayOutputStream(); DataOutput explicitOutput = new DataOutputStream(explicitOutputStream); @@ -69,17 +69,17 @@ public void HllBasicTest() throws IOException { DataInputStream explicitInputStream = new DataInputStream(new ByteArrayInputStream(explicitOutputStream.toByteArray())); Hll deserializedExplicitHll = new Hll(); deserializedExplicitHll.deserialize(explicitInputStream); - Assert.assertTrue(deserializedExplicitHll.getType() == HLL_DATA_EXPLICIT); + Assert.assertTrue(deserializedExplicitHll.getType() == Hll.HLL_DATA_EXPLICIT); // test sparse Hll sparseHll = new Hll(); - for (int i = 0; i < HLL_SPARSE_THRESHOLD; i++) { + for (int i = 0; i < Hll.HLL_SPARSE_THRESHOLD; i++) { sparseHll.updateWithHash(i); } - Assert.assertTrue(sparseHll.getType() == HLL_DATA_FULL); + Assert.assertTrue(sparseHll.getType() == Hll.HLL_DATA_FULL); // 2% error rate - Assert.assertTrue(sparseHll.estimateCardinality() > HLL_SPARSE_THRESHOLD * (1 - 0.02) && - sparseHll.estimateCardinality() < HLL_SPARSE_THRESHOLD * (1 + 0.02)); + Assert.assertTrue(sparseHll.estimateCardinality() > Hll.HLL_SPARSE_THRESHOLD * (1 - 0.02) && + sparseHll.estimateCardinality() < Hll.HLL_SPARSE_THRESHOLD * (1 + 0.02)); ByteArrayOutputStream sparseOutputStream = new ByteArrayOutputStream(); DataOutput sparseOutput = new DataOutputStream(sparseOutputStream); @@ -87,7 +87,7 @@ public void HllBasicTest() throws IOException 
{ DataInputStream sparseInputStream = new DataInputStream(new ByteArrayInputStream(sparseOutputStream.toByteArray())); Hll deserializedSparseHll = new Hll(); deserializedSparseHll.deserialize(sparseInputStream); - Assert.assertTrue(deserializedSparseHll.getType() == HLL_DATA_SPARSE); + Assert.assertTrue(deserializedSparseHll.getType() == Hll.HLL_DATA_SPARSE); Assert.assertTrue(sparseHll.estimateCardinality() == deserializedSparseHll.estimateCardinality()); @@ -96,7 +96,7 @@ public void HllBasicTest() throws IOException { for (int i = 1; i <= Short.MAX_VALUE; i++) { fullHll.updateWithHash(i); } - Assert.assertTrue(fullHll.getType() == HLL_DATA_FULL); + Assert.assertTrue(fullHll.getType() == Hll.HLL_DATA_FULL); // the result 32748 is consistent with C++ 's implementation Assert.assertTrue(fullHll.estimateCardinality() == 32748); Assert.assertTrue(fullHll.estimateCardinality() > Short.MAX_VALUE * (1 - 0.02) && @@ -108,7 +108,7 @@ public void HllBasicTest() throws IOException { DataInputStream fullHllInputStream = new DataInputStream(new ByteArrayInputStream(fullHllOutputStream.toByteArray())); Hll deserializedFullHll = new Hll(); deserializedFullHll.deserialize(fullHllInputStream); - Assert.assertTrue(deserializedFullHll.getType() == HLL_DATA_FULL); + Assert.assertTrue(deserializedFullHll.getType() == Hll.HLL_DATA_FULL); Assert.assertTrue(deserializedFullHll.estimateCardinality() == fullHll.estimateCardinality()); } @@ -158,11 +158,11 @@ public void testCompareEstimateValueWithBe() throws IOException { long preValue = sparseHll.estimateCardinality(); // check serialize byte[] serializedHll = serializeHll(sparseHll); - Assert.assertTrue(serializedHll.length < HLL_REGISTERS_COUNT + 1); + Assert.assertTrue(serializedHll.length < Hll.HLL_REGISTERS_COUNT + 1); sparseHll = deserializeHll(serializedHll); Assert.assertTrue(sparseHll.estimateCardinality() == preValue); - Assert.assertTrue(sparseHll.getType() == HLL_DATA_SPARSE); + Assert.assertTrue(sparseHll.getType() == Hll.HLL_DATA_SPARSE); Hll otherHll = new Hll(); for (int i = 0; i < 1024; i++) { @@ -190,7 +190,7 @@ public void testCompareEstimateValueWithBe() throws IOException { byte[] serializedHll = serializeHll(fullHll); fullHll = deserializeHll(serializedHll); Assert.assertTrue(fullHll.estimateCardinality() == preValue); - Assert.assertTrue(serializedHll.length == HLL_REGISTERS_COUNT + 1); + Assert.assertTrue(serializedHll.length == Hll.HLL_REGISTERS_COUNT + 1); // 2% error rate Assert.assertTrue(preValue > 62 * 1024 && preValue < 66 * 1024); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java rename to fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java similarity index 100% rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java rename to fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java diff --git a/run-fe-ut.sh b/run-fe-ut.sh index 365fa5d01bdf64..4aacbe327437e5 100755 --- a/run-fe-ut.sh +++ b/run-fe-ut.sh @@ -75,9 +75,6 @@ fi echo "Build Frontend UT" -rm 
${DORIS_HOME}/fe/build/ -rf -rm ${DORIS_HOME}/fe/output/ -rf - echo "******************************" echo " Runing DorisFe Unittest " echo "******************************"
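As a quick sanity check (not part of the patch itself), the new build targets can be exercised roughly as below. This is a minimal sketch inferred from the build.sh changes in this diff; the jar name assumes the 1.0.0 version set in fe/spark-dpp/pom.xml and Config.spark_dpp_version.

```sh
# Build only the Spark DPP application (FE_MODULES becomes "fe-common,spark-dpp").
sh build.sh --spark-dpp

# The shaded jar should be copied under output/fe/spark-dpp/,
# e.g. spark-dpp-1.0.0-jar-with-dependencies.jar.
ls output/fe/spark-dpp/

# Building the Frontend now builds fe-common, spark-dpp and fe-core together.
sh build.sh --fe --clean
```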