Skip to content

Commit

Permalink
Merge branch 'branch-1.2-lts' into pick
Browse files Browse the repository at this point in the history
  • Loading branch information
sohardforaname authored Aug 28, 2023
2 parents dcdd288 + 6ee5488 commit 23b083d
Show file tree
Hide file tree
Showing 22 changed files with 554 additions and 180 deletions.
17 changes: 12 additions & 5 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,20 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
#define CREATE_AND_START_THREAD(type, pool_name)
#endif // BE_TEST

#ifndef BE_TEST
// Both PUSH and REALTIME_PUSH type use _push_load_workers
_push_load_workers.reset(new PushTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS,
PushTaskPool::PushWokerType::LOAD_V2));
_push_load_workers->start();
_push_delete_workers.reset(new PushTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS,
PushTaskPool::PushWokerType::DELETE));
_push_delete_workers->start();
#endif
CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
// Both PUSH and REALTIME_PUSH type use _push_workers
CREATE_AND_START_POOL(PUSH, _push_workers);
CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
CREATE_AND_START_POOL(DELETE, _delete_workers);
CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
CREATE_AND_START_POOL(CLONE, _clone_workers);
CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
Expand Down Expand Up @@ -165,9 +172,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
}
if (task.push_req.push_type == TPushType::LOAD ||
task.push_req.push_type == TPushType::LOAD_V2) {
_push_workers->submit_task(task);
_push_load_workers->submit_task(task);
} else if (task.push_req.push_type == TPushType::DELETE) {
_delete_workers->submit_task(task);
_push_delete_workers->submit_task(task);
} else {
ret_st = Status::InvalidArgument(
"task(signature={}, type={}, push_type={}) has wrong push_type", signature,
Expand Down
4 changes: 2 additions & 2 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ class AgentServer {

std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
std::unique_ptr<TaskWorkerPool> _push_workers;
std::unique_ptr<TaskWorkerPool> _push_load_workers;
std::unique_ptr<TaskWorkerPool> _publish_version_workers;
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
std::unique_ptr<TaskWorkerPool> _delete_workers;
std::unique_ptr<TaskWorkerPool> _push_delete_workers;
std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
std::unique_ptr<TaskWorkerPool> _clone_workers;
std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
Expand Down
202 changes: 85 additions & 117 deletions be/src/agent/task_worker_pool.cpp

Large diffs are not rendered by default.

22 changes: 16 additions & 6 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,12 @@ class TaskWorkerPool {
// notify the worker. currently for task/disk/tablet report thread
void notify_thread();

private:
protected:
bool _register_task_info(const TTaskType::type task_type, int64_t signature);
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
void _finish_task(const TFinishTaskRequest& finish_task_request);
uint32_t _get_next_task_index(int32_t thread_count, std::deque<TAgentTaskRequest>& tasks,
TPriority::type priority);

void _create_tablet_worker_thread_callback();
void _drop_tablet_worker_thread_callback();
void _push_worker_thread_callback();
void _publish_version_worker_thread_callback();
void _clear_transaction_task_worker_thread_callback();
void _alter_tablet_worker_thread_callback();
Expand Down Expand Up @@ -209,7 +205,7 @@ class TaskWorkerPool {
// random sleep 1~second seconds
void _random_sleep(int second);

private:
protected:
std::string _name;

// Reference to the ExecEnv::_master_info
Expand Down Expand Up @@ -237,6 +233,7 @@ class TaskWorkerPool {
// Always 1 when _thread_model is SINGLE_THREAD
uint32_t _worker_count;
TaskWorkerType _task_worker_type;
std::function<void()> _cb;

static FrontendServiceClientCache _master_service_client_cache;
static std::atomic_ulong _s_report_version;
Expand All @@ -246,4 +243,17 @@ class TaskWorkerPool {

DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
}; // class TaskWorkerPool

class PushTaskPool : public TaskWorkerPool {
public:
enum class PushWokerType { LOAD_V2, DELETE };
PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type);
void _push_worker_thread_callback();

DISALLOW_COPY_AND_ASSIGN(PushTaskPool);

private:
PushWokerType _push_worker_type;
};

} // namespace doris
8 changes: 8 additions & 0 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
}
}

size_t row_count = 0;
auto get_send_data = [&]() {
if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
return RowBatch(*_row_desc, request.row_batch());
Expand All @@ -476,6 +477,13 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
};

auto send_data = get_send_data();
if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
row_count = send_data.num_rows();
} else {
row_count = send_data.rows();
}
CHECK(row_count == request.tablet_ids_size())
<< "block rows: " << row_count << ", tablet_ids_size: " << request.tablet_ids_size();
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
// tablet_ids has already set when add row
request.set_packet_seq(_next_packet_seq);
auto block = mutable_block->to_block();
CHECK(block.rows() == request.tablet_ids_size())
<< "block rows: " << block.rows() << ", tablet_ids_size: " << request.tablet_ids_size();
if (block.rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
Expand All @@ -298,6 +300,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
_add_block_closure->clear_in_flight();
return;
}
{
vectorized::Block tmp_block(*request.mutable_block());
CHECK(block.rows() == tmp_block.rows());
}
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
LOG(WARNING) << "send block too large, this rpc may failed. send size: "
<< compressed_bytes << ", threshold: " << config::brpc_max_body_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ public String toString() {
return "TIMEV2(" + scale + ")";
} else if (type == PrimitiveType.VARCHAR) {
if (isWildcardVarchar()) {
return "VARCHAR(*)";
return "VARCHAR";
}
return "VARCHAR(" + len + ")";
} else if (type == PrimitiveType.STRING) {
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -6247,7 +6247,7 @@ column_subscript ::=
{: ArrayList<Expr> list = new ArrayList<Expr>();
list.add(e);
list.add(index);
RESULT = new FunctionCallExpr("%element_extract%", list);
RESULT = new FunctionCallExpr("element_at", list);
:}
;

Expand All @@ -6256,14 +6256,14 @@ column_slice ::=
{: ArrayList<Expr> list = new ArrayList<Expr>();
list.add(e);
list.add(offset);
RESULT = new FunctionCallExpr("%element_slice%", list);
RESULT = new FunctionCallExpr("array_slice", list);
:}
| expr:e LBRACKET expr:offset COLON expr:length RBRACKET
{: ArrayList<Expr> list = new ArrayList<Expr>();
list.add(e);
list.add(offset);
list.add(length);
RESULT = new FunctionCallExpr("%element_slice%", list);
RESULT = new FunctionCallExpr("array_slice", list);
:}
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import lombok.Getter;

import java.util.ArrayList;
Expand Down Expand Up @@ -66,6 +70,18 @@ public void analyze(Analyzer analyzer) throws UserException {
QueryStmt tmpStmt = queryStmt.clone();
tmpStmt.analyze(dummyRootAnalyzer);
this.queryStmt = tmpStmt;
// to adjust the nullable of the result expression, we have to create plan fragment from the query stmt.
OriginalPlanner planner = new OriginalPlanner(dummyRootAnalyzer);
planner.createPlanFragments(queryStmt, dummyRootAnalyzer, ConnectContext.get().getSessionVariable().toThrift());
PlanFragment root = planner.getFragments().get(0);
List<Expr> outputs = root.getOutputExprs();
Preconditions.checkArgument(outputs.size() == queryStmt.getResultExprs().size());
for (int i = 0; i < outputs.size(); ++i) {
if (queryStmt.getResultExprs().get(i).getSrcSlotRef() != null) {
queryStmt.getResultExprs().get(i).getSrcSlotRef().getColumn()
.setIsAllowNull(outputs.get(i).isNullable());
}
}
ArrayList<Expr> resultExprs = getQueryStmt().getResultExprs();
if (columnNames != null && columnNames.size() != resultExprs.size()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NUMBER_NOT_MATCH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,29 @@ protected Expr substituteImpl(ExprSubstitutionMap smap, ExprSubstitutionMap disj
aggFnParams = aggFnParams
.clone(newParams);
}
return super.substituteImpl(smap, disjunctsMap, analyzer);
if (isImplicitCast()) {
return getChild(0).substituteImpl(smap, disjunctsMap, analyzer);
}
if (smap != null) {
Expr substExpr = smap.get(this);
if (substExpr != null) {
return substExpr.clone();
}
}
if (Expr.IS_OR_PREDICATE.apply(this) && disjunctsMap != null) {
smap = disjunctsMap;
disjunctsMap = null;
}
for (int i = 0; i < children.size(); ++i) {
if (!(children.get(i) instanceof LiteralExpr)) {
children.set(i, children.get(i).substituteImpl(smap, disjunctsMap, analyzer));
}
}
// SlotRefs must remain analyzed to support substitution across query blocks. All
// other exprs must be analyzed again after the substitution to add implicit casts
// and for resolving their correct function signature.
resetAnalysisState();
return this;
}

@Override
Expand Down Expand Up @@ -522,10 +544,18 @@ private String paramsToSql() {
len = len - 1;
}
for (int i = 0; i < len; ++i) {
if (i != 0) {
if (fnName.getFunction().equalsIgnoreCase("group_concat")
&& orderByElements.size() > 0 && i == len - orderByElements.size()) {
sb.append(" ");
} else {
sb.append(", ");
}
}
if (i == 1 && (fnName.getFunction().equalsIgnoreCase("aes_decrypt")
|| fnName.getFunction().equalsIgnoreCase("aes_encrypt")
|| fnName.getFunction().equalsIgnoreCase("sm4_decrypt")
|| fnName.getFunction().equalsIgnoreCase("sm4_encrypt"))) {
|| fnName.getFunction().equalsIgnoreCase("aes_encrypt")
|| fnName.getFunction().equalsIgnoreCase("sm4_decrypt")
|| fnName.getFunction().equalsIgnoreCase("sm4_encrypt"))) {
result.add("\'***\'");
} else if (orderByElements.size() > 0 && i == len - orderByElements.size()) {
result.add("ORDER BY " + children.get(i).toSql());
Expand Down Expand Up @@ -1569,6 +1599,11 @@ && collectChildReturnTypes()[0].isDecimalV3()) {
}
// rewrite return type if is nested type function
analyzeNestedFunction();
for (OrderByElement o : orderByElements) {
if (!o.getExpr().isAnalyzed) {
o.getExpr().analyzeImpl(analyzer);
}
}
}

// if return type is nested type, need to be determined the sub-element type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,4 @@ public void finalizeImplForNereids() throws AnalysisException {
public String toString() {
return getStringValue();
}

@Override
protected Expr substituteImpl(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap,
Analyzer analyzer) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ public void init(Analyzer analyzer) throws UserException {
// to our input; our conjuncts don't get substituted because they already
// refer to our output
outputSmap = getCombinedChildSmap();
if (aggInfo.isMerge()) {
aggInfo.substitute(aggInfo.getIntermediateSmap(), analyzer);
}
aggInfo.substitute(outputSmap, analyzer);

// assert consistent aggregate expr and slot materialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef, List<Pa
// reset assigned conjuncts of analyzer in every compare
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
PlanNode candidate = createJoinNode(analyzer, root, rootPlanNodeOfCandidate, tblRefOfCandidate);
// (ML): 这里还需要吗?应该不会返回null吧
// it may not return null, but protect.
if (candidate == null) {
continue;
}
Expand Down Expand Up @@ -2724,7 +2724,7 @@ private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(
while (sourceExpr instanceof SlotRef) {
SlotRef slotRef = (SlotRef) sourceExpr;
SlotDescriptor slotDesc = slotRef.getDesc();
if (slotDesc.getSourceExprs().isEmpty()) {
if (slotDesc.getSourceExprs().size() != 1) {
break;
}
sourceExpr = slotDesc.getSourceExprs().get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,6 @@ public long beginTransaction(List<Long> tableIdList, String label, TUniqueId req
}

return tid;
} catch (DuplicatedRequestException e) {
throw e;
} catch (Exception e) {
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
}
throw e;
} finally {
writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,32 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU
TxnCoordinator coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException,
QuotaExceedException, MetaNotFoundException {
try {
if (Config.disable_load_job) {
throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented");
}

if (Config.disable_load_job) {
throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented");
}

switch (sourceType) {
case BACKEND_STREAMING:
checkValidTimeoutSecond(timeoutSecond, Config.max_stream_load_timeout_second,
Config.min_load_timeout_second);
break;
default:
checkValidTimeoutSecond(timeoutSecond, Config.max_load_timeout_second, Config.min_load_timeout_second);
}
switch (sourceType) {
case BACKEND_STREAMING:
checkValidTimeoutSecond(timeoutSecond, Config.max_stream_load_timeout_second,
Config.min_load_timeout_second);
break;
default:
checkValidTimeoutSecond(timeoutSecond, Config.max_load_timeout_second,
Config.min_load_timeout_second);
}

DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
return dbTransactionMgr.beginTransaction(tableIdList, label, requestId,
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
return dbTransactionMgr.beginTransaction(tableIdList, label, requestId,
coordinator, sourceType, listenerId, timeoutSecond);
} catch (DuplicatedRequestException e) {
throw e;
} catch (Exception e) {
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
}
throw e;
}
}

private void checkValidTimeoutSecond(long timeoutSecond, int maxLoadTimeoutSecond,
Expand Down
Loading

0 comments on commit 23b083d

Please sign in to comment.