From f8f88522e7c30ec1584dd876e7532d72d679485c Mon Sep 17 00:00:00 2001 From: yangzhg Date: Tue, 15 Sep 2020 17:05:39 +0800 Subject: [PATCH 1/3] using stream load in multiload --- be/src/http/action/mini_load.cpp | 102 +++++++- be/src/http/action/mini_load.h | 4 +- fe/fe-core/pom.xml | 6 + .../org/apache/doris/analysis/BrokerDesc.java | 16 ++ .../doris/analysis/DataDescription.java | 95 +++++++ .../org/apache/doris/analysis/LoadStmt.java | 28 ++- .../org/apache/doris/catalog/BrokerMgr.java | 4 + .../apache/doris/common/proc/LoadProcDir.java | 3 + .../apache/doris/http/rest/LoadAction.java | 31 +-- .../apache/doris/http/rest/MultiCommit.java | 10 +- .../apache/doris/http/rest/MultiStart.java | 6 + .../doris/http/rest/RestBaseAction.java | 1 + .../doris/http/rest/RestBaseResult.java | 3 +- .../apache/doris/httpv2/rest/LoadAction.java | 39 ++- .../apache/doris/httpv2/rest/MultiAction.java | 15 +- .../apache/doris/load/BrokerFileGroup.java | 66 ++++- .../main/java/org/apache/doris/load/Load.java | 50 ++-- .../load/loadv2/BrokerLoadPendingTask.java | 84 ++++--- .../apache/doris/load/loadv2/LoadManager.java | 22 +- .../apache/doris/planner/BrokerScanNode.java | 122 ++++++--- .../doris/planner/MultiLoadScanNode.java | 39 +++ .../doris/planner/StreamLoadScanNode.java | 2 +- .../org/apache/doris/qe/MultiLoadMgr.java | 234 +++++++++++++++--- .../org/apache/doris/qe/MultiLoadMgrTest.java | 45 +++- gensrc/thrift/FrontendService.thrift | 1 + 25 files changed, 840 insertions(+), 188 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/MultiLoadScanNode.java diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index e7a9f3eaa7dbae..e4b0ac232b3519 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -57,6 +57,7 @@ #include "util/file_utils.h" #include "util/json_util.h" #include "util/string_parser.hpp" +#include "util/string_util.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" #include "util/url_coding.h" @@ -155,10 +156,12 @@ Status MiniLoadAction::data_saved_dir(const LoadHandle& desc, const std::string& } Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path, - const std::string& user, const std::string& cluster) { + const std::string& user, const std::string& cluster, + int64_t file_size) { // Prepare request parameters. 
    std::map<std::string, std::string> params(http_req->query_params().begin(),
                                              http_req->query_params().end());
+    RETURN_IF_ERROR(_merge_header(http_req, &params));
     params.erase(LABEL_KEY);
     params.erase(SUB_LABEL_KEY);
@@ -191,11 +194,12 @@ Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path
     }
     req.__set_properties(params);
     req.files.push_back(file_path);
+    req.__isset.file_size = true;
+    req.file_size.push_back(file_size);
     req.backend.__set_hostname(BackendOptions::get_localhost());
     req.backend.__set_port(config::be_port);
     req.__set_timestamp(GetCurrentTimeMicros());
-
     try {
         client->miniLoad(res, req);
     } catch (apache::thrift::transport::TTransportException& e) {
@@ -234,6 +238,100 @@ Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path
 
     return Status(res.status);
 }
 
+Status MiniLoadAction::_merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params) {
+    if (http_req == nullptr || params == nullptr) {
+        return Status::OK();
+    }
+    if (!http_req->header(HTTP_FORMAT_KEY).empty()) {
+        (*params)[HTTP_FORMAT_KEY] = http_req->header(HTTP_FORMAT_KEY);
+    }
+    if (!http_req->header(HTTP_COLUMNS).empty()) {
+        (*params)[HTTP_COLUMNS] = http_req->header(HTTP_COLUMNS);
+    }
+    if (!http_req->header(HTTP_WHERE).empty()) {
+        (*params)[HTTP_WHERE] = http_req->header(HTTP_WHERE);
+    }
+    if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
+        (*params)[HTTP_COLUMN_SEPARATOR] = http_req->header(HTTP_COLUMN_SEPARATOR);
+    }
+    if (!http_req->header(HTTP_PARTITIONS).empty()) {
+        (*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS);
+        if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
+            return Status::InvalidArgument("Can not specify both partitions and temporary partitions");
+        }
+    }
+    if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
+        (*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS);
+        if (!http_req->header(HTTP_PARTITIONS).empty()) {
+            return Status::InvalidArgument("Can not specify both partitions and temporary partitions");
+        }
+    }
+    if (!http_req->header(HTTP_NEGATIVE).empty() && boost::iequals(http_req->header(HTTP_NEGATIVE), "true")) {
+        (*params)[HTTP_NEGATIVE] = "true";
+    } else {
+        (*params)[HTTP_NEGATIVE] = "false";
+    }
+    if (!http_req->header(HTTP_STRICT_MODE).empty()) {
+        if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
+            (*params)[HTTP_STRICT_MODE] = "false";
+        } else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
+            (*params)[HTTP_STRICT_MODE] = "true";
+        } else {
+            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
+        }
+    }
+    if (!http_req->header(HTTP_TIMEZONE).empty()) {
+        (*params)[HTTP_TIMEZONE] = http_req->header(HTTP_TIMEZONE);
+    }
+    if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
+        (*params)[HTTP_EXEC_MEM_LIMIT] = http_req->header(HTTP_EXEC_MEM_LIMIT);
+    }
+    if (!http_req->header(HTTP_JSONPATHS).empty()) {
+        (*params)[HTTP_JSONPATHS] = http_req->header(HTTP_JSONPATHS);
+    }
+    if (!http_req->header(HTTP_JSONROOT).empty()) {
+        (*params)[HTTP_JSONROOT] = http_req->header(HTTP_JSONROOT);
+    }
+    if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
+        if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
+            (*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
+        } else {
+            (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
+        }
+    } else {
+        (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
+    }
+    if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
+        (*params)[HTTP_FUNCTION_COLUMN + "."
+                + HTTP_SEQUENCE_COL] = http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
+    }
+    if (params->find(HTTP_MERGE_TYPE) == params->end()) {
+        params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND"));
+    }
+    StringCaseMap<TMergeType::type> merge_type_map = {
+        { "APPEND", TMergeType::APPEND },
+        { "DELETE", TMergeType::DELETE },
+        { "MERGE", TMergeType::MERGE }
+    };
+    if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
+        std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
+        auto it = merge_type_map.find(merge_type);
+        if (it != merge_type_map.end()) {
+            (*params)[HTTP_MERGE_TYPE] = it->first;
+        } else {
+            return Status::InvalidArgument("Invalid merge type " + merge_type);
+        }
+    }
+    if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
+        if ((*params)[HTTP_MERGE_TYPE] == "MERGE") {
+            (*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION);
+        } else {
+            return Status::InvalidArgument("Delete condition is not supported when merge type is " + (*params)[HTTP_MERGE_TYPE] + ".");
+        }
+    }
+    return Status::OK();
+}
+
+
 static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd,
                        std::string* cluster) {
     std::string decoded_auth;
diff --git a/be/src/http/action/mini_load.h b/be/src/http/action/mini_load.h
index 66350221c660c1..ac3f292060c7ef 100644
--- a/be/src/http/action/mini_load.h
+++ b/be/src/http/action/mini_load.h
@@ -67,7 +67,7 @@ class MiniLoadAction : public HttpHandler {
 private:
     Status _load(HttpRequest* req, const std::string& file_path, const std::string& user,
-                 const std::string& cluster);
+                 const std::string& cluster, int64_t file_size);
 
     Status data_saved_dir(const LoadHandle& desc, const std::string& table,
                           std::string* file_path);
@@ -96,6 +96,8 @@ class MiniLoadAction : public HttpHandler {
 
     bool _is_streaming(HttpRequest* req);
 
+    Status _merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params);
+
     const std::string _streaming_function_name = "STREAMING_MINI_LOAD";
 
     ExecEnv* _exec_env;
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 8b91f86ef03810..a8549bddad26de 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -569,6 +569,12 @@ under the License.
5.5.0 + + org.awaitility + awaitility + 4.0.3 + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java index 8c561b47ee5149..eed2cce256730a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -20,6 +20,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.thrift.TFileType; import com.google.common.collect.Maps; @@ -37,11 +38,15 @@ // "password" = "password0" // ) public class BrokerDesc implements Writable { + // just for multi load + public final static String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__"; + public final static String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__"; private String name; private Map properties; // Only used for recovery private BrokerDesc() { + this.properties = Maps.newHashMap(); } public BrokerDesc(String name, Map properties) { @@ -60,6 +65,17 @@ public Map getProperties() { return properties; } + public boolean isMultiLoadBroker() { + return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER); + } + + public TFileType getFileType() { + if (isMultiLoadBroker()) { + return TFileType.FILE_LOCAL; + } + return TFileType.FILE_BROKER; + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index c09b883bfae3d8..e1dabcd65e77c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -29,6 +29,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -45,6 +46,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.StringReader; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -98,6 +100,9 @@ public class DataDescription { private final String fileFormat; private final boolean isNegative; + // this only used in multi load, all filePaths is file not dir + private List fileSize; + // column names of source files private List fileFieldNames; // column names in the path @@ -111,6 +116,11 @@ public class DataDescription { // Used for mini load private TNetworkAddress beAddr; private String lineDelimiter; + private String columnDef; + private long backendId; + private boolean stripOuterArray = false; + private String jsonPaths = ""; + private String jsonRoot = ""; private String sequenceCol; @@ -267,10 +277,54 @@ public String getSequenceCol() { return sequenceCol; } + public void setColumnDef(String columnDef) { + this.columnDef = columnDef; + } + public boolean hasSequenceCol() { return !Strings.isNullOrEmpty(sequenceCol); } + public List getFileSize() { + return fileSize; + } + + public void setFileSize(List fileSize) { + this.fileSize = fileSize; + } + + public long getBackendId() { + return backendId; + } + + public void setBackendId(long backendId) { + this.backendId = backendId; + } + + public 
boolean isStripOuterArray() { + return stripOuterArray; + } + + public void setStripOuterArray(boolean stripOuterArray) { + this.stripOuterArray = stripOuterArray; + } + + public String getJsonPaths() { + return jsonPaths; + } + + public void setJsonPaths(String jsonPaths) { + this.jsonPaths = jsonPaths; + } + + public String getJsonRoot() { + return jsonRoot; + } + + public void setJsonRoot(String jsonRoot) { + this.jsonRoot = jsonRoot; + } + @Deprecated public void addColumnMapping(String functionName, Pair> pair) { if (Strings.isNullOrEmpty(functionName) || pair == null) { @@ -391,6 +445,36 @@ private void analyzeColumns() throws AnalysisException { } } } + private void analyzeMultiLoadColumns() throws AnalysisException { + if (columnDef == null || columnDef.isEmpty()) { + return; + } + String columnsSQL = new String("COLUMNS (" + columnDef + ")"); + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL))); + ImportColumnsStmt columnsStmt; + try { + columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser); + } catch (Error e) { + LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e); + throw new AnalysisException("failed to parsing columns' header, maybe contain unsupported character"); + } catch (AnalysisException e) { + LOG.warn("analyze columns' statement failed, sql={}, error={}", + columnsSQL, parser.getErrorMsg(columnsSQL), e); + String errorMessage = parser.getErrorMsg(columnsSQL); + if (errorMessage == null) { + throw e; + } else { + throw new AnalysisException(errorMessage, e); + } + } catch (Exception e) { + LOG.warn("failed to parse columns header, sql={}", columnsSQL, e); + throw new AnalysisException("parse columns header failed", e); + } + + if (columnsStmt.getColumns() != null && !columnsStmt.getColumns().isEmpty()) { + parsedColumnExprList = columnsStmt.getColumns(); + } + } private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException { Preconditions.checkState(child1 instanceof FunctionCallExpr); @@ -705,7 +789,18 @@ public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException } analyzeColumns(); + analyzeMultiLoadColumns(); analyzeSequenceCol(fullDbName); + if (mergeType == LoadTask.MergeType.MERGE) { + parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition)); + } else if (mergeType == LoadTask.MergeType.DELETE) { + parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); + } + // add columnExpr for sequence column + if (hasSequenceCol()) { + parsedColumnExprList.add(new ImportColumnDesc(Column.SEQUENCE_COL, + new SlotRef(null, getSequenceCol()))); + } } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 0fe8eaa7498ff6..97f3745209d9c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -81,7 +81,7 @@ public class LoadStmt extends DdlStmt { private static final String VERSION = "version"; public static final String STRICT_MODE = "strict_mode"; public static final String TIMEZONE = "timezone"; - + // for load data from Baidu Object Store(BOS) public static final String BOS_ENDPOINT = "bos_endpoint"; public static final String BOS_ACCESSKEY = "bos_accesskey"; @@ -95,6 +95,24 @@ public class LoadStmt extends DdlStmt { public static final String KEY_IN_PARAM_LINE_DELIMITER = "line_delimiter"; public 
static final String KEY_IN_PARAM_PARTITIONS = "partitions"; public static final String KEY_IN_PARAM_FORMAT_TYPE = "format"; + + public static final String KEY_IN_PARAM_WHERE = "where"; + public static final String KEY_IN_PARAM_MAX_FILTER_RATIO = "max_filter_ratio"; + public static final String KEY_IN_PARAM_TIMEOUT = "timeout"; + public static final String KEY_IN_PARAM_TEMP_PARTITIONS = "temporary_partitions"; + public static final String KEY_IN_PARAM_NEGATIVE = "negative"; + public static final String KEY_IN_PARAM_STRICT_MODE = "strict_mode"; + public static final String KEY_IN_PARAM_TIMEZONE = "timezone"; + public static final String KEY_IN_PARAM_EXEC_MEM_LIMIT = "exec_mem_limit"; + public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths"; + public static final String KEY_IN_PARAM_JSONROOT = "json_root"; + public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array"; + public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type"; + public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete"; + public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column"; + public static final String KEY_IN_PARAM_SEQUENCE_COL = "sequence_col"; + public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id"; + private final LabelName label; private final List dataDescriptions; private final BrokerDesc brokerDesc; @@ -102,6 +120,7 @@ public class LoadStmt extends DdlStmt { private final ResourceDesc resourceDesc; private final Map properties; private String user; + private EtlJobType etlJobType = EtlJobType.UNKNOWN; private String version = "v2"; @@ -317,7 +336,7 @@ public void analyze(Analyzer analyzer) throws UserException { } } else if (brokerDesc != null) { etlJobType = EtlJobType.BROKER; - } else { + } else if (etlJobType != EtlJobType.UNKNOWN) { // if cluster is null, use default hadoop cluster // if cluster is not null, use this hadoop cluster etlJobType = EtlJobType.HADOOP; @@ -345,6 +364,11 @@ public String getVersion() { return version; } + public void setEtlJobType(EtlJobType etlJobType) { + this.etlJobType = etlJobType; + } + + @Override public String toSql() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java index f88f648ad8dafc..e1ec818c20b210 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ModifyBrokerClause; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; @@ -125,6 +126,9 @@ public FsBroker getAnyBroker(String brokerName) { } public FsBroker getBroker(String brokerName, String host) throws AnalysisException { + if (brokerName.equalsIgnoreCase(BrokerDesc.MULTI_LOAD_BROKER)) { + return new FsBroker("127.0.0.1", 0); + } lock.lock(); try { ArrayListMultimap brokerAddsMap = brokersMap.get(brokerName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java index f08e2d59c6353a..f1a9c560d12390 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java @@ -41,6 +41,9 @@ public class LoadProcDir implements ProcDirInterface { // label and 
state column index of result public static final int LABEL_INDEX = 1; public static final int STATE_INDEX = 2; + public static final int ERR_MSG_INDEX = 7; + public static final int URL_INDEX = 13; + public static final int JOB_DETAILS_INDEX = 14; private static final int LIMIT = 2000; diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java index 0ef28a023dfac9..33a3910df63546 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/LoadAction.java @@ -106,32 +106,27 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) t // check auth checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tableName, PrivPredicate.LOAD); + TNetworkAddress redirectAddr; if (!isStreamLoad && !Strings.isNullOrEmpty(request.getSingleParameter(SUB_LABEL_NAME_PARAM))) { // only multi mini load need to redirect to Master, because only Master has the info of table to // the Backend which the file exists. if (redirectToMaster(request, response)) { return; } - } - - // Choose a backend sequentially. - List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(1, true, false, clusterName); - if (backendIds == null) { - throw new DdlException("No backend alive."); - } - - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); - if (backend == null) { - throw new DdlException("No backend alive."); - } - - TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + redirectAddr = execEnv.getMultiLoadMgr().redirectAddr(fullDbName, label); + } else { + // Choose a backend sequentially. + List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(1, true, false, clusterName); + if (backendIds == null) { + throw new DdlException("No backend alive."); + } - if (!isStreamLoad) { - String subLabel = request.getSingleParameter(SUB_LABEL_NAME_PARAM); - if (!Strings.isNullOrEmpty(subLabel)) { - redirectAddr = execEnv.getMultiLoadMgr().redirectAddr(fullDbName, label, tableName, redirectAddr); + Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); + if (backend == null) { + throw new DdlException("No backend alive."); } + + redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiCommit.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiCommit.java index 55f976730bff7d..956c73dc3957f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiCommit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiCommit.java @@ -64,8 +64,14 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) if (redirectToMaster(request, response)) { return; } - execEnv.getMultiLoadMgr().commit(fullDbName, label); - sendResult(request, response, RestBaseResult.getOk()); + RestBaseResult result = new RestBaseResult(); + try { + execEnv.getMultiLoadMgr().commit(fullDbName, label); + } catch (Exception e) { + result.msg = e.getMessage(); + result.status = ActionStatus.FAILED; + } + sendResult(request, response, result); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiStart.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiStart.java index 
e8ed99328f43e1..da8581dbd0de1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiStart.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/MultiStart.java @@ -79,6 +79,12 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) properties.put(key, value); } } + for (String key : keys) { + String value = request.getRequest().headers().get(key); + if (!Strings.isNullOrEmpty(value)) { + properties.put(key, value); + } + } execEnv.getMultiLoadMgr().startMulti(fullDbName, label, properties); sendResult(request, response, RestBaseResult.getOk()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseAction.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseAction.java index b6477a71ad2d27..8fea6046b24e8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseAction.java @@ -55,6 +55,7 @@ public void handleRequest(BaseRequest request) throws Exception { execute(request, response); } catch (DdlException e) { if (e instanceof UnauthorizedException) { + response.appendContent(e.getMessage()); response.updateHeader(HttpHeaderNames.WWW_AUTHENTICATE.toString(), "Basic realm=\"\""); writeResponse(request, response, HttpResponseStatus.UNAUTHORIZED); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseResult.java b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseResult.java index d4a4a6f40a9e1f..63cd07cc02f6e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseResult.java +++ b/fe/fe-core/src/main/java/org/apache/doris/http/rest/RestBaseResult.java @@ -18,6 +18,7 @@ package org.apache.doris.http.rest; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; // Base restful result public class RestBaseResult { @@ -40,7 +41,7 @@ public static RestBaseResult getOk() { } public String toJson() { - Gson gson = new Gson(); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(this); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 696467cac5a77d..8ae128d5f1f191 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -111,6 +111,7 @@ private Object executeWithoutPassword(HttpServletRequest request, // check auth checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tableName, PrivPredicate.LOAD); + TNetworkAddress redirectAddr; if (!isStreamLoad && !Strings.isNullOrEmpty(request.getParameter(SUB_LABEL_NAME_PARAM))) { // only multi mini load need to redirect to Master, because only Master has the info of table to // the Backend which the file exists. @@ -118,30 +119,24 @@ private Object executeWithoutPassword(HttpServletRequest request, if (redirectView != null) { return redirectView; } - } - - // Choose a backend sequentially. 
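
Both REST stacks (the legacy http handlers and the httpv2 ones below) now pin every sub-load of a multi-load label to the single backend chosen at _multi_start time. For orientation, a hedged client-side sketch of the flow these handlers implement; it is not part of the patch, the host, port, database/table names and labels are placeholders, the HTTP methods are assumptions, and authentication is omitted (uses only java.net.URL / java.net.HttpURLConnection):

    // 1) Register the label; stream-load style options now travel as HTTP headers.
    HttpURLConnection start = (HttpURLConnection) new URL(
            "http://fe_host:8030/api/example_db/_multi_start?label=batch1").openConnection();
    start.setRequestMethod("POST");
    start.setRequestProperty("merge_type", "APPEND");    // header merged per this patch
    int startRc = start.getResponseCode();

    // 2) Upload each file. The FE answers with a 307 redirect to the one BE pinned
    //    for "batch1", so every file lands on that machine's local disk; a real
    //    client must follow the redirect and resend the body.
    HttpURLConnection load = (HttpURLConnection) new URL(
            "http://fe_host:8030/api/example_db/tbl1/_load?label=batch1&sub_label=s1").openConnection();
    load.setRequestMethod("PUT");
    load.setDoOutput(true);
    load.getOutputStream().write("1,2,3\n".getBytes());  // file content for sub_label s1
    int loadRc = load.getResponseCode();

    // 3) Commit: the FE builds one broker-style load job from the accumulated
    //    sub-loads and, per MultiLoadMgr.commit() in this patch, blocks until it
    //    reaches FINISHED or fails.
    HttpURLConnection commit = (HttpURLConnection) new URL(
            "http://fe_host:8030/api/example_db/_multi_commit?label=batch1").openConnection();
    commit.setRequestMethod("POST");
    int commitRc = commit.getResponseCode();
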
- List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(1, true, false, clusterName); - if (backendIds == null) { - return ResponseEntityBuilder.okWithCommonError("No backend alive."); - } - - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); - if (backend == null) { - return ResponseEntityBuilder.okWithCommonError("No backend alive."); - } - - TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + try { + redirectAddr = execEnv.getMultiLoadMgr().redirectAddr(fullDbName, label); + } catch (DdlException e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + } else { + // Choose a backend sequentially. + List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(1, true, false, clusterName); + if (backendIds == null) { + return ResponseEntityBuilder.okWithCommonError("No backend alive."); + } - if (!isStreamLoad) { - String subLabel = request.getParameter(SUB_LABEL_NAME_PARAM); - if (!Strings.isNullOrEmpty(subLabel)) { - try { - redirectAddr = execEnv.getMultiLoadMgr().redirectAddr(fullDbName, label, tableName, redirectAddr); - } catch (DdlException e) { - return ResponseEntityBuilder.okWithCommonError(e.getMessage()); - } + Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); + if (backend == null) { + return ResponseEntityBuilder.okWithCommonError("No backend alive."); } + + redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java index 58afe595af7767..71b2b232765cdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/MultiAction.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.LoadStmt; import org.apache.doris.common.DdlException; +import org.apache.doris.http.rest.RestBaseResult; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -130,6 +131,12 @@ public Object multi_start( properties.put(key, value); } } + for (String key : keys) { + String value = request.getHeader(key); + if (!Strings.isNullOrEmpty(value)) { + properties.put(key, value); + } + } execEnv.getMultiLoadMgr().startMulti(fullDbName, label, properties); return ResponseEntityBuilder.ok(); } @@ -188,8 +195,12 @@ public Object multi_commit( if (redirectView != null) { return redirectView; } - - execEnv.getMultiLoadMgr().commit(fullDbName, label); + RestBaseResult result = new RestBaseResult(); + try { + execEnv.getMultiLoadMgr().commit(fullDbName, label); + } catch (Exception e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } return ResponseEntityBuilder.ok(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 88b6d8062c8d27..fc7df6bdc8b355 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -40,6 +40,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.loadv2.LoadTask; +import 
org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -71,6 +72,9 @@ public class BrokerFileGroup implements Writable { private List partitionIds; // can be null, means no partition specified private List filePaths; + // this only used in multi load, all filePaths is file not dir + private List fileSize; + private List fileFieldNames; private List columnsFromPath; // columnExprList includes all fileFieldNames, columnsFromPath and column mappings @@ -89,6 +93,13 @@ public class BrokerFileGroup implements Writable { private long srcTableId = -1; private boolean isLoadFromTable = false; + // for multi load + private TNetworkAddress beAddr; + private long backendID; + private boolean stripOuterArray = false; + private String jsonPaths = ""; + private String jsonRoot = ""; + // for unit test and edit log persistence private BrokerFileGroup() { } @@ -171,14 +182,18 @@ public void parse(Database db, DataDescription dataDescription) throws DdlExcept fileFormat = dataDescription.getFileFormat(); if (fileFormat != null) { - if (!fileFormat.toLowerCase().equals("parquet") && !fileFormat.toLowerCase().equals("csv") && !fileFormat.toLowerCase().equals("orc")) { - throw new DdlException("File Format Type "+fileFormat+" is invalid."); + if (!fileFormat.equalsIgnoreCase("parquet") + && !fileFormat.equalsIgnoreCase("csv") + && !fileFormat.equalsIgnoreCase("orc") + && !fileFormat.equalsIgnoreCase("json")) { + throw new DdlException("File Format Type " + fileFormat + " is invalid."); } } isNegative = dataDescription.isNegative(); // FilePath filePaths = dataDescription.getFilePaths(); + fileSize = dataDescription.getFileSize(); if (dataDescription.isLoadFromTable()) { String srcTableName = dataDescription.getSrcTableName(); @@ -206,6 +221,13 @@ public void parse(Database db, DataDescription dataDescription) throws DdlExcept srcTableId = srcTable.getId(); isLoadFromTable = true; } + beAddr = dataDescription.getBeAddr(); + backendID = dataDescription.getBackendId(); + if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) { + stripOuterArray = dataDescription.isStripOuterArray(); + jsonPaths = dataDescription.getJsonPaths(); + jsonRoot = dataDescription.getJsonRoot(); + } } public long getTableId() { @@ -280,6 +302,46 @@ public boolean hasSequenceCol() { return !Strings.isNullOrEmpty(sequenceCol); } + public List getFileSize() { + return fileSize; + } + + public void setFileSize(List fileSize) { + this.fileSize = fileSize; + } + + public TNetworkAddress getBeAddr() { + return beAddr; + } + + public long getBackendID() { + return backendID; + } + + public boolean isStripOuterArray() { + return stripOuterArray; + } + + public void setStripOuterArray(boolean stripOuterArray) { + this.stripOuterArray = stripOuterArray; + } + + public String getJsonPaths() { + return jsonPaths; + } + + public void setJsonPaths(String jsonPaths) { + this.jsonPaths = jsonPaths; + } + + public String getJsonRoot() { + return jsonRoot; + } + + public void setJsonRoot(String jsonRoot) { + this.jsonRoot = jsonRoot; + } + public boolean isBinaryFileFormat() { if (fileFormat == null) { // null means default: csv diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index d3d1f1cbf4e827..2e491b4c7116b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -1481,6 +1481,19 @@ public void 
deregisterMiniLabel(String fullDbName, String label) throws DdlExcep
         }
     }
 
+    public boolean isUncommittedLabel(long dbId, String label) throws DdlException {
+        readLock();
+        try {
+            if (dbToMiniLabels.containsKey(dbId)) {
+                Map<String, Long> uncommittedLabels = dbToMiniLabels.get(dbId);
+                return uncommittedLabels.containsKey(label);
+            }
+        } finally {
+            readUnlock();
+        }
+        return false;
+    }
+
     public boolean isLabelUsed(long dbId, String label) throws DdlException {
         readLock();
         try {
@@ -1530,27 +1543,32 @@ private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, bo
 
         // check dbToMiniLabel
         if (checkMini) {
-            if (dbToMiniLabels.containsKey(dbId)) {
-                Map<String, Long> uncommittedLabels = dbToMiniLabels.get(dbId);
-                if (uncommittedLabels.containsKey(label)) {
-                    if (timestamp == -1) {
-                        throw new LabelAlreadyUsedException(label);
+            return checkMultiLabelUsed(dbId, label, timestamp);
+        }
+
+        return false;
+    }
+
+    private boolean checkMultiLabelUsed(long dbId, String label, long timestamp) throws DdlException {
+        if (dbToMiniLabels.containsKey(dbId)) {
+            Map<String, Long> uncommittedLabels = dbToMiniLabels.get(dbId);
+            if (uncommittedLabels.containsKey(label)) {
+                if (timestamp == -1) {
+                    throw new LabelAlreadyUsedException(label);
+                } else {
+                    if (timestamp == uncommittedLabels.get(label)) {
+                        // this timestamp is used to verify if this label check is a retry request from backend.
+                        // if the timestamp in request is same as timestamp in existing load job,
+                        // which means this load job is already submitted
+                        LOG.info("get a retry mini load request with label: {}, timestamp: {}. return ok",
+                                label, timestamp);
+                        return true;
                     } else {
-                        if (timestamp == uncommittedLabels.get(label)) {
-                            // this timestamp is used to verify if this label check is a retry request from backend.
-                            // if the timestamp in request is same as timestamp in existing load job,
-                            // which means this load job is already submitted
-                            LOG.info("get a retry mini load request with label: {}, timestamp: {}. return ok",
-                                    label, timestamp);
-                            return true;
-                        } else {
-                            throw new LabelAlreadyUsedException(label);
-                        }
+                        throw new LabelAlreadyUsedException(label);
                     }
                 }
             }
         }
-
         return false;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index f87b01fc659ec7..f82afe146cbb5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.LogBuilder;
@@ -27,11 +28,11 @@
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
+import com.google.common.collect.Lists;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.google.common.collect.Lists;
-
 import java.util.List;
 import java.util.Map;
 
@@ -43,8 +44,8 @@ public class BrokerLoadPendingTask extends LoadTask {
     private BrokerDesc brokerDesc;
 
     public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback,
-                                 Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
-                                 BrokerDesc brokerDesc) {
+            Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
+            BrokerDesc brokerDesc) {
         super(loadTaskCallback, TaskType.PENDING);
         this.retryTime = 3;
         this.attachment = new BrokerPendingTaskAttachment(signature);
@@ -71,38 +72,57 @@ private void getAllFileStatus() throws UserException {
             long tableTotalFileSize = 0;
             int tableTotalFileNum = 0;
             int groupNum = 0;
-            for (BrokerFileGroup fileGroup : fileGroups) {
-                long groupFileSize = 0;
-                List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
-                for (String path : fileGroup.getFilePaths()) {
-                    BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
+            if (brokerDesc.isMultiLoadBroker()) {
+                for (BrokerFileGroup fileGroup : fileGroups) {
+                    if (fileGroup.getFilePaths().size() != fileGroup.getFileSize().size()) {
+                        LOG.warn("Cannot get file size, file path count {}, file size count {}",
+                                fileGroup.getFilePaths().size(), fileGroup.getFileSize().size());
+                        throw new AnalysisException("Cannot get file size.");
+                    }
+                    List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
+                    tableTotalFileNum += fileGroup.getFilePaths().size();
+                    for (int i = 0; i < fileGroup.getFilePaths().size(); i++) {
+                        tableTotalFileSize += fileGroup.getFileSize().get(i);
+                        TBrokerFileStatus fileStatus = new TBrokerFileStatus(fileGroup.getFilePaths().get(i),
+                                false, fileGroup.getFileSize().get(i), false);
+                        fileStatuses.add(fileStatus);
+                    }
+                    fileStatusList.add(fileStatuses);
                 }
-                boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat();
-                List<TBrokerFileStatus> filteredFileStatuses = Lists.newArrayList();
-                for (TBrokerFileStatus fstatus : fileStatuses) {
-                    if (fstatus.getSize() == 0 && isBinaryFileFormat) {
-                        // For parquet or orc file, if it is an empty file, ignore it.
-                        // Because we can not read an empty parquet or orc file.
- if (LOG.isDebugEnabled()) { - LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) - .add("empty file", fstatus).build()); - } - } else { - groupFileSize += fstatus.size; - filteredFileStatuses.add(fstatus); - if (LOG.isDebugEnabled()) { - LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) - .add("file_status", fstatus).build()); + } else { + for (BrokerFileGroup fileGroup : fileGroups) { + long groupFileSize = 0; + List fileStatuses = Lists.newArrayList(); + for (String path : fileGroup.getFilePaths()) { + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); + } + boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat(); + List filteredFileStatuses = Lists.newArrayList(); + for (TBrokerFileStatus fstatus : fileStatuses) { + if (fstatus.getSize() == 0 && isBinaryFileFormat) { + // For parquet or orc file, if it is an empty file, ignore it. + // Because we can not read an empty parquet or orc file. + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("empty file", fstatus).build()); + } + } else { + groupFileSize += fstatus.size; + filteredFileStatuses.add(fstatus); + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("file_status", fstatus).build()); + } } } + fileStatusList.add(filteredFileStatuses); + tableTotalFileSize += groupFileSize; + tableTotalFileNum += filteredFileStatuses.size(); + LOG.info("get {} files in file group {} for table {}. size: {}. job: {}, broker: {} ", + filteredFileStatuses.size(), groupNum, entry.getKey(), groupFileSize, + callback.getCallbackId(), BrokerUtil.getAddress(brokerDesc)); + groupNum++; } - fileStatusList.add(filteredFileStatuses); - tableTotalFileSize += groupFileSize; - tableTotalFileNum += filteredFileStatuses.size(); - LOG.info("get {} files in file group {} for table {}. size: {}. 
job: {}, broker: {} ", - filteredFileStatuses.size(), groupNum, entry.getKey(), groupFileSize, - callback.getCallbackId(), BrokerUtil.getAddress(brokerDesc)); - groupNum++; } totalFileSize += tableTotalFileSize; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index edfaadef06c25d..e345e66dc6ad31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -102,13 +102,21 @@ public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException { LoadJob loadJob = null; writeLock(); try { - checkLabelUsed(dbId, stmt.getLabel().getLabelName()); - if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) { - throw new DdlException("LoadManager only support the broker and spark load."); - } - if (loadJobScheduler.isQueueFull()) { - throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, " - + "please retry later."); + if (stmt.getBrokerDesc().isMultiLoadBroker()) { + if (!Catalog.getCurrentCatalog().getLoadInstance() + .isUncommittedLabel(dbId, stmt.getLabel().getLabelName())) { + throw new DdlException("label: " + stmt.getLabel().getLabelName() + " not found!") ; + } + + } else { + checkLabelUsed(dbId, stmt.getLabel().getLabelName()); + if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) { + throw new DdlException("LoadManager only support the broker and spark load."); + } + if (loadJobScheduler.isQueueFull()) { + throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, " + + "please retry later."); + } } loadJob = BulkLoadJob.fromLoadStmt(stmt); createLoadJob(loadJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index c38df80e590a73..86c5bbfd2aebb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -21,22 +21,19 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; -import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; -import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TBrokerRangeDesc; @@ -44,7 +41,6 @@ import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRange; import 
org.apache.doris.thrift.TScanRangeLocation; @@ -223,16 +219,6 @@ private void initColumns(ParamCreateContext context) throws UserException { List columnExprs = Lists.newArrayList(); if (isLoad()) { columnExprs = context.fileGroup.getColumnExprList(); - if (mergeType == LoadTask.MergeType.MERGE) { - columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition)); - } else if (mergeType == LoadTask.MergeType.DELETE) { - columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); - } - // add columnExpr for sequence column - if (context.fileGroup.hasSequenceCol()) { - columnExprs.add(new ImportColumnDesc(Column.SEQUENCE_COL, - new SlotRef(null, context.fileGroup.getSequenceCol()))); - } } Load.initColumns(targetTable, columnExprs, @@ -240,18 +226,30 @@ private void initColumns(ParamCreateContext context) throws UserException { context.tupleDescriptor, context.slotDescByName, context.params); } - private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String brokerName) + private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc) throws UserException { - Backend selectedBackend = backends.get(nextBe++); - nextBe = nextBe % backends.size(); + + Backend selectedBackend; + if (brokerDesc.isMultiLoadBroker()) { + if (!brokerDesc.getProperties().containsKey(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY)) { + throw new DdlException("backend not found for multi load."); + } + String backendId = brokerDesc.getProperties().get(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY); + selectedBackend = Catalog.getCurrentSystemInfo().getBackend(Long.valueOf(backendId)); + if (selectedBackend == null) { + throw new DdlException("backend " + backendId + " not found for multi load."); + } + } else { + selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + } // Generate on broker scan range TBrokerScanRange brokerScanRange = new TBrokerScanRange(); brokerScanRange.setParams(params); - FsBroker broker = null; try { - broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost()); + broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerDesc.getName(), selectedBackend.getHost()); } catch (AnalysisException e) { throw new UserException(e.getMessage()); } @@ -273,6 +271,23 @@ private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String b return locations; } + private TScanRangeLocations newLocations(TBrokerScanRangeParams params) + throws UserException { + // Generate on broker scan range + TBrokerScanRange brokerScanRange = new TBrokerScanRange(); + brokerScanRange.setParams(params); + + // Scan range + TScanRange scanRange = new TScanRange(); + scanRange.setBrokerScanRange(brokerScanRange); + + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + + return locations; + } + private TBrokerScanRange brokerScanRange(TScanRangeLocations locations) { return locations.scan_range.broker_scan_range; } @@ -288,8 +303,14 @@ private void getFileStatusAndCalcInstance() throws UserException { for (BrokerFileGroup fileGroup : fileGroups) { boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat(); List fileStatuses = Lists.newArrayList(); - for (String path : fileGroup.getFilePaths()) { - BrokerUtil.parseFile(path, brokerDesc, fileStatuses); + for (int i = 0; i < fileGroup.getFilePaths().size(); i++) { + if (brokerDesc.isMultiLoadBroker()) { + TBrokerFileStatus fileStatus = new 
TBrokerFileStatus(fileGroup.getFilePaths().get(i), + false, fileGroup.getFileSize().get(i), false); + fileStatuses.add(fileStatus); + } else { + BrokerUtil.parseFile(fileGroup.getFilePaths().get(i), brokerDesc, fileStatuses); + } } // only get non-empty file or non-binary file @@ -312,16 +333,20 @@ private void getFileStatusAndCalcInstance() throws UserException { totalBytes = 0; for (List fileStatuses : fileStatusesList) { - Collections.sort(fileStatuses, T_BROKER_FILE_STATUS_COMPARATOR); + if (!brokerDesc.isMultiLoadBroker()) { + Collections.sort(fileStatuses, T_BROKER_FILE_STATUS_COMPARATOR); + } for (TBrokerFileStatus fileStatus : fileStatuses) { totalBytes += fileStatus.size; } } - - numInstances = (int) (totalBytes / Config.min_bytes_per_broker_scanner); - numInstances = Math.min(backends.size(), numInstances); - numInstances = Math.min(numInstances, Config.max_broker_concurrency); - numInstances = Math.max(1, numInstances); + numInstances = 1; + if (!brokerDesc.isMultiLoadBroker()) { + numInstances = (int) (totalBytes / Config.min_bytes_per_broker_scanner); + numInstances = Math.min(backends.size(), numInstances); + numInstances = Math.min(numInstances, Config.max_broker_concurrency); + numInstances = Math.max(1, numInstances); + } bytesPerInstance = totalBytes / numInstances + 1; @@ -350,6 +375,10 @@ private TFileFormatType formatType(String fileFormat, String path) { return TFileFormatType.FORMAT_PARQUET; } else if (fileFormat.toLowerCase().equals("orc")) { return TFileFormatType.FORMAT_ORC; + } else if (fileFormat.toLowerCase().equals("json")) { + return TFileFormatType.FORMAT_JSON; + } else if (fileFormat.toLowerCase().equals("csv")) { + return TFileFormatType.FORMAT_CSV_PLAIN; } } @@ -379,8 +408,7 @@ private void processFileGroup( if (fileStatuses == null || fileStatuses.isEmpty()) { return; } - - TScanRangeLocations curLocations = newLocations(context.params, brokerDesc.getName()); + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); long curInstanceBytes = 0; long curFileOffset = 0; for (int i = 0; i < fileStatuses.size(); ) { @@ -393,15 +421,22 @@ private void processFileGroup( int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size(); if (tmpBytes > bytesPerInstance) { // Now only support split plain text - if (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) { + if ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) + || formatType == TFileFormatType.FORMAT_JSON) { long rangeBytes = bytesPerInstance - curInstanceBytes; TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, - rangeBytes, columnsFromPath, numberOfColumnsFromFile); + rangeBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc); + if (formatType == TFileFormatType.FORMAT_JSON) { + rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray()); + rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths()); + rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot()); + } brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset += rangeBytes; + } else { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, - leftBytes, columnsFromPath, numberOfColumnsFromFile); + leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; i++; @@ -409,12 +444,17 @@ private void processFileGroup( // New one scan locationsList.add(curLocations); - curLocations = 
newLocations(context.params, brokerDesc.getName()); + curLocations = newLocations(context.params, brokerDesc); curInstanceBytes = 0; } else { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, - leftBytes, columnsFromPath, numberOfColumnsFromFile); + leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc); + if (formatType == TFileFormatType.FORMAT_JSON) { + rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray()); + rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths()); + rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot()); + } brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; @@ -430,9 +470,10 @@ private void processFileGroup( private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, TFileFormatType formatType, long rangeBytes, - List columnsFromPath, int numberOfColumnsFromFile) { + List columnsFromPath, int numberOfColumnsFromFile, + BrokerDesc brokerDesc) { TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); - rangeDesc.setFileType(TFileType.FILE_BROKER); + rangeDesc.setFileType(brokerDesc.getFileType()); rangeDesc.setFormatType(formatType); rangeDesc.setPath(fileStatus.path); rangeDesc.setSplittable(fileStatus.isSplitable); @@ -471,7 +512,8 @@ public void finalize(Analyzer analyzer) throws UserException { if (loadJobId != -1) { LOG.info("broker load job {} with txn {} has {} scan range: {}", loadJobId, txnId, locationsList.size(), - locationsList.stream().map(loc -> loc.locations.get(0).backend_id).toArray()); + brokerDesc.isMultiLoadBroker() ? "local" + : locationsList.stream().map(loc -> loc.locations.get(0).backend_id).toArray()); } } @@ -489,9 +531,7 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) output.append(prefix).append("PATH: ") .append(Joiner.on(",").join(brokerTable.getPaths())).append("\",\n"); } - if (brokerDesc != null) { - output.append(prefix).append("BROKER: ").append(brokerDesc.getName()).append("\n"); - } + output.append(prefix).append("BROKER: ").append(brokerDesc.getName()).append("\n"); return output.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiLoadScanNode.java new file mode 100644 index 00000000000000..a8f0495d62561f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiLoadScanNode.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
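
Before the new scan node's body below, it helps to tie together what the MULTI_LOAD_BROKER sentinel changes across this patch. A condensed restatement (not new code) of the BrokerDesc and BrokerScanNode pieces:

    // "__DORIS_MULTI_LOAD_BROKER__" is a sentinel name, not a real broker process:
    // BrokerDesc.getFileType() then yields TFileType.FILE_LOCAL, so scan ranges read
    // the BE's local disk, and newLocations() pins the scan to the backend that
    // received the files instead of round-robining over all alive backends.
    if (brokerDesc.isMultiLoadBroker()) {
        String backendId = brokerDesc.getProperties().get(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY);
        selectedBackend = Catalog.getCurrentSystemInfo().getBackend(Long.valueOf(backendId));
    } else {
        selectedBackend = backends.get(nextBe++ % backends.size());
    }
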
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.thrift.TScanRangeLocations; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class MultiLoadScanNode extends LoadScanNode { + private static final Logger LOG = LogManager.getLogger(MultiLoadScanNode.class); + + public MultiLoadScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { + super(id, desc, planNodeName); + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 801c181305f2d4..c343a51a6efd63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -140,7 +140,7 @@ public void init(Analyzer analyzer) throws UserException { } @Override - public void finalize(Analyzer analyzer) throws UserException, UserException { + public void finalize(Analyzer analyzer) throws UserException { finalizeParams(slotDescByName, exprsByName, brokerScanRange.params, srcTupleDesc, taskInfo.isStrictMode(), taskInfo.getNegative(), analyzer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 05b4b72f14a834..b3e746e3a80211 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -17,15 +17,30 @@ package org.apache.doris.qe; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportWhereStmt; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.proc.LoadProcDir; +import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.load.EtlJobType; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; @@ -33,16 +48,26 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.collect.Streams; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Arrays; +import java.io.StringReader; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.doris.load.loadv2.JobState.FINISHED; +import static org.apache.doris.load.loadv2.JobState.LOADING; +import static org.apache.doris.load.loadv2.JobState.PENDING; +import static org.awaitility.Awaitility.await; // Class used to record state of multi-load operation public class MultiLoadMgr { @@ -69,7 +94,14 @@ public void startMulti(String fullDbName, String label, Map prop if (infoMap.containsKey(multiLabel)) { throw new LabelAlreadyUsedException(label); } - infoMap.put(multiLabel, new MultiLoadDesc(multiLabel, properties)); + MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties); + List backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(1, + true, false, ConnectContext.get().getClusterName()); + if (backendIds == null) { + throw new DdlException("No backend alive."); + } + multiLoadDesc.setBackendId(backendIds.get(0)); + infoMap.put(multiLabel, multiLoadDesc); } finally { lock.writeLock().unlock(); } @@ -78,14 +110,21 @@ public void startMulti(String fullDbName, String label, Map prop } public void load(TMiniLoadRequest request) throws DdlException { + if (CollectionUtils.isNotEmpty(request.getFileSize()) + && request.getFileSize().size() != request.getFiles().size()) { + throw new DdlException("files count and file size count not match: [" + request.getFileSize().size() + + "!=" + request.getFiles().size()+"]"); + } + List> files = Streams.zip(request.getFiles().stream(), request.getFileSize().stream(), Pair::create) + .collect(Collectors.toList()); load(request.getDb(), request.getLabel(), request.getSubLabel(), request.getTbl(), - request.getFiles(), request.getBackend(), request.getProperties(), request.getTimestamp()); + files, request.getBackend(), request.getProperties(), request.getTimestamp()); } // Add one load job private void load(String fullDbName, String label, String subLabel, String table, - List files, + List> files, TNetworkAddress fileAddr, Map properties, long timestamp) throws DdlException { @@ -126,15 +165,38 @@ public void commit(String fullDbName, String label) throws DdlException { if (multiLoadDesc == null) { throw new DdlException("Unknown label(" + multiLabel + ")"); } - Catalog.getCurrentCatalog().getLoadInstance().addLoadJob( - multiLoadDesc.toLoadStmt(), - EtlJobType.MINI, - System.currentTimeMillis()); + Catalog.getCurrentCatalog().getLoadManager().createLoadJobFromStmt(multiLoadDesc.toLoadStmt()); infoMap.remove(multiLabel); } finally { lock.writeLock().unlock(); } Catalog.getCurrentCatalog().getLoadInstance().deregisterMiniLabel(fullDbName, label); + Catalog catalog = Catalog.getCurrentCatalog(); + Database db = catalog.getDb(fullDbName); + if (db == null) { + throw new DdlException("db: " + fullDbName + "not found!"); + } + long dbId = db.getId(); + ConnectContext ctx = ConnectContext.get(); + await().atMost(Config.broker_load_default_timeout_second, TimeUnit.SECONDS).until(() -> { + ConnectContext.threadLocalInfo.set(ctx); + List> loadInfos = catalog.getLoadManager().getLoadJobInfosByDb(dbId, label, true, + null); + if (loadInfos.size() != 1) { + throw new DdlException("label(" + label + ") can be only used once."); + } + List loadInfo = loadInfos.get(0); + String jobState = loadInfo.get(LoadProcDir.STATE_INDEX).toString(); + if (jobState.equals(FINISHED.name())) { + return true; + } else if (jobState.equals(PENDING.name()) || jobState.equals(LOADING.name())) { + return false; + } else { + throw new 
DdlException("job failed. ErrorMsg: " + loadInfo.get(LoadProcDir.ERR_MSG_INDEX).toString() + + ", URL: " + loadInfo.get(LoadProcDir.URL_INDEX).toString() + + ", JobDetails: " + loadInfo.get(LoadProcDir.JOB_DETAILS_INDEX).toString()); + } + }); } // Abort a in-progress multi-load job @@ -184,8 +246,7 @@ public void list(String fullDbName, List labels) throws DdlException { } } - public TNetworkAddress redirectAddr(String fullDbName, String label, String tbl, TNetworkAddress defaultAddr) - throws DdlException { + public TNetworkAddress redirectAddr(String fullDbName, String label) throws DdlException { LabelName multiLabel = new LabelName(fullDbName, label); lock.writeLock().lock(); try { @@ -193,7 +254,8 @@ public TNetworkAddress redirectAddr(String fullDbName, String label, String tbl, if (desc == null) { throw new DdlException("Unknown multiLabel(" + multiLabel + ")"); } - return desc.getHost(tbl, defaultAddr); + Backend backend = Catalog.getCurrentSystemInfo().getBackend(desc.getBackendId()); + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } finally { lock.writeLock().unlock(); } @@ -204,18 +266,18 @@ private static class MultiLoadDesc { private LabelName multiLabel; private Map loadDescByLabel; private Map loadDescByTable; - private Map addressByTable; + private Long backendId; private Map properties; public MultiLoadDesc(LabelName label, Map properties) { multiLabel = label; loadDescByLabel = Maps.newHashMap(); loadDescByTable = Maps.newHashMap(); - addressByTable = Maps.newHashMap(); + backendId = -1L; this.properties = properties; } - public void addFile(String subLabel, String table, List files, + public void addFile(String subLabel, String table, List> files, TNetworkAddress fileAddr, Map properties, long timestamp) throws DdlException { @@ -234,6 +296,7 @@ public void addFile(String subLabel, String table, List files, desc = loadDescByTable.get(table); if (desc == null) { desc = new TableLoadDesc(table, subLabel, files, fileAddr, properties, timestamp); + desc.setBackendId(backendId); loadDescByTable.put(table, desc); } else { if (!desc.canMerge(properties)) { @@ -291,15 +354,15 @@ public boolean isSubLabelUsed(String subLabel, long timestamp) throws DdlExcepti return false; } - public TNetworkAddress getHost(String table, TNetworkAddress defaultAddr) { - TNetworkAddress address = addressByTable.get(table); - if (address != null) { - return address; - } - addressByTable.put(table, defaultAddr); - return defaultAddr; + public void setBackendId(long backendId) { + this.backendId = backendId; } + public long getBackendId() { + return backendId; + } + + public LoadStmt toLoadStmt() throws DdlException { LabelName commitLabel = multiLabel; @@ -307,26 +370,40 @@ public LoadStmt toLoadStmt() throws DdlException { for (TableLoadDesc desc : loadDescByTable.values()) { dataDescriptions.add(desc.toDataDesc()); } - - return new LoadStmt(commitLabel, dataDescriptions, null, null, properties); + Map brokerProperties = Maps.newHashMap(); + brokerProperties.put(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY, backendId.toString()); + BrokerDesc brokerDesc = new BrokerDesc(BrokerDesc.MULTI_LOAD_BROKER, brokerProperties); + LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, null, properties); + loadStmt.setEtlJobType(EtlJobType.BROKER); + loadStmt.setOrigStmt(new OriginStatement("", 0)); + Analyzer analyzer = new Analyzer(ConnectContext.get().getCatalog(), ConnectContext.get()); + try { + loadStmt.analyze(analyzer); + } catch (UserException e) { + throw new 
 
     public static class TableLoadDesc {
         // identity of this load
         private String tbl;
-        private Map<String, List<String>> filesByLabel;
+        private Map<String, List<Pair<String, Long>>> filesByLabel;
         private TNetworkAddress address;
+
+        private Long backendId;
         private Map<String, String> properties;
         // Two or more files may be loaded to the same table with different sub labels,
         // so we use a Set to save the timestamps of all the different sub labels.
         private Set<Long> timestamps = Sets.newHashSet();
 
-        public TableLoadDesc(String tbl, String label, List<String> files,
+        public TableLoadDesc(String tbl, String label, List<Pair<String, Long>> files,
                              TNetworkAddress address, Map<String, String> properties,
                              long timestamp) {
             this.tbl = tbl;
-            this.filesByLabel = Maps.newHashMap();
+            this.filesByLabel = Maps.newLinkedHashMap();
             this.address = address;
             this.properties = properties;
             filesByLabel.put(label, files);
@@ -341,7 +418,7 @@ public boolean isEmpty() {
             return filesByLabel.isEmpty();
         }
 
-        public void addFiles(String label, List<String> files) {
+        public void addFiles(String label, List<Pair<String, Long>> files) {
             filesByLabel.put(label, files);
         }
 
@@ -357,19 +434,43 @@ public void addTimestamp(long timestamp) {
             timestamps.add(timestamp);
         }
 
+        public Long getBackendId() {
+            return backendId;
+        }
+
+        public void setBackendId(Long backendId) {
+            this.backendId = backendId;
+        }
+
         // TODO(zc):
         public DataDescription toDataDesc() throws DdlException {
             List<String> files = Lists.newArrayList();
-            for (List<String> fileOfLabel : filesByLabel.values()) {
-                files.addAll(fileOfLabel);
+            List<Long> fileSizes = Lists.newArrayList();
+            Iterator<Map.Entry<String, List<Pair<String, Long>>>> it = filesByLabel.entrySet().iterator();
+            while (it.hasNext()) {
+                List<Pair<String, Long>> value = it.next().getValue();
+                value.stream().forEach(pair -> {
+                    files.add(pair.first);
+                    fileSizes.add(pair.second);
+                });
             }
-            List<String> columns = null;
             ColumnSeparator columnSeparator = null;
+            PartitionNames partitionNames = null;
+            String fileFormat = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
+            boolean isNegative = properties.get(LoadStmt.KEY_IN_PARAM_NEGATIVE) == null ? false :
+                    Boolean.valueOf(properties.get(LoadStmt.KEY_IN_PARAM_NEGATIVE));
+            Expr whereExpr = null;
+            LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
+            Expr deleteCondition = null;
+            String sequenceColName = properties.get(LoadStmt.KEY_IN_PARAM_SEQUENCE_COL);
+            String colString = null;
+            Backend backend = null;
+            boolean stripOuterArray = false;
+            String jsonPaths = "";
+            String jsonRoot = "";
             if (properties != null) {
-                String colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
-                if (colString != null) {
-                    columns = Arrays.asList(colString.split(","));
-                }
+                colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
                 String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
                 if (columnSeparatorStr != null) {
                     columnSeparator = new ColumnSeparator(columnSeparatorStr);
@@ -379,14 +480,71 @@ public DataDescription toDataDesc() throws DdlException {
                         throw new DdlException(e.getMessage());
                     }
                 }
+                if (properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS) != null) {
+                    String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS).trim().split("\\s*,\\s*");
+                    partitionNames = new PartitionNames(false, Lists.newArrayList(partNames));
+                } else if (properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS) != null) {
+                    String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS).trim().split("\\s*,\\s*");
+                    partitionNames = new PartitionNames(true, Lists.newArrayList(partNames));
+                }
+                if (properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE) != null) {
+                    mergeType = LoadTask.MergeType.valueOf(properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE));
+                }
+                if (properties.get(LoadStmt.KEY_IN_PARAM_WHERE) != null) {
+                    whereExpr = parseWhereExpr(properties.get(LoadStmt.KEY_IN_PARAM_WHERE));
+                }
+                if (properties.get(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION) != null) {
+                    deleteCondition = parseWhereExpr(properties.get(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION));
+                }
+                backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
+                if (backend == null) {
+                    throw new DdlException("Backend [" + backendId + "] not found.");
+                }
+                if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) {
+                    stripOuterArray = Boolean.valueOf(
+                            properties.getOrDefault(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY, "false"));
+                    jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, "");
+                    jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, "");
+                }
             }
-            DataDescription dataDescription = new DataDescription(
-                    tbl, null, files, columns, columnSeparator, null, false, null);
-
-            dataDescription.setBeAddr(address);
+            DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator,
+                    fileFormat, null, isNegative, null, whereExpr, mergeType, deleteCondition,
+                    sequenceColName);
+            dataDescription.setColumnDef(colString);
+            dataDescription.setBeAddr(new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort()));
+            dataDescription.setFileSize(fileSizes);
+            dataDescription.setBackendId(backendId);
+            dataDescription.setJsonPaths(jsonPaths);
+            dataDescription.setJsonRoot(jsonRoot);
+            dataDescription.setStripOuterArray(stripOuterArray);
             return dataDescription;
         }
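Note: a quick worked example of the partition-header parsing above, since the regex is easy to misread. `trim()` strips the outer whitespace and `\s*,\s*` consumes any whitespace around each comma (a fragment; the header value is a placeholder):

```java
String header = " p1 , p2,p3 ";
String[] partNames = header.trim().split("\\s*,\\s*"); // -> ["p1", "p2", "p3"]
PartitionNames partitionNames = new PartitionNames(false, Lists.newArrayList(partNames));
```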
"); + } + if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) { + stripOuterArray = Boolean.valueOf( + properties.getOrDefault(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY, "false")); + jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, ""); + jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, ""); + } } - DataDescription dataDescription = new DataDescription( - tbl, null, files, columns, columnSeparator, null, false, null); - - dataDescription.setBeAddr(address); + DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator, + fileFormat, null, isNegative, null, whereExpr, mergeType, deleteCondition, + sequenceColName); + dataDescription.setColumnDef(colString); + dataDescription.setBeAddr(new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort())); + dataDescription.setFileSize(fileSizes); + dataDescription.setBackendId(backendId); + dataDescription.setJsonPaths(jsonPaths); + dataDescription.setJsonRoot(jsonRoot); + dataDescription.setStripOuterArray(stripOuterArray); return dataDescription; } + + private Expr parseWhereExpr(String whereString) throws DdlException { + String whereSQL = "WHERE " + whereString; + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL))); + ImportWhereStmt whereStmt; + try { + whereStmt = (ImportWhereStmt) SqlParserUtils.getFirstStmt(parser); + } catch (Error e) { + LOG.warn("error happens when parsing where header, sql={}", whereSQL, e); + throw new DdlException("failed to parsing where header, maybe contain unsupported character"); + } catch (DdlException e) { + LOG.warn("analyze where statement failed, sql={}, error={}", + whereSQL, parser.getErrorMsg(whereSQL), e); + String errorMessage = parser.getErrorMsg(whereSQL); + if (errorMessage == null) { + throw e; + } else { + throw new DdlException(errorMessage, e); + } + } catch (Exception e) { + LOG.warn("failed to parse where header, sql={}", whereSQL, e); + throw new DdlException("parse columns header failed", e); + } + return whereStmt.getExpr(); + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java index 807dfa6f516c64..a65788f654f8a0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java @@ -17,23 +17,66 @@ package org.apache.doris.qe; +import org.apache.doris.backup.CatalogMocker; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.DdlException; +import org.apache.doris.system.SystemInfoService; import com.google.common.collect.Lists; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.List; +import mockit.Delegate; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; import mockit.Mocked; public class MultiLoadMgrTest { @Mocked private Catalog catalog; - + @Mocked + private ConnectContext context; + @Mocked + private SystemInfoService systemInfoService; + @Before + public void setUp() { + new Expectations() { + { + ConnectContext.get(); + minTimes = 0; + result = context; + } + }; + new Expectations(context) { + { + context.getClusterName(); + minTimes = 0; + result = "default"; + } + }; + new Expectations() { + { + systemInfoService.seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, anyString); + minTimes = 0; + result = new Delegate() { + public synchronized List seqChooseBackendIds(int backendNum, boolean 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
index 807dfa6f516c64..a65788f654f8a0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
@@ -17,23 +17,66 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.backup.CatalogMocker;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.collect.Lists;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
 
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
 import mockit.Mocked;
 
 public class MultiLoadMgrTest {
     @Mocked
     private Catalog catalog;
-
+    @Mocked
+    private ConnectContext context;
+    @Mocked
+    private SystemInfoService systemInfoService;
+
+    @Before
+    public void setUp() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = context;
+            }
+        };
+        new Expectations(context) {
+            {
+                context.getClusterName();
+                minTimes = 0;
+                result = "default";
+            }
+        };
+        new Expectations() {
+            {
+                systemInfoService.seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, anyString);
+                minTimes = 0;
+                result = new Delegate() {
+                    public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,
+                                                                       boolean isCreate, String clusterName) {
+                        List<Long> beIds = Lists.newArrayList();
+                        beIds.add(CatalogMocker.BACKEND1_ID);
+                        beIds.add(CatalogMocker.BACKEND2_ID);
+                        beIds.add(CatalogMocker.BACKEND3_ID);
+                        return beIds;
+                    }
+                };
+            }
+        };
+    }
+
     @Test
     public void testStartNormal() throws DdlException {
         MultiLoadMgr mgr = new MultiLoadMgr();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 4964e3fa670023..131f6096f84eda 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -415,6 +415,7 @@ struct TMiniLoadRequest {
     11: optional i64 timestamp
     12: optional string user_ip
     13: optional bool is_retry
+    14: optional list<i64> file_size
 }
 
 struct TUpdateMiniEtlTaskStatusRequest {
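Note: the new thrift field is a parallel list, i.e. file_size[i] describes files[i]. Because it is optional, the FE cannot assume it is set (a BE from an older version simply omits it), which is why the Java side above guards the zip. A small sketch with the generated thrift types, mirroring the getFiles()/getFileSize() accessors used above (a fragment; paths and sizes are placeholders):

```java
TMiniLoadRequest req = new TMiniLoadRequest();
req.setFiles(Lists.newArrayList("/tmp/mini_load/a.csv", "/tmp/mini_load/b.csv"));
req.setFileSize(Lists.newArrayList(1024L, 2048L)); // same order and length as files
// Receivers should check req.isSetFileSize() before trusting the sizes.
```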

From e5805c59b601b867351cae35b258f87a04987342 Mon Sep 17 00:00:00 2001
From: yangzhg
Date: Fri, 4 Dec 2020 10:16:13 +0800
Subject: [PATCH 2/3] format

---
 be/src/http/action/mini_load.cpp | 35 +++++++++++++++++---------------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index e4b0ac232b3519..c17a3000058d0f 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -238,7 +238,8 @@ Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path
     return Status(res.status);
 }
 
-Status MiniLoadAction::_merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params) {
+Status MiniLoadAction::_merge_header(HttpRequest* http_req,
+                                     std::map<std::string, std::string>* params) {
     if (http_req == nullptr || params == nullptr) {
         return Status::OK();
     }
@@ -257,16 +258,19 @@ Status MiniLoadAction::_merge_header(HttpRequest* http_req,
     if (!http_req->header(HTTP_PARTITIONS).empty()) {
         (*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS);
         if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
-            return Status::InvalidArgument("Can not specify both partitions and temporary partitions");
+            return Status::InvalidArgument(
+                    "Can not specify both partitions and temporary partitions");
         }
     }
     if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
         (*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS);
         if (!http_req->header(HTTP_PARTITIONS).empty()) {
-            return Status::InvalidArgument("Can not specify both partitions and temporary partitions");
+            return Status::InvalidArgument(
+                    "Can not specify both partitions and temporary partitions");
         }
     }
-    if (!http_req->header(HTTP_NEGATIVE).empty() && boost::iequals(http_req->header(HTTP_NEGATIVE), "true")) {
+    if (!http_req->header(HTTP_NEGATIVE).empty() &&
+        boost::iequals(http_req->header(HTTP_NEGATIVE), "true")) {
         (*params)[HTTP_NEGATIVE] = "true";
     } else {
         (*params)[HTTP_NEGATIVE] = "false";
@@ -294,28 +298,27 @@ Status MiniLoadAction::_merge_header(HttpRequest* http_req,
 
     if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
         if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
-            (*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
+            (*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
         } else {
-            (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
+            (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
         }
     } else {
-        (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
+        (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
     }
     if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
-        (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] = http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
+        (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] =
+                http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
     }
     if (params->find(HTTP_MERGE_TYPE) == params->end()) {
         params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND"));
     }
-    StringCaseMap<TMergeType::type> merge_type_map = {
-        { "APPEND", TMergeType::APPEND },
-        { "DELETE", TMergeType::DELETE },
-        { "MERGE", TMergeType::MERGE }
-    };
+    StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
+                                                      {"DELETE", TMergeType::DELETE},
+                                                      {"MERGE", TMergeType::MERGE}};
     if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
         std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
         auto it = merge_type_map.find(merge_type);
-        if (it != merge_type_map.end() ) {
+        if (it != merge_type_map.end()) {
             (*params)[HTTP_MERGE_TYPE] = it->first;
         } else {
             return Status::InvalidArgument("Invalid merge type " + merge_type);
@@ -325,13 +328,13 @@ Status MiniLoadAction::_merge_header(HttpRequest* http_req,
         if ((*params)[HTTP_MERGE_TYPE] == "MERGE") {
             (*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION);
         } else {
-            return Status::InvalidArgument("not support delete when merge type is " + (*params)[HTTP_MERGE_TYPE] + ".");
+            return Status::InvalidArgument("not support delete when merge type is " +
+                                           (*params)[HTTP_MERGE_TYPE] + ".");
         }
     }
     return Status::OK();
 }
 
-
 static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd,
                        std::string* cluster) {
     std::string decoded_auth;

From ee99660d0df88ef369ce8ef8e4162f7c92774d0e Mon Sep 17 00:00:00 2001
From: yangzhg
Date: Mon, 7 Dec 2020 10:05:02 +0800
Subject: [PATCH 3/3] rebase

---
 be/src/http/action/mini_load.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index c17a3000058d0f..0a185ce8bd2206 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -648,8 +648,8 @@ void MiniLoadAction::_handle(HttpRequest* http_req) {
                              "receipt size not equal with body size");
         return;
     }
-    auto st =
-            _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster);
+    auto st = _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster,
+                    ctx->bytes_written);
     std::string str = to_json(st);
     HttpChannel::send_reply(http_req, str);
 }