diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java index 2277d0ba74b0a4..4f0f95ea6456ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java @@ -17,11 +17,13 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; - +import org.apache.doris.load.loadv2.SparkRepository; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -144,6 +146,26 @@ public SparkResource getCopiedResource() { return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties); } + // Each SparkResource has one and only one SparkRepository. + // This method gets the remote archive that matches the dpp version from the remote repository. + public synchronized SparkRepository.SparkArchive prepareArchive() throws LoadException { + String remoteRepositoryPath = workingDir + "/" + Catalog.getCurrentCatalog().getClusterId() + + "/" + SparkRepository.REPOSITORY_DIR + name; + BrokerDesc brokerDesc = new BrokerDesc(broker, getBrokerPropertiesWithoutPrefix()); + SparkRepository repository = new SparkRepository(remoteRepositoryPath, brokerDesc); + // Check the remote archive and upload the local libraries if they are missing or outdated. + repository.prepare(); + SparkRepository.SparkArchive archive = repository.getCurrentArchive(); + // Normally, an archive should contain a DPP library and a SPARK library + Preconditions.checkState(archive.libraries.size() == 2); + SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary(); + SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary(); + if (dppLibrary == null || spark2xLibrary == null) { + throw new LoadException("failed to get libraries from remote archive"); + } + return archive; + } + public boolean isYarnMaster() { return getMaster().equalsIgnoreCase(YARN_MASTER); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 44dfe123b50a1c..399877f5f83bbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -509,12 +509,30 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int hadoop_load_default_timeout_second = 86400 * 3; // 3 day + // Configurations for spark load + /** + * Default spark dpp version + */ + @ConfField + public static String spark_dpp_version = "1_0_0"; /** * Default spark load timeout */ @ConfField(mutable = true, masterOnly = true) public static int spark_load_default_timeout_second = 86400; // 1 day + /** + * Default spark home dir + */ + @ConfField(mutable = true, masterOnly = true) + public static String spark_home_default_dir = PaloFe.DORIS_HOME_DIR + "/lib/spark2x"; + + /** + * Default spark dependencies path + */ + @ConfField + public static String spark_resource_path = ""; + /** * Default number of waiting jobs for routine load and version 2 of load * This is a desired number. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index ebbf82bd388d52..75c82f40968518 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -23,8 +23,11 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TBrokerCheckPathExistRequest; +import org.apache.doris.thrift.TBrokerCheckPathExistResponse; import org.apache.doris.thrift.TBrokerCloseReaderRequest; import org.apache.doris.thrift.TBrokerCloseWriterRequest; import org.apache.doris.thrift.TBrokerDeletePathRequest; @@ -45,7 +48,6 @@ import org.apache.doris.thrift.TBrokerVersion; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -349,6 +351,38 @@ public static void deletePath(String path, BrokerDesc brokerDesc) throws UserExc } } + public static boolean checkPathExist(String remotePath, BrokerDesc brokerDesc) throws UserException { + Pair pair = getBrokerAddressAndClient(brokerDesc); + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + boolean failed = true; + try { + TBrokerCheckPathExistRequest req = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE, + remotePath, brokerDesc.getProperties()); + TBrokerCheckPathExistResponse rep = client.checkPathExist(req); + if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker check path exist failed. path=" + remotePath + ", broker=" + address + + ", msg=" + rep.getOpStatus().getMessage()); + } + failed = false; + return rep.isPathExist; + } catch (TException e) { + LOG.warn("Broker check path exist failed, path={}, address={}, exception={}", remotePath, address, e); + throw new UserException("Broker check path exist exception. 
path=" + remotePath + ",broker=" + address); + } finally { + returnClient(client, address, failed); + } + } + + public static Pair getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException { + Pair pair = new Pair(null, null); + TNetworkAddress address = getAddress(brokerDesc); + TPaloBrokerService.Client client = borrowClient(address); + pair.first = client; + pair.second = address; + return pair; + } + private static TNetworkAddress getAddress(BrokerDesc brokerDesc) throws UserException { FsBroker broker = null; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index ff295d6ce96bc5..5063e9b1050070 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -17,9 +17,9 @@ package org.apache.doris.load.loadv2; -import org.apache.doris.PaloFe; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; @@ -30,6 +30,12 @@ import org.apache.doris.load.loadv2.etl.SparkEtlJob; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TEtlState; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -47,13 +53,6 @@ import org.apache.spark.launcher.SparkAppHandle.State; import org.apache.spark.launcher.SparkLauncher; -import com.google.common.base.Strings; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; - import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; @@ -70,9 +69,7 @@ public class SparkEtlJobHandler { private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class); - private static final String APP_RESOURCE_NAME = "palo-fe.jar"; private static final String CONFIG_FILE_NAME = "jobconfig.json"; - private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME; private static final String JOB_CONFIG_DIR = "configs"; private static final String ETL_JOB_NAME = "doris__%s"; // 5min @@ -92,12 +89,34 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo // delete outputPath deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); - // upload app resource and jobconfig to hdfs + // prepare dpp archive + SparkRepository.SparkArchive archive = resource.prepareArchive(); + SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary(); + SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary(); + + // spark home + String sparkHome = Config.spark_home_default_dir; + // etl config path String configsHdfsDir = etlJobConfig.outputPath + "/" + JOB_CONFIG_DIR + "/"; - String appResourceHdfsPath = configsHdfsDir + APP_RESOURCE_NAME; + // etl config json path String jobConfigHdfsPath = configsHdfsDir + CONFIG_FILE_NAME; + // spark submit app 
resource path + String appResourceHdfsPath = dppLibrary.remotePath; + // spark yarn archive path + String jobArchiveHdfsPath = spark2xLibrary.remotePath; + // spark yarn stage dir + String jobStageHdfsPath = resource.getWorkingDir(); + + // update archive and stage configs here + Map sparkConfigs = resource.getSparkConfigs(); + if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.archive"))) { + sparkConfigs.put("spark.yarn.archive", jobArchiveHdfsPath); + } + if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.stage.dir"))) { + sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath); + } + try { - BrokerUtil.writeFile(APP_RESOURCE_LOCAL_PATH, appResourceHdfsPath, brokerDesc); byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8"); BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc); } catch (UserException | UnsupportedEncodingException e) { @@ -114,7 +133,9 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo .setAppResource(appResourceHdfsPath) .setMainClass(SparkEtlJob.class.getCanonicalName()) .setAppName(String.format(ETL_JOB_NAME, loadLabel)) + .setSparkHome(sparkHome) .addAppArgs(jobConfigHdfsPath); + // spark configs for (Map.Entry entry : resource.getSparkConfigs().entrySet()) { launcher.setConf(entry.getKey(), entry.getValue()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java new file mode 100644 index 00000000000000..1062092b8f6b94 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.apache.doris.PaloFe; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.common.Config; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.thrift.TBrokerFileStatus; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/* + * SparkRepository represents the remote repository for spark archives uploaded by spark + * The organization in repository is: + * + * * __spark_repository__/ + * * __archive_1_0_0/ + * * __lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp.jar + * * __lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip + * * __archive_2_2_0/ + * * __lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp.jar + * * __lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip + * * __archive_3_2_0/ + * * ... + */ +public class SparkRepository { + private static final Logger LOG = LogManager.getLogger(SparkRepository.class); + + public static final String REPOSITORY_DIR = "__spark_repository__"; + public static final String PREFIX_ARCHIVE = "__archive_"; + public static final String PREFIX_LIB = "__lib_"; + public static final String SPARK_DPP = "spark-dpp"; + public static final String SPARK_2X = "spark-2x"; + public static final String SUFFIX = ".zip"; + + private static final String PATH_DELIMITER = "/"; + private static final String FILE_NAME_SEPARATOR = "_"; + + private static final String DPP_RESOURCE = "/spark-dpp/spark-dpp.jar"; + private static final String SPARK_RESOURCE = "/jars/spark-2x.zip"; + + private String remoteRepositoryPath; + private BrokerDesc brokerDesc; + private String localDppPath; + private String localSpark2xPath; + + // Version of the spark dpp program in this cluster + private String currentDppVersion; + // Archive that current dpp version pointed to + private SparkArchive currentArchive; + + public SparkRepository(String remoteRepositoryPath, BrokerDesc brokerDesc) { + this.remoteRepositoryPath = remoteRepositoryPath; + this.brokerDesc = brokerDesc; + this.currentDppVersion = Config.spark_dpp_version; + this.currentArchive = new SparkArchive(getRemoteArchivePath(currentDppVersion), currentDppVersion); + this.localDppPath = PaloFe.DORIS_HOME_DIR + DPP_RESOURCE; + if (!Strings.isNullOrEmpty(Config.spark_resource_path)) { + this.localSpark2xPath = Config.spark_resource_path; + } else { + this.localSpark2xPath = Config.spark_home_default_dir + SPARK_RESOURCE; + } + } + + public void prepare() throws LoadException { + initRepository(); + } + + private void initRepository() throws LoadException { + LOG.info("start to init remote repository"); + boolean needUpload = false; + boolean needReplace = false; + CHECK: { + if (Strings.isNullOrEmpty(remoteRepositoryPath) || brokerDesc == null) { + break CHECK; + } + + if (!checkCurrentArchiveExists()) { + needUpload = true; + break CHECK; + } + + // init current archive + String remoteArchivePath = getRemoteArchivePath(currentDppVersion); + List libraries = Lists.newArrayList(); + getLibraries(remoteArchivePath, libraries); + if 
(libraries.size() != 2) { + needUpload = true; + needReplace = true; + break CHECK; + } + currentArchive.libraries.addAll(libraries); + for (SparkLibrary library : currentArchive.libraries) { + String localMd5sum = null; + switch (library.libType) { + case DPP: + localMd5sum = getMd5String(localDppPath); + break; + case SPARK2X: + localMd5sum = getMd5String(localSpark2xPath); + break; + default: + Preconditions.checkState(false, "wrong library type: " + library.libType); + break; + } + if (!localMd5sum.equals(library.md5sum)) { + needUpload = true; + needReplace = true; + break; + } + } + } + + if (needUpload) { + uploadArchive(needReplace); + } + LOG.info("init spark repository success, current dppVersion={}, archive path={}, libraries size={}", + currentDppVersion, currentArchive.remotePath, currentArchive.libraries.size()); + } + + private boolean checkCurrentArchiveExists() { + boolean result = false; + Preconditions.checkNotNull(remoteRepositoryPath); + String remotePath = getRemoteArchivePath(currentDppVersion); + try { + result = BrokerUtil.checkPathExist(remotePath, brokerDesc); + LOG.info("check archive exists in repository, {}", result); + } catch (UserException e) { + LOG.warn("Failed to check remote archive exist, path={}, version={}", remotePath, currentDppVersion); + } + return result; + } + + private void uploadArchive(boolean isReplace) throws LoadException { + try { + String remoteArchivePath = getRemoteArchivePath(currentDppVersion); + if (isReplace) { + BrokerUtil.deletePath(remoteArchivePath, brokerDesc); + currentArchive.libraries.clear(); + } + String srcFilePath = null; + // upload dpp + { + srcFilePath = localDppPath; + String md5sum = getMd5String(srcFilePath); + long size = getFileSize(srcFilePath); + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String destFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); + upload(srcFilePath, destFilePath); + currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.DPP, size)); + } + // upload spark2x + { + srcFilePath = localSpark2xPath; + String md5sum = getMd5String(srcFilePath); + long size = getFileSize(srcFilePath); + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String destFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); + upload(srcFilePath, destFilePath); + currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.SPARK2X, size)); + } + LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}", + currentDppVersion, remoteArchivePath); + } catch (UserException e) { + throw new LoadException(e.getMessage()); + } + } + + private void getLibraries(String remoteArchivePath, List libraries) throws LoadException { + List fileStatuses = Lists.newArrayList(); + try { + BrokerUtil.parseFile(remoteArchivePath + "/*", brokerDesc, fileStatuses); + } catch (UserException e) { + throw new LoadException(e.getMessage()); + } + + for (TBrokerFileStatus fileStatus : fileStatuses) { + String fileName = getFileName(PATH_DELIMITER, fileStatus.path); + if (!fileName.startsWith(PREFIX_LIB)) { + continue; + } + String[] lib_arg = unWrap(PREFIX_LIB, SUFFIX, fileName).split(FILE_NAME_SEPARATOR); + if (lib_arg.length != 2) { + continue; + } + String md5sum = lib_arg[0]; + String type = lib_arg[1]; + SparkLibrary.LibType libType = null; + switch (type) { + case SPARK_DPP: + libType = SparkLibrary.LibType.DPP; + break; + case 
SPARK_2X: + libType = SparkLibrary.LibType.SPARK2X; + break; + default: + Preconditions.checkState(false, "wrong library type: " + type); + break; + } + SparkLibrary remoteFile = new SparkLibrary(fileStatus.path, md5sum, libType, fileStatus.size); + libraries.add(remoteFile); + LOG.info("get libraries from remote archive, archive path={}, library={}, md5sum={}, size={}", + remoteArchivePath, remoteFile.remotePath, remoteFile.md5sum, remoteFile.size); + } + } + + public String getMd5String(String filePath) throws LoadException { + File file = new File(filePath); + String md5sum = null; + try { + md5sum = DigestUtils.md5Hex(new FileInputStream(file)); + Preconditions.checkNotNull(md5sum); + LOG.debug("get md5sum from file {}, md5sum={}", filePath, md5sum); + return md5sum; + } catch (FileNotFoundException e) { + throw new LoadException("file " + filePath + " does not exist"); + } catch (IOException e) { + throw new LoadException("failed to get md5sum from file " + filePath); + } + } + + public long getFileSize(String filePath) throws LoadException { + File file = new File(filePath); + long size = file.length(); + if (size <= 0) { + throw new LoadException("failed to get size from file " + filePath); + } + return size; + } + + private void upload(String srcFilePath, String destFilePath) throws LoadException { + try { + BrokerUtil.writeFile(srcFilePath, destFilePath, brokerDesc); + LOG.info("finished to upload file, localPath={}, remotePath={}", srcFilePath, destFilePath); + } catch (UserException e) { + throw new LoadException("failed to upload lib to repository, srcPath=" + srcFilePath + + " destPath=" + destFilePath + " message=" + e.getMessage()); + } + } + + public SparkArchive getCurrentArchive() { + return currentArchive; + } + + private static String getFileName(String delimiter, String path) { + return path.substring(path.lastIndexOf(delimiter) + 1); + } + + private static String unWrap(String prefix, String suffix, String fileName) { + return fileName.substring(prefix.length(), fileName.length() - suffix.length()); + } + + private static String joinPrefix(String prefix, String fileName) { + return prefix + fileName; + } + + // e.g. + // __lib_979bfbcb9469888f1c5539e168d1925d_spark-2x.zip + public static String assemblyFileName(String prefix, String md5sum, String fileName, String suffix) { + return prefix + md5sum + FILE_NAME_SEPARATOR + fileName + suffix; + } + + // e.g. + // .../__spark_repository__/__archive_1_0_0 + private String getRemoteArchivePath(String version) { + return Joiner.on(PATH_DELIMITER).join(remoteRepositoryPath, joinPrefix(PREFIX_ARCHIVE, version)); + } + + // Represents a remote directory that contains the uploaded libraries. + // An archive is named __archive_{dppVersion}. + // e.g. __archive_1_0_0/ + // \ __lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp.jar + // \ __lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip + public static class SparkArchive { + public String remotePath; + public String version; + public List<SparkLibrary> libraries; + + public SparkArchive(String remotePath, String version) { + this.remotePath = remotePath; + this.version = version; + this.libraries = Lists.newArrayList(); + } + + public SparkLibrary getDppLibrary() { + SparkLibrary result = null; + Optional<SparkLibrary> library = libraries.stream(). + filter(lib -> lib.libType == SparkLibrary.LibType.DPP).findFirst(); + if (library.isPresent()) { + result = library.get(); + } + return result; + } + + public SparkLibrary getSpark2xLibrary() { + SparkLibrary result = null; + Optional<SparkLibrary> library = libraries.stream(). 
+ filter(lib -> lib.libType == SparkLibrary.LibType.SPARK2X).findFirst(); + if (library.isPresent()) { + result = library.get(); + } + return result; + } + } + + // Represents an uploaded remote file stored in the archive. + // A library refers to a dependency of the DPP program or the spark platform, + // and is named __lib_{md5sum}_spark-{type}.{jar/zip}. + // e.g. __lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp.jar + // __lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip + public static class SparkLibrary { + public String remotePath; + public String md5sum; + public long size; + public LibType libType; + + public enum LibType { + DPP, SPARK2X + } + + public SparkLibrary(String remotePath, String md5sum, LibType libType, long size) { + this.remotePath = remotePath; + this.md5sum = md5sum; + this.libType = libType; + this.size = size; + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java index 02a690bd3c72f2..e011062fbbc0e0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java @@ -17,11 +17,18 @@ package org.apache.doris.load.loadv2; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; + import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.Config; import org.apache.doris.common.GenericPool; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; @@ -36,14 +43,9 @@ import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mock; -import mockit.Mocked; -import mockit.MockUp; + import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -70,6 +72,9 @@ public class SparkEtlJobHandlerTest { private String appId; private String etlOutputPath; private String trackingUrl; + private String dppVersion; + private String remoteArchivePath; + private SparkRepository.SparkArchive archive; @Before public void setUp() { @@ -81,6 +86,13 @@ public void setUp() { appId = "application_15888888888_0088"; etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101"; trackingUrl = "http://127.0.0.1:8080/proxy/application_1586619723848_0088/"; + dppVersion = Config.spark_dpp_version; + remoteArchivePath = etlOutputPath + "/__repository__/__archive_" + dppVersion; + archive = new SparkRepository.SparkArchive(remoteArchivePath, dppVersion); + archive.libraries.add(new SparkRepository + .SparkLibrary("", "", SparkRepository.SparkLibrary.LibType.DPP, 0L)); + archive.libraries.add(new SparkRepository + .SparkLibrary("", "", SparkRepository.SparkLibrary.LibType.SPARK2X, 0L)); } @Test @@ -99,6 +111,13 @@ public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLaunche EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), etlOutputPath, label, null); SparkResource resource = new 
SparkResource(resourceName); + new Expectations(resource) { + { + resource.prepareArchive(); + result = archive; + } + }; + Map sparkConfigs = resource.getSparkConfigs(); sparkConfigs.put("spark.master", "yarn"); sparkConfigs.put("spark.submit.deployMode", "cluster"); @@ -129,6 +148,13 @@ public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkL EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), etlOutputPath, label, null); SparkResource resource = new SparkResource(resourceName); + new Expectations(resource) { + { + resource.prepareArchive(); + result = archive; + } + }; + Map sparkConfigs = resource.getSparkConfigs(); sparkConfigs.put("spark.master", "yarn"); sparkConfigs.put("spark.submit.deployMode", "cluster"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java new file mode 100644 index 00000000000000..70e5cf9e51e6bb --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java @@ -0,0 +1,234 @@ +package org.apache.doris.load.loadv2; + +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.Config; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.thrift.TBrokerFileStatus; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class SparkRepositoryTest { + private SparkRepository repository; + + private final String DPP_LOCAL_MD5SUM = "b3cd0ae3a4121e2426532484442e90ec"; + private final String SPARK_LOCAL_MD5SUM = "6d2b052ffbdf7082c019bd202432739c"; + private final String DPP_VERSION = Config.spark_dpp_version; + private final String SPARK_LOAD_WORK_DIR = "hdfs://127.0.0.1/99999/user/doris/etl"; + private final String DPP_NAME = SparkRepository.SPARK_DPP + ".jar"; + private final String SPARK_NAME = SparkRepository.SPARK_2X + ".zip"; + + private String remoteRepoPath; + private String remoteArchivePath; + private String remoteDppLibraryPath; + private String remoteSparkLibraryPath; + + private List files; + + @Mocked + Catalog catalog; + @Mocked + BrokerUtil brokerUtil; + + @Before + public void setUp() { + // e.g. hdfs://127.0.0.1/99999/user/doris/etl/__spark_repository__ + remoteRepoPath = SPARK_LOAD_WORK_DIR + "/" + SparkRepository.REPOSITORY_DIR; + // e.g. hdfs://127.0.0.1/99999/user/doris/etl/__spark_repository__/__archive_1_0_0 + remoteArchivePath = remoteRepoPath + "/" + SparkRepository.PREFIX_ARCHIVE + DPP_VERSION; + // e.g. hdfs://127.0.0.1/99999/user/doris/etl/__spark_repository__/__archive_1_0_0/__lib_b3cd0ae3a4121e2426532484442e90ec_spark-dpp.jar + remoteDppLibraryPath = remoteArchivePath + "/" + SparkRepository.PREFIX_LIB + DPP_LOCAL_MD5SUM + "_" + DPP_NAME; + // e.g. 
hdfs://127.0.0.1/99999/user/doris/etl/__spark_repository__/__archive_1_0_0/__lib_6d2b052ffbdf7082c019bd202432739c_spark-2x.zip + remoteSparkLibraryPath = remoteArchivePath + "/" + SparkRepository.PREFIX_LIB + SPARK_LOCAL_MD5SUM + "_" + SPARK_NAME; + + files = Lists.newArrayList(); + files.add(new TBrokerFileStatus(remoteDppLibraryPath, false, 1024, false)); + files.add(new TBrokerFileStatus(remoteSparkLibraryPath, false, 10240, false)); + } + + @Test + public void testNormal() { + + new MockUp() { + @Mock + boolean checkPathExist(String remotePath, BrokerDesc brokerDesc) + throws UserException { return true; } + @Mock + void parseFile(String path, BrokerDesc brokerDesc, List fileStatuses) + throws UserException { fileStatuses.addAll(files); } + }; + + BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap()); + SparkRepository repository = new SparkRepository(remoteRepoPath, brokerDesc); + try { + new Expectations(repository) { + { + repository.getMd5String(anyString); + returns(DPP_LOCAL_MD5SUM, SPARK_LOCAL_MD5SUM); + } + }; + + // prepare repository + repository.prepare(); + + // get archive + SparkRepository.SparkArchive archive = repository.getCurrentArchive(); + Assert.assertEquals(archive.libraries.size(), 2); + + // check if the remote libraries are equal to local libraries + List libraries = archive.libraries; + for (SparkRepository.SparkLibrary library : libraries) { + switch (library.libType) { + case DPP: + Assert.assertEquals(library.remotePath, remoteDppLibraryPath); + Assert.assertEquals(library.md5sum, DPP_LOCAL_MD5SUM); + Assert.assertEquals(library.size, 1024); + break; + case SPARK2X: + Assert.assertEquals(library.remotePath, remoteSparkLibraryPath); + Assert.assertEquals(library.md5sum, SPARK_LOCAL_MD5SUM); + Assert.assertEquals(library.size, 10240); + break; + default: + Assert.fail("wrong library type: " + library.libType); + } + } + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testArchiveNotExists() { + new MockUp() { + @Mock + boolean checkPathExist(String remotePath, BrokerDesc brokerDesc) + throws UserException { return false; } + @Mock + void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return;} + }; + + BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap()); + SparkRepository repository = new SparkRepository(remoteRepoPath, brokerDesc); + try { + new Expectations(repository) { + { + repository.getMd5String(anyString); + returns(DPP_LOCAL_MD5SUM, SPARK_LOCAL_MD5SUM); + + repository.getFileSize(anyString); + returns(1024L, 10240L); + } + }; + + // prepare repository + repository.prepare(); + + // get archive + SparkRepository.SparkArchive archive = repository.getCurrentArchive(); + Assert.assertEquals(archive.libraries.size(), 2); + + // check if the remote libraries are equal to local libraries + List libraries = archive.libraries; + for (SparkRepository.SparkLibrary library : libraries) { + switch (library.libType) { + case DPP: + Assert.assertEquals(library.remotePath, remoteDppLibraryPath); + Assert.assertEquals(library.md5sum, DPP_LOCAL_MD5SUM); + Assert.assertEquals(library.size, 1024); + break; + case SPARK2X: + Assert.assertEquals(library.remotePath, remoteSparkLibraryPath); + Assert.assertEquals(library.md5sum, SPARK_LOCAL_MD5SUM); + Assert.assertEquals(library.size, 10240); + break; + default: + Assert.fail("wrong library type: " + library.libType); + } + } + } catch (LoadException e) { + Assert.fail(); + } + } + + @Test + public void 
testLibraryMd5Mismatch() { + new MockUp<BrokerUtil>() { + @Mock + boolean checkPathExist(String remotePath, BrokerDesc brokerDesc) + throws UserException { return true; } + @Mock + void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses) + throws UserException { fileStatuses.addAll(files); } + @Mock + void deletePath(String path, BrokerDesc brokerDesc) + throws UserException { return; } + @Mock + void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } + }; + + // new md5sum of the local library + String newMd5sum = "new_local_md5sum_value"; + // new remote path + String newRemoteDppPath = remoteArchivePath + "/" + SparkRepository.PREFIX_LIB + newMd5sum + "_" + DPP_NAME; + String newRemoteSparkPath = remoteArchivePath + "/" + SparkRepository.PREFIX_LIB + newMd5sum + "_" + SPARK_NAME; + + BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap()); + SparkRepository repository = new SparkRepository(remoteRepoPath, brokerDesc); + try { + new Expectations(repository) { + { + repository.getMd5String(anyString); + result = newMd5sum; + + repository.getFileSize(anyString); + returns(1024L, 10240L); + } + }; + + // prepare repository + repository.prepare(); + + // get archive + SparkRepository.SparkArchive archive = repository.getCurrentArchive(); + Assert.assertEquals(archive.libraries.size(), 2); + + // check if the remote libraries are equal to the local libraries + List<SparkRepository.SparkLibrary> libraries = archive.libraries; + for (SparkRepository.SparkLibrary library : libraries) { + switch (library.libType) { + case DPP: + Assert.assertEquals(library.remotePath, newRemoteDppPath); + Assert.assertEquals(library.md5sum, newMd5sum); + Assert.assertEquals(library.size, 1024); + break; + case SPARK2X: + Assert.assertEquals(library.remotePath, newRemoteSparkPath); + Assert.assertEquals(library.md5sum, newMd5sum); + Assert.assertEquals(library.size, 10240); + break; + default: + Assert.fail("wrong library type: " + library.libType); + } + } + } catch (LoadException e) { + Assert.fail(); + } + } + +}
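
Reviewer note: below is a minimal, hypothetical usage sketch of the classes added in this patch (it is not part of the patch itself). The broker name and HDFS path are made-up placeholders; in the patch, SparkResource.prepareArchive() builds the repository path from the resource's working dir and the cluster id, and SparkEtlJobHandler.submitEtlJob() passes the resulting library paths to SparkLauncher.

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.LoadException;
import org.apache.doris.load.loadv2.SparkRepository;
import com.google.common.collect.Maps;

public class SparkRepositoryUsageSketch {
    public static void main(String[] args) throws LoadException {
        // Hypothetical broker and repository root (example values only).
        BrokerDesc brokerDesc = new BrokerDesc("my_broker", Maps.newHashMap());
        String repoPath = "hdfs://127.0.0.1:10000/tmp/doris/99999/" + SparkRepository.REPOSITORY_DIR;

        // prepare() checks whether __archive_{spark_dpp_version} exists and whether the md5sums
        // of the local spark-dpp.jar / spark-2x.zip match the uploaded libraries; it uploads
        // (or replaces) them when they are missing or differ.
        SparkRepository repository = new SparkRepository(repoPath, brokerDesc);
        repository.prepare();

        // The dpp jar becomes the spark-submit app resource; the spark-2x.zip is used
        // as spark.yarn.archive when the ETL job is submitted.
        SparkRepository.SparkArchive archive = repository.getCurrentArchive();
        String appResourceHdfsPath = archive.getDppLibrary().remotePath;
        String yarnArchiveHdfsPath = archive.getSpark2xLibrary().remotePath;
        System.out.println(appResourceHdfsPath + ", " + yarnArchiveHdfsPath);
    }
}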