diff --git a/fe/src/com/baidu/palo/load/DppScheduler.java b/fe/src/com/baidu/palo/load/DppScheduler.java
index aba632e35c7b25..a7a04d62ecfe2e 100644
--- a/fe/src/com/baidu/palo/load/DppScheduler.java
+++ b/fe/src/com/baidu/palo/load/DppScheduler.java
@@ -18,7 +18,6 @@
 import com.baidu.palo.common.Config;
 import com.baidu.palo.common.FeConstants;
 import com.baidu.palo.common.LoadException;
-import com.baidu.palo.common.Pair;
 import com.baidu.palo.common.util.CommandResult;
 import com.baidu.palo.common.util.Util;
 import com.baidu.palo.thrift.TEtlState;
@@ -59,6 +58,7 @@ public class DppScheduler {
     private static final String JOB_CONFIG_DIR = PALO_HOME + "/temp/job_conf";
     private static final String JOB_CONFIG_FILE = "jobconfig.json";
     private static final String LOCAL_DPP_DIR = PALO_HOME + "/lib/dpp/" + FeConstants.dpp_version;
+    private static final int DEFAULT_REDUCE_NUM = 1000;
     private static final long GB = 1024 * 1024 * 1024L;
 
     // hdfs://host:port/outputPath/dbId/loadLabel/etlOutputDir
@@ -159,20 +159,24 @@ public EtlSubmitResult submitEtlJob(long jobId, String loadLabel, String cluster
                 }
             }
         }
-
-        Pair<String, Integer> inputPathAndReduceNum;
-        try {
-            inputPathAndReduceNum = getInputPathAndCalReduceNumBySize(jobConf);
-        } catch (LoadException e) {
-            failMsgs.add(e.getMessage());
-            status.setStatus_code(TStatusCode.CANCELLED);
-            return new EtlSubmitResult(status, null);
-        }
-
+
+        // create input path
+        Set<String> inputPaths = getInputPaths(jobConf);
+        String inputPath = StringUtils.join(inputPaths, " -input ");
+
+        // reduce num
+        int reduceNumByInputSize = 0;
+        try {
+            reduceNumByInputSize = calcReduceNumByInputSize(inputPaths);
+        } catch (InputSizeInvalidException e) {
+            failMsgs.add(e.getMessage());
+            status.setStatus_code(TStatusCode.CANCELLED);
+            return new EtlSubmitResult(status, null);
+        }
         int reduceNumByTablet = calcReduceNumByTablet(jobConf);
-        int reduceNum = Math.min(inputPathAndReduceNum.second, reduceNumByTablet);
+        int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet);
         LOG.debug("calculate reduce num. reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}",
-                reduceNum, inputPathAndReduceNum.second, reduceNumByTablet);
+                reduceNum, reduceNumByInputSize, reduceNumByTablet);
 
         // rm path
         String outputPath = (String) jobConf.get("output_path");
@@ -180,9 +184,9 @@ public EtlSubmitResult submitEtlJob(long jobId, String loadLabel, String cluster
 
         // submit etl job
         String etlJobName = String.format(ETL_JOB_NAME, dbName, loadLabel);
-        String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName,
-                inputPathAndReduceNum.first, outputPath, hadoopConfig, applicationsPath, applicationsPath,
-                applicationsPath, reduceNum, configFile.getAbsolutePath());
+        String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName, inputPath,
+                outputPath, hadoopConfig, applicationsPath, applicationsPath, applicationsPath, reduceNum,
+                configFile.getAbsolutePath());
         LOG.info(hadoopRunCmd);
         String outputLine = null;
         List<String> hadoopRunCmdList = Util.shellSplit(hadoopRunCmd);
@@ -325,62 +329,58 @@ private void prepareDppApplications() throws LoadException {
             }
         }
     }
-
-    private Pair<String, Integer> getInputPathAndCalReduceNumBySize(Map<String, Object> jobConf) throws LoadException {
-        Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
-        Set<String> fileUrls = new HashSet<String>();
-        for (Map table : tables.values()) {
-            Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
-            for (Map<String, List<String>> schema : sourceFileSchema.values()) {
-                fileUrls.addAll(schema.get("file_urls"));
-            }
-        }
-
-        String fileUrl = StringUtils.join(fileUrls, " ");
-        Set<String> inputPaths = new HashSet<String>();
-        String hadoopLsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, hadoopConfig, fileUrl);
-        CommandResult lsResult = Util.executeCommand(hadoopLsCmd);
-        if (lsResult.getReturnCode() != 0) {
-            LOG.error("hadoopLsCmd: {}", hadoopLsCmd);
-            throw new LoadException("get file list from hdfs failed");
-        }
-
-        int reduceNum = 0;
-        // calc total size
-        long totalSizeB = 0L;
-        String stdout = lsResult.getStdout();
-        String[] lsFileResults = stdout.split("\n");
-        for (String line : lsFileResults) {
-            // drwxr-xr-x 3 palo palo 0 2014-12-08 14:37 /tmp/file
-            String[] fileInfos = line.split(" +");
-            if (fileInfos.length == 8) {
-                String filePath = fileInfos[fileInfos.length - 1];
-                if (inputPaths.add(filePath)) {
-                    totalSizeB += Long.parseLong(fileInfos[4]);
-                }
-            }
-        }
-
-        // check input size limit
-        int inputSizeLimitGB = Config.load_input_size_limit_gb;
-        if (inputSizeLimitGB != 0) {
-            if (totalSizeB > inputSizeLimitGB * GB) {
-                String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
-                        + " exceeds system limit[" + inputSizeLimitGB + "GB]";
-                LOG.warn(failMsg);
-                throw new InputSizeInvalidException(failMsg);
-            }
-        }
-
-        if (totalSizeB != 0) {
-            reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
-        }
-
-        String inputPath = StringUtils.join(inputPaths, " -input ");
-        Pair<String, Integer> inputPathAndReduceNum = new Pair<String, Integer>(inputPath, reduceNum);
-        return inputPathAndReduceNum;
-    }
-
+
+    private Set<String> getInputPaths(Map<String, Object> jobConf) {
+        Set<String> inputPaths = new HashSet<String>();
+        Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
+        for (Map table : tables.values()) {
+            Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
+            for (Map<String, List<String>> schema : sourceFileSchema.values()) {
+                List<String> fileUrls = schema.get("file_urls");
+                inputPaths.addAll(fileUrls);
+            }
+        }
+        return inputPaths;
+    }
+
+    private int calcReduceNumByInputSize(Set<String> inputPaths) throws InputSizeInvalidException {
+        int reduceNum = 0;
+        String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig,
+                StringUtils.join(inputPaths, " "));
+        LOG.info(hadoopCountCmd);
+        CommandResult result = Util.executeCommand(hadoopCountCmd);
+        if (result.getReturnCode() != 0) {
+            LOG.warn("hadoop count error, result: {}", result);
+            return DEFAULT_REDUCE_NUM;
+        }
+
+        // calc total size
+        long totalSizeB = 0L;
+        String[] fileInfos = result.getStdout().split("\n");
+        for (String fileInfo : fileInfos) {
+            String[] fileInfoArr = fileInfo.trim().split(" +");
+            if (fileInfoArr.length == 4) {
+                totalSizeB += Long.parseLong(fileInfoArr[2]);
+            }
+        }
+
+        // check input size limit
+        int inputSizeLimitGB = Config.load_input_size_limit_gb;
+        if (inputSizeLimitGB != 0) {
+            if (totalSizeB > inputSizeLimitGB * GB) {
+                String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
+                        + " exceeds system limit[" + inputSizeLimitGB + "GB]";
+                LOG.warn(failMsg);
+                throw new InputSizeInvalidException(failMsg);
+            }
+        }
+
+        if (totalSizeB != 0) {
+            reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
+        }
+        return reduceNum;
+    }
+
     private int calcReduceNumByTablet(Map<String, Object> jobConf) {
         int reduceNum = 0;
         Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
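
Reviewer note: below is a minimal, self-contained sketch of the reducer-count calculation that calcReduceNumByInputSize now performs. `hadoop fs -count` prints one "DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME" line per input path, so the method sums the third column and allocates one reducer per Config.dpp_bytes_per_reduce bytes. The class name, the byte-per-reduce constant, and the sample stdout here are fabricated for illustration; only the parsing logic mirrors the patch.

public class ReduceNumSketch {
    // Hypothetical stand-in for Config.dpp_bytes_per_reduce; the real value
    // comes from FE config, not from this sketch.
    private static final long BYTES_PER_REDUCE = 100 * 1024 * 1024L;

    // Mirrors the parsing loop in calcReduceNumByInputSize: every line of
    // "hadoop fs -count <paths>" is "DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME",
    // so a valid line splits into exactly 4 fields and CONTENT_SIZE is field [2].
    static int reduceNumFromCountOutput(String stdout) {
        long totalSizeB = 0L;
        for (String line : stdout.split("\n")) {
            String[] fields = line.trim().split(" +");
            if (fields.length == 4) {
                totalSizeB += Long.parseLong(fields[2]);
            }
        }
        // One reducer per BYTES_PER_REDUCE bytes, at least one for non-empty input.
        return totalSizeB == 0 ? 0 : (int) (totalSizeB / BYTES_PER_REDUCE) + 1;
    }

    public static void main(String[] args) {
        // Fabricated "hadoop fs -count" stdout for two input paths.
        String stdout =
                "           1           12      524288000 hdfs://host:port/user/palo/a\n"
              + "           1            3      104857600 hdfs://host:port/user/palo/b\n";
        System.out.println(reduceNumFromCountOutput(stdout)); // 629145600 B / 100 MB + 1 = 7
    }
}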
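Two behavioral changes are worth calling out. First, `hadoop fs -count` reports the aggregate content size under each path, so directory inputs are sized in a single call, where the old `hadoop fs -ls` parsing only summed entries that happened to match its expected 8-field listing format. Second, a failed count no longer cancels the load the way a failed ls did: calcReduceNumByInputSize falls back to DEFAULT_REDUCE_NUM (1000) and lets the job proceed, with the final reducer count still bounded by calcReduceNumByTablet through the Math.min in submitEtlJob; only the input-size limit check remains a hard failure via InputSizeInvalidException.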