diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index b4d60f60000c1b..c0ac047eb2bf10 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -537,6 +537,10 @@ public KeysType getKeysType() {
         return keysType;
     }
 
+    public KeysType getKeysTypeByIndexId(long indexId) {
+        return indexIdToMeta.get(indexId).getKeysType();
+    }
+
     public PartitionInfo getPartitionInfo() {
         return partitionInfo;
     }
diff --git a/fe/src/main/java/org/apache/doris/common/Pair.java b/fe/src/main/java/org/apache/doris/common/Pair.java
index 14daca0877a500..1a520e02c380c7 100644
--- a/fe/src/main/java/org/apache/doris/common/Pair.java
+++ b/fe/src/main/java/org/apache/doris/common/Pair.java
@@ -17,15 +17,21 @@
 
 package org.apache.doris.common;
 
+import com.google.gson.annotations.SerializedName;
+
 import java.util.Comparator;
 
 /**
  * The equivalent of C++'s std::pair<>.
+ *
+ * Notice: When using Pair for persistence, users need to guarantee that F and S can be serialized through Gson
  */
 public class Pair {
     public static PairComparator> PAIR_VALUE_COMPARATOR = new PairComparator<>();
 
+    @SerializedName(value = "first")
     public F first;
+    @SerializedName(value = "second")
     public S second;
 
     public Pair(F first, S second) {
diff --git a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 622b871589d172..ebbf82bd388d52 100644
--- a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -17,52 +17,65 @@
 
 package org.apache.doris.common.util;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TBrokerCloseReaderRequest;
+import org.apache.doris.thrift.TBrokerCloseWriterRequest;
+import org.apache.doris.thrift.TBrokerDeletePathRequest;
+import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
+import org.apache.doris.thrift.TBrokerOpenMode;
+import org.apache.doris.thrift.TBrokerOpenReaderRequest;
+import org.apache.doris.thrift.TBrokerOpenReaderResponse;
+import org.apache.doris.thrift.TBrokerOpenWriterRequest;
+import org.apache.doris.thrift.TBrokerOpenWriterResponse;
+import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TBrokerPReadRequest;
+import org.apache.doris.thrift.TBrokerPWriteRequest;
+import org.apache.doris.thrift.TBrokerReadResponse;
 import org.apache.doris.thrift.TBrokerVersion;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Collections; import java.util.List; public class BrokerUtil { private static final Logger LOG = LogManager.getLogger(BrokerUtil.class); - public static void parseBrokerFile(String path, BrokerDesc brokerDesc, List fileStatuses) + private static int READ_BUFFER_SIZE_B = 1024 * 1024; + + /** + * Parse file status in path with broker, except directory + * @param path + * @param brokerDesc + * @param fileStatuses: file path, size, isDir, isSplitable + * @throws UserException if broker op failed + */ + public static void parseFile(String path, BrokerDesc brokerDesc, List fileStatuses) throws UserException { - FsBroker broker = null; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerDesc.getName(), localIP); - } catch (AnalysisException e) { - throw new UserException(e.getMessage()); - } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); - TPaloBrokerService.Client client = null; - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e1) { - throw new UserException("Create connection to broker(" + address + ") failed."); - } - } + TNetworkAddress address = getAddress(brokerDesc); + TPaloBrokerService.Client client = borrowClient(address); boolean failed = true; try { TBrokerListPathRequest request = new TBrokerListPathRequest( @@ -71,11 +84,11 @@ public static void parseBrokerFile(String path, BrokerDesc brokerDesc, List getSchemaChangeShadowColumnDesc(Table tbl, Map columnExprMap) { + List shadowColumnDescs = Lists.newArrayList(); + for (Column column : tbl.getFullSchema()) { + if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { + continue; + } + + String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX); + if (columnExprMap.containsKey(originCol)) { + Expr mappingExpr = columnExprMap.get(originCol); + if (mappingExpr != null) { + /* + * eg: + * (A, C) SET (B = func(xx)) + * -> + * (A, C) SET (B = func(xx), __doris_shadow_B = func(xx)) + */ + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr); + shadowColumnDescs.add(importColumnDesc); + } else { + /* + * eg: + * (A, B, C) + * -> + * (A, B, C) SET (__doris_shadow_B = B) + */ + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), + new SlotRef(null, originCol)); + shadowColumnDescs.add(importColumnDesc); + } + } else { + /* + * There is a case that if user does not specify the related origin column, eg: + * COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'. + * We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B. + * In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping + */ + // do nothing + } + } + return shadowColumnDescs; + } + + /* + * used for spark load job + * not init slot desc and analyze exprs + */ + public static void initColumns(Table tbl, List columnExprs, + Map>> columnToHadoopFunction) throws UserException { + initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, false); + } + + /* + * This function should be used for broker load v2 and stream load. + * And it must be called in same db lock when planing. 
+ */ + public static void initColumns(Table tbl, List columnExprs, + Map>> columnToHadoopFunction, + Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, + Map slotDescByName, TBrokerScanRangeParams params) throws UserException { + initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer, + srcTupleDesc, slotDescByName, params, true); + } + /* * This function will do followings: * 1. fill the column exprs if user does not specify any column or column mapping. @@ -871,14 +943,12 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip * 3. Add any shadow columns if have. * 4. validate hadoop functions * 5. init slot descs and expr map for load plan - * - * This function should be used for broker load v2 and stream load. - * And it must be called in same db lock when planing. */ public static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params) throws UserException { + Map slotDescByName, TBrokerScanRangeParams params, + boolean needInitSlotAndAnalyzeExprs) throws UserException { // check mapping column exist in schema // !! all column mappings are in columnExprs !! for (ImportColumnDesc importColumnDesc : columnExprs) { @@ -925,50 +995,8 @@ public static void initColumns(Table tbl, List columnExprs, throw new DdlException("Column has no default value. column: " + columnName); } - // When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in - // their names. These columns are invisible to user, but we need to generate data for these columns. - // So we add column mappings for these column. - // eg1: - // base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B' - // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B)); - for (Column column : tbl.getFullSchema()) { - if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { - continue; - } - - String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX); - if (columnExprMap.containsKey(originCol)) { - Expr mappingExpr = columnExprMap.get(originCol); - if (mappingExpr != null) { - /* - * eg: - * (A, C) SET (B = func(xx)) - * -> - * (A, C) SET (B = func(xx), __doris_shadow_B = func(xx)) - */ - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr); - copiedColumnExprs.add(importColumnDesc); - } else { - /* - * eg: - * (A, B, C) - * -> - * (A, B, C) SET (__doris_shadow_B = B) - */ - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), - new SlotRef(null, originCol)); - copiedColumnExprs.add(importColumnDesc); - } - } else { - /* - * There is a case that if user does not specify the related origin column, eg: - * COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'. - * We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B. 
- * In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping - */ - // do nothing - } - } + // get shadow column desc when table schema change + copiedColumnExprs.addAll(getSchemaChangeShadowColumnDesc(tbl, columnExprMap)); // validate hadoop functions if (columnToHadoopFunction != null) { @@ -991,6 +1019,10 @@ public static void initColumns(Table tbl, List columnExprs, } } + if (!needInitSlotAndAnalyzeExprs) { + return; + } + // init slot desc add expr map, also transform hadoop functions for (ImportColumnDesc importColumnDesc : copiedColumnExprs) { // make column name case match with real column name diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 099caeaa7b5125..3d394604fb0c4b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -75,7 +75,7 @@ private void getAllFileStatus() throws UserException { long groupFileSize = 0; List fileStatuses = Lists.newArrayList(); for (String path : fileGroup.getFilePaths()) { - BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); } fileStatusList.add(fileStatuses); for (TBrokerFileStatus fstatus : fileStatuses) { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 3b0bb52a55f2c9..28b556cd35032b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import com.google.gson.annotations.SerializedName; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Catalog; @@ -30,6 +31,7 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -46,6 +48,7 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PaloPrivilege; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; @@ -392,7 +395,8 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti * @throws AnalysisException there are error params in job * @throws DuplicatedRequestException */ - public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, + DuplicatedRequestException, LoadException { writeLock(); try { unprotectedExecute(); @@ -401,8 +405,8 @@ public void execute() throws LabelAlreadyUsedException, BeginTransactionExceptio } } - public void unprotectedExecute() - throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, + 
DuplicatedRequestException, LoadException { // check if job state is pending if (state != JobState.PENDING) { return; @@ -410,7 +414,10 @@ public void unprotectedExecute() // the limit of job will be restrict when begin txn beginTxn(); unprotectedExecuteJob(); - unprotectedUpdateState(JobState.LOADING); + // update spark load job state from PENDING to ETL when pending task is finished + if (jobType != EtlJobType.SPARK) { + unprotectedUpdateState(JobState.LOADING); + } } public void processTimeout() { @@ -433,7 +440,7 @@ public void processTimeout() { } } - protected void unprotectedExecuteJob() { + protected void unprotectedExecuteJob() throws LoadException { } /** @@ -706,6 +713,9 @@ public List getShowInfo() throws DdlException { case CANCELLED: jobInfo.add("ETL:N/A; LOAD:N/A"); break; + case ETL: + jobInfo.add("ETL:" + progress + "%; LOAD:0%"); + break; default: jobInfo.add("ETL:100%; LOAD:" + progress + "%"); break; @@ -722,7 +732,7 @@ public List getShowInfo() throws DdlException { } // task info - jobInfo.add("cluster:N/A" + "; timeout(s):" + timeoutSecond + jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + timeoutSecond + "; max_filter_ratio:" + maxFilterRatio); // error msg @@ -735,7 +745,7 @@ public List getShowInfo() throws DdlException { // create time jobInfo.add(TimeUtils.longToTimeString(createTimestamp)); // etl start time - jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); + jobInfo.add(TimeUtils.longToTimeString(getEtlStartTimestamp())); // etl end time jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); // load start time @@ -751,6 +761,14 @@ public List getShowInfo() throws DdlException { } } + protected String getResourceName() { + return "N/A"; + } + + protected long getEtlStartTimestamp() { + return loadStartTimestamp; + } + public void getJobInfo(Load.JobInfo jobInfo) throws DdlException { checkAuth("SHOW LOAD"); jobInfo.tblNames.addAll(getTableNamesForShow()); @@ -768,6 +786,8 @@ public static LoadJob read(DataInput in) throws IOException { EtlJobType type = EtlJobType.valueOf(Text.readString(in)); if (type == EtlJobType.BROKER) { job = new BrokerLoadJob(); + } else if (type == EtlJobType.SPARK) { + job = new SparkLoadJob(); } else if (type == EtlJobType.INSERT) { job = new InsertLoadJob(); } else if (type == EtlJobType.MINI) { @@ -1016,4 +1036,60 @@ public void readFields(DataInput in) throws IOException { timezone = Text.readString(in); } } + + public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { + state = info.getState(); + transactionId = info.getTransactionId(); + loadStartTimestamp = info.getLoadStartTimestamp(); + } + + public static class LoadJobStateUpdateInfo implements Writable { + @SerializedName(value = "jobId") + private long jobId; + @SerializedName(value = "state") + private JobState state; + @SerializedName(value = "transactionId") + private long transactionId; + @SerializedName(value = "loadStartTimestamp") + private long loadStartTimestamp; + + public LoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long loadStartTimestamp) { + this.jobId = jobId; + this.state = state; + this.transactionId = transactionId; + this.loadStartTimestamp = loadStartTimestamp; + } + + public long getJobId() { + return jobId; + } + + public JobState getState() { + return state; + } + + public long getTransactionId() { + return transactionId; + } + + public long getLoadStartTimestamp() { + return loadStartTimestamp; + } + + @Override + public String toString() { + return GsonUtils.GSON.toJson(this); + } + + 
@Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static LoadJobStateUpdateInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, LoadJobStateUpdateInfo.class); + } + } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java index f01e99ba97212a..9a2b691983963b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.MasterDaemon; @@ -74,8 +75,16 @@ private void process() throws InterruptedException { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) .add("error_msg", "There are error properties in job. Job will be cancelled") .build(), e); + // transaction not begin, so need not abort loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), false, true); + } catch (LoadException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) + .add("error_msg", "Failed to submit etl job. Job will be cancelled") + .build(), e); + // transaction already begin, so need abort + loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), + true, true); } catch (DuplicatedRequestException e) { // should not happen in load job scheduler, there is no request id. LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1a5658ab49b12a..5dd2d05b462d7d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -333,6 +333,17 @@ public void replayEndLoadJob(LoadJobFinalOperation operation) { .build()); } + public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) { + long jobId = info.getJobId(); + LoadJob job = idToLoadJob.get(jobId); + if (job == null) { + LOG.warn("replay update load job state failed. error: job not found, id: {}", jobId); + return; + } + + job.replayUpdateStateInfo(info); + } + public int getLoadJobNum(JobState jobState, long dbId) { readLock(); try { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index e9b286f8091afa..0a52369362882d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -53,7 +54,7 @@ protected void exec() { } catch (UserException e) { failMsg.setMsg(e.getMessage() == null ? 
"" : e.getMessage()); LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) - .add("error_msg", "Failed to execute load task").build()); + .add("error_msg", "Failed to execute load task").build(), e); } catch (Exception e) { failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage()); LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) @@ -66,6 +67,13 @@ protected void exec() { } } + /** + * init load task + * @throws LoadException + */ + public void init() throws LoadException { + } + /** * execute load task * diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java new file mode 100644 index 00000000000000..d135b232d36a43 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.PaloFe; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.thrift.TEtlState; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.launcher.SparkAppHandle; +import org.apache.spark.launcher.SparkAppHandle.Listener; +import org.apache.spark.launcher.SparkAppHandle.State; +import org.apache.spark.launcher.SparkLauncher; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Map; + +/** + * SparkEtlJobHandler is responsible for + * 1. submit spark etl job + * 2. get spark etl job status + * 3. kill spark etl job + * 4. get spark etl file paths + * 5. 
delete etl output path + */ +public class SparkEtlJobHandler { + private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class); + + private static final String APP_RESOURCE_NAME = "palo-fe.jar"; + private static final String CONFIG_FILE_NAME = "jobconfig.json"; + private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME; + private static final String JOB_CONFIG_DIR = "configs"; + private static final String MAIN_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob"; + private static final String ETL_JOB_NAME = "doris__%s"; + // 5min + private static final int GET_APPID_MAX_RETRY_TIMES = 300; + private static final int GET_APPID_SLEEP_MS = 1000; + + class SparkAppListener implements Listener { + @Override + public void stateChanged(SparkAppHandle sparkAppHandle) {} + + @Override + public void infoChanged(SparkAppHandle sparkAppHandle) {} + } + + public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource, + BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException { + // delete outputPath + deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); + + // upload app resource and jobconfig to hdfs + String configsHdfsDir = etlJobConfig.outputPath + "/" + JOB_CONFIG_DIR + "/"; + String appResourceHdfsPath = configsHdfsDir + APP_RESOURCE_NAME; + String jobConfigHdfsPath = configsHdfsDir + CONFIG_FILE_NAME; + try { + BrokerUtil.writeFile(APP_RESOURCE_LOCAL_PATH, appResourceHdfsPath, brokerDesc); + byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8"); + BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc); + } catch (UserException | UnsupportedEncodingException e) { + throw new LoadException(e.getMessage()); + } + + SparkLauncher launcher = new SparkLauncher(); + // master | deployMode + // ------------|------------- + // yarn | cluster + // spark://xx | client + launcher.setMaster(resource.getMaster()) + .setDeployMode(resource.getDeployMode().name().toLowerCase()) + .setAppResource(appResourceHdfsPath) + // TODO(wyb): spark-load + // replace with getCanonicalName later + //.setMainClass(SparkEtlJob.class.getCanonicalName()) + .setMainClass(MAIN_CLASS) + .setAppName(String.format(ETL_JOB_NAME, loadLabel)) + .addAppArgs(jobConfigHdfsPath); + // spark configs + for (Map.Entry entry : resource.getSparkConfigs().entrySet()) { + launcher.setConf(entry.getKey(), entry.getValue()); + } + + // start app + SparkAppHandle handle = null; + State state = null; + String appId = null; + int retry = 0; + String errMsg = "start spark app failed. error: "; + try { + handle = launcher.startApplication(new SparkAppListener()); + } catch (IOException e) { + LOG.warn(errMsg, e); + throw new LoadException(errMsg + e.getMessage()); + } + + while (retry++ < GET_APPID_MAX_RETRY_TIMES) { + appId = handle.getAppId(); + if (appId != null) { + break; + } + + // check state and retry + state = handle.getState(); + if (fromSparkState(state) == TEtlState.CANCELLED) { + throw new LoadException(errMsg + "spark app state: " + state.toString()); + } + if (retry >= GET_APPID_MAX_RETRY_TIMES) { + throw new LoadException(errMsg + "wait too much time for getting appid. spark app state: " + + state.toString()); + } + + // log + if (retry % 10 == 0) { + LOG.info("spark appid that handle get is null. 
load job id: {}, state: {}, retry times: {}", + loadJobId, state.toString(), retry); + } + try { + Thread.sleep(GET_APPID_SLEEP_MS); + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); + } + } + + // success + attachment.setAppId(appId); + attachment.setHandle(handle); + } + + public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) { + try { + BrokerUtil.deletePath(outputPath, brokerDesc); + LOG.info("delete path success. path: {}", outputPath); + } catch (UserException e) { + LOG.warn("delete path failed. path: {}", outputPath, e); + } + } + + private TEtlState fromSparkState(State state) { + switch (state) { + case FINISHED: + return TEtlState.FINISHED; + case FAILED: + case KILLED: + case LOST: + return TEtlState.CANCELLED; + default: + // UNKNOWN CONNECTED SUBMITTED RUNNING + return TEtlState.RUNNING; + } + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 8bb68f764eb738..8f742350461da0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -17,30 +17,43 @@ package org.apache.doris.load.loadv2; -import com.google.common.base.Strings; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PushTask; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.spark.launcher.SparkAppHandle; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -79,7 +92,7 @@ public class SparkLoadJob extends BulkLoadJob { private long quorumFinishTimestamp = -1; // below for push task private Map> tableToLoadPartitions = Maps.newHashMap(); - //private Map indexToPushBrokerReaderParams = Maps.newHashMap(); + //private Map indexToPushBrokerReaderParams = Maps.newHashMap(); private Map indexToSchemaHash = Maps.newHashMap(); private Map> tabletToSentReplicaPushTask = Maps.newHashMap(); private Set finishedReplicas = Sets.newHashSet(); @@ -127,6 +140,77 @@ private void setResourceInfo() throws 
DdlException { brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties); } + @Override + public void beginTxn() + throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + transactionId = Catalog.getCurrentGlobalTransactionMgr() + .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + LoadJobSourceType.FRONTEND, id, timeoutSecond); + } + + @Override + protected void unprotectedExecuteJob() throws LoadException { + // create pending task + LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), + sparkResource, brokerDesc); + task.init(); + idToTasks.put(task.getSignature(), task); + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + } + + @Override + public void onTaskFinished(TaskAttachment attachment) { + if (attachment instanceof SparkPendingTaskAttachment) { + onPendingTaskFinished((SparkPendingTaskAttachment) attachment); + } + } + + private void onPendingTaskFinished(SparkPendingTaskAttachment attachment) { + writeLock(); + try { + // check if job has been cancelled + if (isTxnDone()) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("state", state) + .add("error_msg", "this task will be ignored when job is: " + state) + .build()); + return; + } + + if (finishedTaskIds.contains(attachment.getTaskId())) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("task_id", attachment.getTaskId()) + .add("error_msg", "this is a duplicated callback of pending task " + + "when broker already has loading task") + .build()); + return; + } + + // add task id into finishedTaskIds + finishedTaskIds.add(attachment.getTaskId()); + + sparkAppHandle = attachment.getHandle(); + appId = attachment.getAppId(); + etlOutputPath = attachment.getOutputPath(); + + executeEtl(); + // log etl state + unprotectedLogUpdateStateInfo(); + } finally { + writeUnlock(); + } + } + + /** + * update etl start time and state in spark load job + */ + private void executeEtl() { + etlStartTimestamp = System.currentTimeMillis(); + state = JobState.ETL; + LOG.info("update to {} state success. job id: {}", state, id); + } + /** * load job already cancelled or finished, clear job below: * 1. kill etl job and delete etl files @@ -136,8 +220,7 @@ private void clearJob() { Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED); LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state); - // TODO(wyb): spark-load - //SparkEtlJobHandler handler = new SparkEtlJobHandler(); + SparkEtlJobHandler handler = new SparkEtlJobHandler(); if (state == JobState.CANCELLED) { if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) { try { @@ -152,8 +235,7 @@ private void clearJob() { try { // delete label dir, remove the last taskId dir String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/")); - // TODO(wyb): spark-load - //handler.deleteEtlOutputPath(outputPath, brokerDesc); + handler.deleteEtlOutputPath(outputPath, brokerDesc); } catch (Exception e) { LOG.warn("delete etl files failed. 
id: {}, state: {}", id, state, e); } @@ -198,6 +280,16 @@ public void cancelJob(FailMsg failMsg) throws DdlException { clearJob(); } + @Override + protected String getResourceName() { + return sparkResource.getName(); + } + + @Override + protected long getEtlStartTimestamp() { + return etlStartTimestamp; + } + @Override public void write(DataOutput out) throws IOException { super.write(out); @@ -226,4 +318,78 @@ public void readFields(DataInput in) throws IOException { tabletMetaToFileInfo.put(tabletMetaStr, fileInfo); } } + + /** + * log load job update info when job state changed to etl or loading + */ + private void unprotectedLogUpdateStateInfo() { + SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo( + id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, + loadStartTimestamp, tabletMetaToFileInfo); + Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info); + } + + @Override + public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { + super.replayUpdateStateInfo(info); + SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info; + etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp(); + appId = sparkJobStateInfo.getAppId(); + etlOutputPath = sparkJobStateInfo.getEtlOutputPath(); + tabletMetaToFileInfo = sparkJobStateInfo.getTabletMetaToFileInfo(); + + switch (state) { + case ETL: + // nothing to do + break; + case LOADING: + // TODO(wyb): spark-load + //unprotectedPrepareLoadingInfos(); + break; + default: + LOG.warn("replay update load job state info failed. error: wrong state. job id: {}, state: {}", + id, state); + break; + } + } + + /** + * Used for spark load job journal log when job state changed to ETL or LOADING + */ + public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo { + @SerializedName(value = "etlStartTimestamp") + private long etlStartTimestamp; + @SerializedName(value = "appId") + private String appId; + @SerializedName(value = "etlOutputPath") + private String etlOutputPath; + @SerializedName(value = "tabletMetaToFileInfo") + private Map> tabletMetaToFileInfo; + + public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long etlStartTimestamp, + String appId, String etlOutputPath, long loadStartTimestamp, + Map> tabletMetaToFileInfo) { + super(jobId, state, transactionId, loadStartTimestamp); + this.etlStartTimestamp = etlStartTimestamp; + this.appId = appId; + this.etlOutputPath = etlOutputPath; + this.tabletMetaToFileInfo = tabletMetaToFileInfo; + } + + public long getEtlStartTimestamp() { + return etlStartTimestamp; + } + + public String getAppId() { + return appId; + } + + public String getEtlOutputPath() { + return etlOutputPath; + } + + public Map> getTabletMetaToFileInfo() { + return tabletMetaToFileInfo; + } + } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java new file mode 100644 index 00000000000000..185e84a54958c9 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -0,0 +1,550 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveTable; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; +import org.apache.doris.transaction.TransactionState; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +// 1. create etl job config and write it into jobconfig.json file +// 2. 
submit spark etl job +public class SparkLoadPendingTask extends LoadTask { + private static final Logger LOG = LogManager.getLogger(SparkLoadPendingTask.class); + + private final Map> aggKeyToBrokerFileGroups; + private final SparkResource resource; + private final BrokerDesc brokerDesc; + private final long dbId; + private final String loadLabel; + private final long loadJobId; + private final long transactionId; + private EtlJobConfig etlJobConfig; + + public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, + Map> aggKeyToBrokerFileGroups, + SparkResource resource, BrokerDesc brokerDesc) { + super(loadTaskCallback); + this.retryTime = 3; + this.attachment = new SparkPendingTaskAttachment(signature); + this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; + this.resource = resource; + this.brokerDesc = brokerDesc; + this.dbId = loadTaskCallback.getDbId(); + this.loadJobId = loadTaskCallback.getId(); + this.loadLabel = loadTaskCallback.getLabel(); + this.transactionId = loadTaskCallback.getTransactionId(); + this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL); + } + + @Override + void executeTask() throws LoadException { + LOG.info("begin to execute spark pending task. load job id: {}", loadJobId); + submitEtlJob(); + } + + private void submitEtlJob() throws LoadException { + SparkPendingTaskAttachment sparkAttachment = (SparkPendingTaskAttachment) attachment; + // retry different output path + etlJobConfig.outputPath = EtlJobConfig.getOutputPath(resource.getWorkingDir(), dbId, loadLabel, signature); + sparkAttachment.setOutputPath(etlJobConfig.outputPath); + + // handler submit etl job + SparkEtlJobHandler handler = new SparkEtlJobHandler(); + handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkAttachment); + LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment); + } + + @Override + public void init() throws LoadException { + createEtlJobConf(); + } + + private void createEtlJobConf() throws LoadException { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + if (db == null) { + throw new LoadException("db does not exist. id: " + dbId); + } + + Map tables = Maps.newHashMap(); + db.readLock(); + try { + Map> tableIdToPartitionIds = Maps.newHashMap(); + Set allPartitionsTableIds = Sets.newHashSet(); + prepareTablePartitionInfos(db, tableIdToPartitionIds, allPartitionsTableIds); + + + for (Map.Entry> entry : aggKeyToBrokerFileGroups.entrySet()) { + FileGroupAggKey aggKey = entry.getKey(); + long tableId = aggKey.getTableId(); + + OlapTable table = (OlapTable) db.getTable(tableId); + if (table == null) { + throw new LoadException("table does not exist. id: " + tableId); + } + + EtlTable etlTable = null; + if (tables.containsKey(tableId)) { + etlTable = tables.get(tableId); + } else { + // indexes + List etlIndexes = createEtlIndexes(table); + // partition info + EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(table, + tableIdToPartitionIds.get(tableId)); + etlTable = new EtlTable(etlIndexes, etlPartitionInfo); + tables.put(tableId, etlTable); + + // add table indexes to transaction state + TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, transactionId); + if (txnState == null) { + throw new LoadException("txn does not exist. 
id: " + transactionId); + } + txnState.addTableIndexes(table); + } + + // file group + for (BrokerFileGroup fileGroup : entry.getValue()) { + etlTable.addFileGroup(createEtlFileGroup(fileGroup, tableIdToPartitionIds.get(tableId), db, table)); + } + } + } finally { + db.readUnlock(); + } + + String outputFilePattern = EtlJobConfig.getOutputFilePattern(loadLabel, FilePatternVersion.V1); + // strictMode timezone properties + EtlJobProperty properties = new EtlJobProperty(); + properties.strictMode = ((LoadJob) callback).strictMode; + properties.timezone = ((LoadJob) callback).timezone; + etlJobConfig = new EtlJobConfig(tables, outputFilePattern, loadLabel, properties); + } + + private void prepareTablePartitionInfos(Database db, Map> tableIdToPartitionIds, + Set allPartitionsTableIds) throws LoadException { + for (FileGroupAggKey aggKey : aggKeyToBrokerFileGroups.keySet()) { + long tableId = aggKey.getTableId(); + if (allPartitionsTableIds.contains(tableId)) { + continue; + } + + OlapTable table = (OlapTable) db.getTable(tableId); + if (table == null) { + throw new LoadException("table does not exist. id: " + tableId); + } + + Set partitionIds = null; + if (tableIdToPartitionIds.containsKey(tableId)) { + partitionIds = tableIdToPartitionIds.get(tableId); + } else { + partitionIds = Sets.newHashSet(); + tableIdToPartitionIds.put(tableId, partitionIds); + } + + Set groupPartitionIds = aggKey.getPartitionIds(); + // if not assign partition, use all partitions + if (groupPartitionIds == null || groupPartitionIds.isEmpty()) { + for (Partition partition : table.getPartitions()) { + partitionIds.add(partition.getId()); + } + + allPartitionsTableIds.add(tableId); + } else { + partitionIds.addAll(groupPartitionIds); + } + } + } + + private List createEtlIndexes(OlapTable table) throws LoadException { + List etlIndexes = Lists.newArrayList(); + + for (Map.Entry> entry : table.getIndexIdToSchema().entrySet()) { + long indexId = entry.getKey(); + int schemaHash = table.getSchemaHashByIndexId(indexId); + + // columns + List etlColumns = Lists.newArrayList(); + for (Column column : entry.getValue()) { + etlColumns.add(createEtlColumn(column)); + } + + // check distribution type + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + if (distributionInfo.getType() != DistributionInfoType.HASH) { + // RANDOM not supported + String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name(); + LOG.warn(errMsg); + throw new LoadException(errMsg); + } + + // index type + String indexType = null; + KeysType keysType = table.getKeysTypeByIndexId(indexId); + switch (keysType) { + case DUP_KEYS: + indexType = "DUPLICATE"; + break; + case AGG_KEYS: + indexType = "AGGREGATE"; + break; + case UNIQUE_KEYS: + indexType = "UNIQUE"; + break; + default: + String errMsg = "unknown keys type. type: " + keysType.name(); + LOG.warn(errMsg); + throw new LoadException(errMsg); + } + + // is base index + boolean isBaseIndex = indexId == table.getBaseIndexId() ? 
true : false; + + etlIndexes.add(new EtlIndex(indexId, etlColumns, schemaHash, indexType, isBaseIndex)); + } + + return etlIndexes; + } + + private EtlColumn createEtlColumn(Column column) { + // column name + String name = column.getName(); + // column type + PrimitiveType type = column.getDataType(); + String columnType = column.getDataType().toString(); + // is allow null + boolean isAllowNull = column.isAllowNull(); + // is key + boolean isKey = column.isKey(); + + // aggregation type + String aggregationType = null; + if (column.getAggregationType() != null) { + aggregationType = column.getAggregationType().toString(); + } + + // default value + String defaultValue = null; + if (column.getDefaultValue() != null) { + defaultValue = column.getDefaultValue(); + } + if (column.isAllowNull() && column.getDefaultValue() == null) { + defaultValue = "\\N"; + } + + // string length + int stringLength = 0; + if (type.isStringType()) { + stringLength = column.getStrLen(); + } + + // decimal precision scale + int precision = 0; + int scale = 0; + if (type.isDecimalType() || type.isDecimalV2Type()) { + precision = column.getPrecision(); + scale = column.getScale(); + } + + return new EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue, + stringLength, precision, scale); + } + + private EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set partitionIds) throws LoadException { + PartitionType type = table.getPartitionInfo().getType(); + + List partitionColumnRefs = Lists.newArrayList(); + List etlPartitions = Lists.newArrayList(); + if (type == PartitionType.RANGE) { + RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo(); + for (Column column : rangePartitionInfo.getPartitionColumns()) { + partitionColumnRefs.add(column.getName()); + } + + for (Map.Entry> entry : rangePartitionInfo.getSortedRangeMap(false)) { + long partitionId = entry.getKey(); + if (!partitionIds.contains(partitionId)) { + continue; + } + + Partition partition = table.getPartition(partitionId); + if (partition == null) { + throw new LoadException("partition does not exist. id: " + partitionId); + } + + // bucket num + int bucketNum = partition.getDistributionInfo().getBucketNum(); + + // is max partition + Range range = entry.getValue(); + boolean isMaxPartition = range.upperEndpoint().isMaxValue(); + + // start keys + List rangeKeyExprs = range.lowerEndpoint().getKeys(); + List startKeys = Lists.newArrayList(); + for (int i = 0; i < rangeKeyExprs.size(); ++i) { + LiteralExpr literalExpr = rangeKeyExprs.get(i); + Object keyValue = literalExpr.getRealValue(); + startKeys.add(keyValue); + } + + // end keys + // is empty list when max partition + List endKeys = Lists.newArrayList(); + if (!isMaxPartition) { + rangeKeyExprs = range.upperEndpoint().getKeys(); + for (int i = 0; i < rangeKeyExprs.size(); ++i) { + LiteralExpr literalExpr = rangeKeyExprs.get(i); + Object keyValue = literalExpr.getRealValue(); + endKeys.add(keyValue); + } + } + + etlPartitions.add(new EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum)); + } + } else { + Preconditions.checkState(type == PartitionType.UNPARTITIONED); + Preconditions.checkState(partitionIds.size() == 1); + + for (Long partitionId : partitionIds) { + Partition partition = table.getPartition(partitionId); + if (partition == null) { + throw new LoadException("partition does not exist. 
id: " + partitionId); + } + + // bucket num + int bucketNum = partition.getDistributionInfo().getBucketNum(); + + etlPartitions.add(new EtlPartition(partitionId, Lists.newArrayList(), Lists.newArrayList(), + true, bucketNum)); + } + } + + // distribution column refs + List distributionColumnRefs = Lists.newArrayList(); + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo.getType() == DistributionInfoType.HASH); + for (Column column : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) { + distributionColumnRefs.add(column.getName()); + } + + return new EtlPartitionInfo(type.typeString, partitionColumnRefs, distributionColumnRefs, etlPartitions); + } + + private EtlFileGroup createEtlFileGroup(BrokerFileGroup fileGroup, Set tablePartitionIds, + Database db, OlapTable table) throws LoadException { + List copiedColumnExprList = Lists.newArrayList(fileGroup.getColumnExprList()); + Map exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for (ImportColumnDesc columnDesc : copiedColumnExprList) { + if (!columnDesc.isColumn()) { + exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr()); + } + } + + // check columns + try { + Load.initColumns(table, copiedColumnExprList, fileGroup.getColumnToHadoopFunction()); + } catch (UserException e) { + throw new LoadException(e.getMessage()); + } + // add shadow column mapping when schema change + for (ImportColumnDesc columnDesc : Load.getSchemaChangeShadowColumnDesc(table, exprByName)) { + copiedColumnExprList.add(columnDesc); + exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr()); + } + + // check negative for sum aggregate type + if (fileGroup.isNegative()) { + for (Column column : table.getBaseSchema()) { + if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) { + throw new LoadException("Column is not SUM AggreateType. 
column:" + column.getName()); + } + } + } + + // fill file field names if empty + List fileFieldNames = fileGroup.getFileFieldNames(); + if (fileFieldNames == null || fileFieldNames.isEmpty()) { + fileFieldNames = Lists.newArrayList(); + for (Column column : table.getBaseSchema()) { + fileFieldNames.add(column.getName()); + } + } + + // column mappings + Map>> columnToHadoopFunction = fileGroup.getColumnToHadoopFunction(); + Map columnMappings = Maps.newHashMap(); + if (columnToHadoopFunction != null) { + for (Map.Entry>> entry : columnToHadoopFunction.entrySet()) { + columnMappings.put(entry.getKey(), + new EtlColumnMapping(entry.getValue().first, entry.getValue().second)); + } + } + for (ImportColumnDesc columnDesc : copiedColumnExprList) { + if (columnDesc.isColumn() || columnMappings.containsKey(columnDesc.getColumnName())) { + continue; + } + // the left must be column expr + columnMappings.put(columnDesc.getColumnName(), new EtlColumnMapping(columnDesc.getExpr().toSql())); + } + + // partition ids + List partitionIds = fileGroup.getPartitionIds(); + if (partitionIds == null || partitionIds.isEmpty()) { + partitionIds = Lists.newArrayList(tablePartitionIds); + } + + // where + // TODO: check + String where = ""; + if (fileGroup.getWhereExpr() != null) { + where = fileGroup.getWhereExpr().toSql(); + } + + // load from table + String hiveDbTableName = ""; + Map hiveTableProperties = Maps.newHashMap(); + if (fileGroup.isLoadFromTable()) { + long srcTableId = fileGroup.getSrcTableId(); + HiveTable srcHiveTable = (HiveTable) db.getTable(srcTableId); + if (srcHiveTable == null) { + throw new LoadException("table does not exist. id: " + srcTableId); + } + hiveDbTableName = srcHiveTable.getHiveDbTable(); + hiveTableProperties.putAll(srcHiveTable.getHiveProperties()); + } + + // check hll and bitmap func + // TODO: more check + for (Column column : table.getBaseSchema()) { + String columnName = column.getName(); + PrimitiveType columnType = column.getDataType(); + Expr expr = exprByName.get(columnName); + if (columnType == PrimitiveType.HLL) { + checkHllMapping(columnName, expr); + } + if (columnType == PrimitiveType.BITMAP) { + checkBitmapMapping(columnName, expr, fileGroup.isLoadFromTable()); + } + } + + EtlFileGroup etlFileGroup = null; + if (fileGroup.isLoadFromTable()) { + etlFileGroup = new EtlFileGroup(SourceType.HIVE, hiveDbTableName, hiveTableProperties, + fileGroup.isNegative(), columnMappings, where, partitionIds); + } else { + etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames, + fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(), + fileGroup.getLineDelimiter(), fileGroup.isNegative(), + fileGroup.getFileFormat(), columnMappings, + where, partitionIds); + } + + return etlFileGroup; + } + + private void checkHllMapping(String columnName, Expr expr) throws LoadException { + if (expr == null) { + throw new LoadException("HLL column func is not assigned. 
column:" + columnName); + } + + String msg = "HLL column must use hll function, like " + columnName + "=hll_hash(xxx) or " + + columnName + "=hll_empty()"; + if (!(expr instanceof FunctionCallExpr)) { + throw new LoadException(msg); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + String functionName = fn.getFnName().getFunction(); + if (!functionName.equalsIgnoreCase("hll_hash") + && !functionName.equalsIgnoreCase("hll_empty")) { + throw new LoadException(msg); + } + } + + private void checkBitmapMapping(String columnName, Expr expr, boolean isLoadFromTable) throws LoadException { + if (expr == null) { + throw new LoadException("BITMAP column func is not assigned. column:" + columnName); + } + + String msg = "BITMAP column must use bitmap function, like " + columnName + "=to_bitmap(xxx) or " + + columnName + "=bitmap_hash() or " + columnName + "=bitmap_dict()"; + if (!(expr instanceof FunctionCallExpr)) { + throw new LoadException(msg); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + String functionName = fn.getFnName().getFunction(); + if (!functionName.equalsIgnoreCase("to_bitmap") + && !functionName.equalsIgnoreCase("bitmap_hash") + && !functionName.equalsIgnoreCase("bitmap_dict")) { + throw new LoadException(msg); + } + + if (functionName.equalsIgnoreCase("bitmap_dict") && !isLoadFromTable) { + throw new LoadException("Bitmap global dict should load data from hive table"); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java new file mode 100644 index 00000000000000..311ca3bab70380 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.apache.spark.launcher.SparkAppHandle; + +public class SparkPendingTaskAttachment extends TaskAttachment { + private SparkAppHandle handle; + private String appId; + private String outputPath; + + public SparkPendingTaskAttachment(long taskId) { + super(taskId); + } + + public SparkAppHandle getHandle() { + return handle; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public void setHandle(SparkAppHandle handle) { + this.handle = handle; + } + + public String getOutputPath() { + return outputPath; + } + + public void setOutputPath(String outputPath) { + this.outputPath = outputPath; + } + + @Override + public String toString() { + return "SparkPendingTaskAttachment{" + + "appId='" + appId + '\'' + + ", outputPath='" + outputPath + '\'' + + '}'; + } +} diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index fa3ef9e425e02a..7f3da6c9f0af14 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -53,6 +53,7 @@ import org.apache.doris.load.Load; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; +import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.meta.MetaContext; @@ -684,6 +685,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { catalog.getLoadManager().replayEndLoadJob(operation); break; } + case OperationType.OP_UPDATE_LOAD_JOB: { + LoadJobStateUpdateInfo info = (LoadJobStateUpdateInfo) journal.getData(); + catalog.getLoadManager().replayUpdateLoadJobStateInfo(info); + break; + } case OperationType.OP_CREATE_RESOURCE: { final Resource resource = (Resource) journal.getData(); catalog.getResourceMgr().replayCreateResource(resource); @@ -1266,6 +1272,10 @@ public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) { logEdit(OperationType.OP_END_LOAD_JOB, loadJobFinalOperation); } + public void logUpdateLoadJob(LoadJobStateUpdateInfo info) { + logEdit(OperationType.OP_UPDATE_LOAD_JOB, info); + } + public void logCreateResource(Resource resource) { // TODO(wyb): spark-load //logEdit(OperationType.OP_CREATE_RESOURCE, resource); diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java index d06fcba8d30eb2..01171e6a8e952a 100644 --- a/fe/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java @@ -156,7 +156,7 @@ public class OperationType { // this finish op include finished and cancelled public static final short OP_END_LOAD_JOB = 231; // update job info, used by spark load - //public static final short OP_UPDATE_LOAD_JOB = 232; + public static final short OP_UPDATE_LOAD_JOB = 232; // small files 251~260 public static final short OP_CREATE_SMALL_FILE = 251; diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 10e1ad7f302086..8e9cbe2db83173 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -26,6 +26,8 @@ import org.apache.doris.catalog.RandomDistributionInfo; import 
org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.SparkResource; +import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; +import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -102,6 +104,12 @@ public class GsonUtils { .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName()) .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName()); + // runtime adapter for class "LoadJobStateUpdateInfo" + private static RuntimeTypeAdapterFactory loadJobStateUpdateInfoTypeAdapterFactory + = RuntimeTypeAdapterFactory + .of(LoadJobStateUpdateInfo.class, "clazz") + .registerSubtype(SparkLoadJobStateUpdateInfo.class, SparkLoadJobStateUpdateInfo.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. private static final GsonBuilder GSON_BUILDER = new GsonBuilder() @@ -113,7 +121,8 @@ public class GsonUtils { .registerTypeAdapterFactory(columnTypeAdapterFactory) .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory) .registerTypeAdapterFactory(resourceTypeAdapterFactory) - .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory); + .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory) + .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory); // this instance is thread-safe. public static final Gson GSON = GSON_BUILDER.create(); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 5df36ae9abc126..a4427eddfc4e3f 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -354,7 +354,7 @@ private void getFileStatusAndCalcInstance() throws UserException { for (BrokerFileGroup fileGroup : fileGroups) { List fileStatuses = Lists.newArrayList(); for (String path : fileGroup.getFilePaths()) { - BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); } fileStatusesList.add(fileStatuses); filesAdded += fileStatuses.size(); diff --git a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java index 55a6e00c000d55..4be5d0fa7e5945 100644 --- a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java +++ b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java @@ -20,12 +20,44 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.BrokerMgr; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.GenericPool; import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TBrokerCloseReaderRequest; +import org.apache.doris.thrift.TBrokerCloseWriterRequest; +import org.apache.doris.thrift.TBrokerDeletePathRequest; +import org.apache.doris.thrift.TBrokerFD; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TBrokerListPathRequest; +import org.apache.doris.thrift.TBrokerListResponse; +import org.apache.doris.thrift.TBrokerOpenReaderRequest; +import org.apache.doris.thrift.TBrokerOpenReaderResponse; +import org.apache.doris.thrift.TBrokerOpenWriterRequest; +import 
org.apache.doris.thrift.TBrokerOpenWriterResponse; +import org.apache.doris.thrift.TBrokerOperationStatus; +import org.apache.doris.thrift.TBrokerOperationStatusCode; +import org.apache.doris.thrift.TBrokerPReadRequest; +import org.apache.doris.thrift.TBrokerPWriteRequest; +import org.apache.doris.thrift.TBrokerReadResponse; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloBrokerService; import com.google.common.collect.Lists; - +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.Mocked; +import mockit.MockUp; +import org.apache.thrift.TException; +import org.junit.Assert; import org.junit.Test; +import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.List; @@ -122,4 +154,172 @@ public void parseColumnsFromPath() { } } -} + + @Test + public void testReadFile(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog, + @Injectable BrokerMgr brokerMgr) + throws TException, UserException, UnsupportedEncodingException { + // list response + TBrokerListResponse listResponse = new TBrokerListResponse(); + TBrokerOperationStatus status = new TBrokerOperationStatus(); + status.statusCode = TBrokerOperationStatusCode.OK; + listResponse.opStatus = status; + List files = Lists.newArrayList(); + String filePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/dpp_result.json"; + files.add(new TBrokerFileStatus(filePath, false, 10, false)); + listResponse.files = files; + + // open reader response + TBrokerOpenReaderResponse openReaderResponse = new TBrokerOpenReaderResponse(); + openReaderResponse.opStatus = status; + openReaderResponse.fd = new TBrokerFD(1, 2); + + // read response + String dppResultStr = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}"; + TBrokerReadResponse readResponse = new TBrokerReadResponse(); + readResponse.opStatus = status; + readResponse.setData(dppResultStr.getBytes("UTF-8")); + + FsBroker fsBroker = new FsBroker("127.0.0.1", 99999); + + new MockUp>() { + @Mock + public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + }; + + new Expectations() { + { + catalog.getBrokerMgr(); + result = brokerMgr; + brokerMgr.getBroker(anyString, anyString); + result = fsBroker; + client.listPath((TBrokerListPathRequest) any); + result = listResponse; + client.openReader((TBrokerOpenReaderRequest) any); + result = openReaderResponse; + client.pread((TBrokerPReadRequest) any); + result = readResponse; + times = 1; + client.closeReader((TBrokerCloseReaderRequest) any); + result = status; + } + }; + + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); + byte[] data = BrokerUtil.readFile(filePath, brokerDesc); + String readStr = new String(data, "UTF-8"); + Assert.assertEquals(dppResultStr, readStr); + } + + @Test + public void testWriteFile(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog, + @Injectable BrokerMgr brokerMgr) + throws TException, UserException, UnsupportedEncodingException { + // open writer response + TBrokerOpenWriterResponse openWriterResponse = new TBrokerOpenWriterResponse(); + TBrokerOperationStatus status = new TBrokerOperationStatus(); + status.statusCode = 
TBrokerOperationStatusCode.OK; + openWriterResponse.opStatus = status; + openWriterResponse.fd = new TBrokerFD(1, 2); + FsBroker fsBroker = new FsBroker("127.0.0.1", 99999); + + new MockUp>() { + @Mock + public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + }; + + new Expectations() { + { + catalog.getBrokerMgr(); + result = brokerMgr; + brokerMgr.getBroker(anyString, anyString); + result = fsBroker; + client.openWriter((TBrokerOpenWriterRequest) any); + result = openWriterResponse; + client.pwrite((TBrokerPWriteRequest) any); + result = status; + times = 1; + client.closeWriter((TBrokerCloseWriterRequest) any); + result = status; + } + }; + + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); + byte[] configs = "{'label': 'label0'}".getBytes("UTF-8"); + String destFilePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/configs/jobconfig.json"; + try { + BrokerUtil.writeFile(configs, destFilePath, brokerDesc); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDeletePath(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog, + @Injectable BrokerMgr brokerMgr) throws AnalysisException, TException { + // delete response + TBrokerOperationStatus status = new TBrokerOperationStatus(); + status.statusCode = TBrokerOperationStatusCode.OK; + FsBroker fsBroker = new FsBroker("127.0.0.1", 99999); + + new MockUp>() { + @Mock + public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + }; + + new Expectations() { + { + catalog.getBrokerMgr(); + result = brokerMgr; + brokerMgr.getBroker(anyString, anyString); + result = fsBroker; + client.deletePath((TBrokerDeletePathRequest) any); + result = status; + times = 1; + } + }; + + try { + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); + BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java index 9b15819dd9e6cf..6e9f81e3fca424 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java @@ -64,7 +64,7 @@ public void testExecuteTask(@Injectable BrokerLoadJob brokerLoadJob, }; new MockUp() { @Mock - public void parseBrokerFile(String path, BrokerDesc brokerDesc, List fileStatuses) { + public void parseFile(String path, BrokerDesc brokerDesc, List fileStatuses) { fileStatuses.add(tBrokerFileStatus); } }; diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java index dd6993223e2ff9..f324b76c9bc3df 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java +++ 
b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java @@ -25,6 +25,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.metric.LongCounterMetric; @@ -122,7 +123,11 @@ public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr, } }; - loadJob.execute(); + try { + loadJob.execute(); + } catch (LoadException e) { + Assert.fail(e.getMessage()); + } Assert.assertEquals(JobState.LOADING, loadJob.getState()); Assert.assertEquals(1, loadJob.getTransactionId()); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java new file mode 100644 index 00000000000000..374bbb280a8266 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.SingleRangePartitionDesc; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.BrokerFileGroupAggInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; + +import java.util.List; +import java.util.Map; + +public class SparkLoadPendingTaskTest { + + @Test + public void testExecuteTask(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException { + long dbId = 0L; + long tableId = 1L; + + // columns + List columns = Lists.newArrayList(); + columns.add(new Column("c1", Type.BIGINT, true, null, false, null, "")); + + // indexes + Map> indexIdToSchema = Maps.newHashMap(); + long indexId = 3L; + indexIdToSchema.put(indexId, columns); + + // partition and distribution infos + long partitionId = 2L; + DistributionInfo distributionInfo = new HashDistributionInfo(2, Lists.newArrayList(columns.get(0))); + PartitionInfo partitionInfo = new SinglePartitionInfo(); + Partition partition = new Partition(partitionId, "p1", null, distributionInfo); + List partitions = Lists.newArrayList(partition); + + // file group + Map> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + null, null, null, false, null); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroups.add(brokerFileGroup); + BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + new Expectations() { + { + catalog.getDb(dbId); + result = database; + 
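+                // the remaining expectations stub a single-partition duplicate-key table with one index (id 3) and hash distribution on c1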
database.getTable(tableId); + result = table; + table.getPartitions(); + result = partitions; + table.getIndexIdToSchema(); + result = indexIdToSchema; + table.getDefaultDistributionInfo(); + result = distributionInfo; + table.getSchemaHashByIndexId(indexId); + result = 123; + table.getPartitionInfo(); + result = partitionInfo; + table.getPartition(partitionId); + result = partition; + table.getKeysTypeByIndexId(indexId); + result = KeysType.DUP_KEYS; + table.getBaseIndexId(); + result = indexId; + } + }; + + String appId = "application_15888888888_0088"; + new MockUp() { + @Mock + public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, + SparkResource resource, BrokerDesc brokerDesc, + SparkPendingTaskAttachment attachment) throws LoadException { + attachment.setAppId(appId); + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc); + task.init(); + SparkPendingTaskAttachment attachment = Deencapsulation.getField(task, "attachment"); + Assert.assertEquals(null, attachment.getAppId()); + task.executeTask(); + Assert.assertEquals(appId, attachment.getAppId()); + } + + @Test(expected = LoadException.class) + public void testNoDb(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog) throws LoadException { + long dbId = 0L; + + new Expectations() { + { + catalog.getDb(dbId); + result = null; + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, null, resource, brokerDesc); + task.init(); + } + + @Test(expected = LoadException.class) + public void testNoTable(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog, + @Injectable Database database) throws LoadException { + long dbId = 0L; + long tableId = 1L; + + Map> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + null, null, null, false, null); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroups.add(brokerFileGroup); + BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + new Expectations() { + { + catalog.getDb(dbId); + result = database; + database.getTable(tableId); + result = null; + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc); + task.init(); + } + + @Test + public void testRangePartitionHashDistribution(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException, DdlException, AnalysisException { + long dbId = 0L; + long tableId = 1L; + + // c1 is partition column, c2 is distribution column + List columns = Lists.newArrayList(); + columns.add(new Column("c1", Type.INT, true, null, false, null, "")); + columns.add(new Column("c2", ScalarType.createVarchar(10), true, null, false, null, "")); + columns.add(new Column("c3", Type.INT, false, AggregateType.SUM, false, null, "")); + + // indexes + Map> indexIdToSchema = Maps.newHashMap(); + long index1Id = 3L; + indexIdToSchema.put(index1Id, columns); + long index2Id = 4L; + 
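+        // index 4 models a rollup with only (c1, c3): c1 stays the key column and c3 keeps its SUM aggregation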
indexIdToSchema.put(index2Id, Lists.newArrayList(columns.get(0), columns.get(2))); + + // partition and distribution info + long partition1Id = 2L; + long partition2Id = 5L; + int distributionColumnIndex = 1; + DistributionInfo distributionInfo = new HashDistributionInfo(3, Lists.newArrayList(columns.get(distributionColumnIndex))); + Partition partition1 = new Partition(partition1Id, "p1", null, + distributionInfo); + Partition partition2 = new Partition(partition2Id, "p2", null, + new HashDistributionInfo(4, Lists.newArrayList(columns.get(distributionColumnIndex)))); + int partitionColumnIndex = 0; + List partitions = Lists.newArrayList(partition1, partition2); + RangePartitionInfo partitionInfo = new RangePartitionInfo(Lists.newArrayList(columns.get(partitionColumnIndex))); + PartitionKeyDesc partitionKeyDesc1 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("10"))); + SingleRangePartitionDesc partitionDesc1 = new SingleRangePartitionDesc(false, "p1", partitionKeyDesc1, null); + partitionDesc1.analyze(1, null); + partitionInfo.handleNewSinglePartitionDesc(partitionDesc1, partition1Id, false); + PartitionKeyDesc partitionKeyDesc2 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("20"))); + SingleRangePartitionDesc partitionDesc2 = new SingleRangePartitionDesc(false, "p2", partitionKeyDesc2, null); + partitionDesc2.analyze(1, null); + partitionInfo.handleNewSinglePartitionDesc(partitionDesc2, partition2Id, false); + + // file group + Map> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + null, null, null, false, null); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroups.add(brokerFileGroup); + BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + new Expectations() { + { + catalog.getDb(dbId); + result = database; + database.getTable(tableId); + result = table; + table.getPartitions(); + result = partitions; + table.getIndexIdToSchema(); + result = indexIdToSchema; + table.getDefaultDistributionInfo(); + result = distributionInfo; + table.getSchemaHashByIndexId(index1Id); + result = 123; + table.getSchemaHashByIndexId(index2Id); + result = 234; + table.getPartitionInfo(); + result = partitionInfo; + table.getPartition(partition1Id); + result = partition1; + table.getPartition(partition2Id); + result = partition2; + table.getKeysTypeByIndexId(index1Id); + result = KeysType.AGG_KEYS; + table.getKeysTypeByIndexId(index2Id); + result = KeysType.AGG_KEYS; + table.getBaseIndexId(); + result = index1Id; + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc); + EtlJobConfig etlJobConfig = Deencapsulation.getField(task, "etlJobConfig"); + Assert.assertEquals(null, etlJobConfig); + task.init(); + etlJobConfig = Deencapsulation.getField(task, "etlJobConfig"); + Assert.assertTrue(etlJobConfig != null); + + // check table id + Map idToEtlTable = etlJobConfig.tables; + Assert.assertEquals(1, idToEtlTable.size()); + Assert.assertTrue(idToEtlTable.containsKey(tableId)); + + // check indexes + EtlTable etlTable = idToEtlTable.get(tableId); + List etlIndexes = etlTable.indexes; + Assert.assertEquals(2, etlIndexes.size()); + Assert.assertEquals(index1Id, etlIndexes.get(0).indexId); + Assert.assertEquals(index2Id, etlIndexes.get(1).indexId); + + // 
check base index columns + EtlIndex baseIndex = etlIndexes.get(0); + Assert.assertTrue(baseIndex.isBaseIndex); + Assert.assertEquals(3, baseIndex.columns.size()); + for (int i = 0; i < columns.size(); i++) { + Assert.assertEquals(columns.get(i).getName(), baseIndex.columns.get(i).columnName); + } + Assert.assertEquals("AGGREGATE", baseIndex.indexType); + + // check partitions + EtlPartitionInfo etlPartitionInfo = etlTable.partitionInfo; + Assert.assertEquals("RANGE", etlPartitionInfo.partitionType); + List partitionColumns = etlPartitionInfo.partitionColumnRefs; + Assert.assertEquals(1, partitionColumns.size()); + Assert.assertEquals(columns.get(partitionColumnIndex).getName(), partitionColumns.get(0)); + List distributionColumns = etlPartitionInfo.distributionColumnRefs; + Assert.assertEquals(1, distributionColumns.size()); + Assert.assertEquals(columns.get(distributionColumnIndex).getName(), distributionColumns.get(0)); + List etlPartitions = etlPartitionInfo.partitions; + Assert.assertEquals(2, etlPartitions.size()); + + // check file group + List etlFileGroups = etlTable.fileGroups; + Assert.assertEquals(1, etlFileGroups.size()); + } +}
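
The tests above exercise the new BrokerUtil helpers (readFile, writeFile, deletePath) against a mocked broker client. As a rough usage sketch only: the class below is hypothetical, it assumes it runs inside the FE process where the Catalog and the broker client pool are already initialized, and the broker name and HDFS paths are made up for illustration; the method signatures are the ones exercised by the tests.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;

import com.google.common.collect.Maps;

import java.nio.charset.StandardCharsets;

public class BrokerUtilUsageSketch {
    public static void run() throws UserException {
        // illustrative broker name and job directory; a real spark load job derives these from the load label
        BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
        String jobDir = "hdfs://127.0.0.1:10000/doris/jobs/1/label0/9";

        // upload the etl job config before submitting the spark etl job
        byte[] jobConfig = "{\"label\": \"label0\"}".getBytes(StandardCharsets.UTF_8);
        BrokerUtil.writeFile(jobConfig, jobDir + "/configs/jobconfig.json", brokerDesc);

        // after the etl job finishes, read back the dpp result written by the spark job
        byte[] dppResult = BrokerUtil.readFile(jobDir + "/dpp_result.json", brokerDesc);
        String dppResultJson = new String(dppResult, StandardCharsets.UTF_8);
        // dppResultJson would then be parsed into the job's etl status (normal/abnormal row counts etc.)

        // clean up the whole job directory once the load job reaches a final state
        BrokerUtil.deletePath(jobDir, brokerDesc);
    }
}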