109 changes: 105 additions & 4 deletions be/src/http/action/mini_load.cpp
@@ -57,6 +57,7 @@
#include "util/file_utils.h"
#include "util/json_util.h"
#include "util/string_parser.hpp"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
#include "util/url_coding.h"
@@ -155,10 +156,12 @@ Status MiniLoadAction::data_saved_dir(const LoadHandle& desc, const std::string&
}

Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path,
const std::string& user, const std::string& cluster) {
const std::string& user, const std::string& cluster,
int64_t file_size) {
// Prepare request parameters.
std::map<std::string, std::string> params(http_req->query_params().begin(),
http_req->query_params().end());
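// Note: values merged in from the HTTP headers take precedence and
// overwrite query parameters with the same key.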
RETURN_IF_ERROR(_merge_header(http_req, &params));
Contributor (review comment): Add a comment to say that the params in the header will overwrite the query params.
params.erase(LABEL_KEY);
params.erase(SUB_LABEL_KEY);

@@ -191,11 +194,12 @@ Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path
}
req.__set_properties(params);
req.files.push_back(file_path);
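// file_size is kept parallel to files: file_size[i] is the size of files[i].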
req.__isset.file_size = true;
req.file_size.push_back(file_size);
req.backend.__set_hostname(BackendOptions::get_localhost());
req.backend.__set_port(config::be_port);

req.__set_timestamp(GetCurrentTimeMicros());

try {
client->miniLoad(res, req);
} catch (apache::thrift::transport::TTransportException& e) {
@@ -234,6 +238,103 @@ Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path
return Status(res.status);
}

Status MiniLoadAction::_merge_header(HttpRequest* http_req,
std::map<std::string, std::string>* params) {
if (http_req == nullptr || params == nullptr) {
return Status::OK();
}
if (!http_req->header(HTTP_FORMAT_KEY).empty()) {
(*params)[HTTP_FORMAT_KEY] = http_req->header(HTTP_FORMAT_KEY);
}
if (!http_req->header(HTTP_COLUMNS).empty()) {
(*params)[HTTP_COLUMNS] = http_req->header(HTTP_COLUMNS);
}
if (!http_req->header(HTTP_WHERE).empty()) {
(*params)[HTTP_WHERE] = http_req->header(HTTP_WHERE);
}
if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
(*params)[HTTP_COLUMN_SEPARATOR] = http_req->header(HTTP_COLUMN_SEPARATOR);
}
if (!http_req->header(HTTP_PARTITIONS).empty()) {
(*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS);
if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
return Status::InvalidArgument(
"Can not specify both partitions and temporary partitions");
}
}
if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
(*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS);
if (!http_req->header(HTTP_PARTITIONS).empty()) {
return Status::InvalidArgument(
"Can not specify both partitions and temporary partitions");
}
}
if (!http_req->header(HTTP_NEGATIVE).empty() &&
boost::iequals(http_req->header(HTTP_NEGATIVE), "true")) {
(*params)[HTTP_NEGATIVE] = "true";
} else {
(*params)[HTTP_NEGATIVE] = "false";
}
if (!http_req->header(HTTP_STRICT_MODE).empty()) {
if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
(*params)[HTTP_STRICT_MODE] = "false";
} else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
(*params)[HTTP_STRICT_MODE] = "true";
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
}
}
if (!http_req->header(HTTP_TIMEZONE).empty()) {
(*params)[HTTP_TIMEZONE] = http_req->header(HTTP_TIMEZONE);
}
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
(*params)[HTTP_EXEC_MEM_LIMIT] = http_req->header(HTTP_EXEC_MEM_LIMIT);
}
if (!http_req->header(HTTP_JSONPATHS).empty()) {
(*params)[HTTP_JSONPATHS] = http_req->header(HTTP_JSONPATHS);
}
if (!http_req->header(HTTP_JSONROOT).empty()) {
(*params)[HTTP_JSONROOT] = http_req->header(HTTP_JSONROOT);
}
if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
(*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
} else {
(*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
}
} else {
(*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
}
if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
(*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] =
http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
}
if (params->find(HTTP_MERGE_TYPE) == params->end()) {
params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND"));
}
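// The merge type defaults to APPEND; an explicit header value must match
// one of the entries below (StringCaseMap makes the lookup case-insensitive).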
StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
{"DELETE", TMergeType::DELETE},
{"MERGE", TMergeType::MERGE}};
if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
auto it = merge_type_map.find(merge_type);
if (it != merge_type_map.end()) {
(*params)[HTTP_MERGE_TYPE] = it->first;
} else {
return Status::InvalidArgument("Invalid merge type " + merge_type);
}
}
if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
if ((*params)[HTTP_MERGE_TYPE] == "MERGE") {
(*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION);
} else {
return Status::InvalidArgument("not support delete when merge type is " +
(*params)[HTTP_MERGE_TYPE] + ".");
}
}
return Status::OK();
}

static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd,
std::string* cluster) {
std::string decoded_auth;
@@ -547,8 +648,8 @@ void MiniLoadAction::_handle(HttpRequest* http_req) {
"receipt size not equal with body size");
return;
}
auto st =
_load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster);
auto st = _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster,
ctx->bytes_written);
std::string str = to_json(st);
HttpChannel::send_reply(http_req, str);
}
4 changes: 3 additions & 1 deletion be/src/http/action/mini_load.h
@@ -67,7 +67,7 @@ class MiniLoadAction : public HttpHandler {

private:
Status _load(HttpRequest* req, const std::string& file_path, const std::string& user,
const std::string& cluster);
const std::string& cluster, int64_t file_size);

Status data_saved_dir(const LoadHandle& desc, const std::string& table, std::string* file_path);

@@ -96,6 +96,8 @@

bool _is_streaming(HttpRequest* req);

Status _merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params);

const std::string _streaming_function_name = "STREAMING_MINI_LOAD";

ExecEnv* _exec_env;
6 changes: 6 additions & 0 deletions fe/fe-core/pom.xml
@@ -569,6 +569,12 @@ under the License.
<version>5.5.0</version>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
</dependency>

</dependencies>

<build>
16 changes: 16 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -20,6 +20,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TFileType;

import com.google.common.collect.Maps;

@@ -37,11 +38,15 @@
// "password" = "password0"
// )
public class BrokerDesc implements Writable {
// just for multi load
public final static String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
public final static String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
private String name;
private Map<String, String> properties;

// Only used for recovery
private BrokerDesc() {
this.properties = Maps.newHashMap();
}

public BrokerDesc(String name, Map<String, String> properties) {
@@ -60,6 +65,17 @@ public Map<String, String> getProperties() {
return properties;
}

public boolean isMultiLoadBroker() {
return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
}

public TFileType getFileType() {
if (isMultiLoadBroker()) {
return TFileType.FILE_LOCAL;
}
return TFileType.FILE_BROKER;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, name);
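A minimal usage sketch of the new multi-load accessors (not part of this diff; the class name and broker name are hypothetical, and the Doris FE classes above are assumed to be on the classpath):

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.thrift.TFileType;

import com.google.common.collect.Maps;

public class BrokerDescSketch {
    public static void main(String[] args) {
        // The sentinel broker name is matched case-insensitively by isMultiLoadBroker().
        BrokerDesc multiLoad = new BrokerDesc(BrokerDesc.MULTI_LOAD_BROKER, Maps.newHashMap());
        assert multiLoad.isMultiLoadBroker();
        // Multi load bypasses the broker process and reads the file from BE-local disk.
        assert multiLoad.getFileType() == TFileType.FILE_LOCAL;

        // Any ordinary broker name keeps the normal broker file type.
        BrokerDesc hdfsBroker = new BrokerDesc("my_hdfs_broker", Maps.newHashMap());
        assert hdfsBroker.getFileType() == TFileType.FILE_BROKER;
    }
}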
fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -29,6 +29,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -45,6 +46,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -98,6 +100,9 @@ public class DataDescription {
private final String fileFormat;
private final boolean isNegative;

// Only used in multi load; every entry in filePaths is a file, not a directory.
private List<Long> fileSize;

// column names of source files
private List<String> fileFieldNames;
// column names in the path
@@ -111,6 +116,11 @@
// Used for mini load
private TNetworkAddress beAddr;
private String lineDelimiter;
private String columnDef;
private long backendId;
private boolean stripOuterArray = false;
private String jsonPaths = "";
private String jsonRoot = "";

private String sequenceCol;

@@ -267,10 +277,54 @@ public String getSequenceCol() {
return sequenceCol;
}

public void setColumnDef(String columnDef) {
this.columnDef = columnDef;
}

public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}

public List<Long> getFileSize() {
return fileSize;
}

public void setFileSize(List<Long> fileSize) {
this.fileSize = fileSize;
}

public long getBackendId() {
return backendId;
}

public void setBackendId(long backendId) {
this.backendId = backendId;
}

public boolean isStripOuterArray() {
return stripOuterArray;
}

public void setStripOuterArray(boolean stripOuterArray) {
this.stripOuterArray = stripOuterArray;
}

public String getJsonPaths() {
return jsonPaths;
}

public void setJsonPaths(String jsonPaths) {
this.jsonPaths = jsonPaths;
}

public String getJsonRoot() {
return jsonRoot;
}

public void setJsonRoot(String jsonRoot) {
this.jsonRoot = jsonRoot;
}

@Deprecated
public void addColumnMapping(String functionName, Pair<String, List<String>> pair) {
if (Strings.isNullOrEmpty(functionName) || pair == null) {
@@ -391,6 +445,36 @@ private void analyzeColumns() throws AnalysisException {
}
}
}
private void analyzeMultiLoadColumns() throws AnalysisException {
if (columnDef == null || columnDef.isEmpty()) {
return;
}
String columnsSQL = "COLUMNS (" + columnDef + ")";
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Error e) {
LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e);
throw new AnalysisException("failed to parsing columns' header, maybe contain unsupported character");
} catch (AnalysisException e) {
LOG.warn("analyze columns' statement failed, sql={}, error={}",
columnsSQL, parser.getErrorMsg(columnsSQL), e);
String errorMessage = parser.getErrorMsg(columnsSQL);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {
LOG.warn("failed to parse columns header, sql={}", columnsSQL, e);
throw new AnalysisException("parse columns header failed", e);
}

if (columnsStmt.getColumns() != null && !columnsStmt.getColumns().isEmpty()) {
parsedColumnExprList = columnsStmt.getColumns();
}
}
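For illustration, a sketch of the wrapping performed above (the header value is hypothetical; the real columnDef arrives via setColumnDef() from the mini-load request):

// Hypothetical value taken from the mini-load "columns" HTTP header.
String columnDef = "k1, k2, v1 = k1 + 1";
// Same wrapping as in analyzeMultiLoadColumns(); the parsed
// ImportColumnsStmt then populates parsedColumnExprList.
String columnsSQL = "COLUMNS (" + columnDef + ")";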

private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException {
Preconditions.checkState(child1 instanceof FunctionCallExpr);
@@ -705,7 +789,18 @@ public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException
}

analyzeColumns();
analyzeMultiLoadColumns();
analyzeSequenceCol(fullDbName);
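// MERGE and DELETE loads materialize the hidden delete-sign column:
// MERGE marks only rows matching deleteCondition, DELETE marks every row.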
if (mergeType == LoadTask.MergeType.MERGE) {
parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
} else if (mergeType == LoadTask.MergeType.DELETE) {
parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
// add columnExpr for sequence column
if (hasSequenceCol()) {
parsedColumnExprList.add(new ImportColumnDesc(Column.SEQUENCE_COL,
new SlotRef(null, getSequenceCol())));
}
}

/*