Skip to content

Commit

Permalink
[fix](multi-table) fix unknown source slot descriptor when loading multi-table (apache#25762)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored and 胥剑旭 committed Dec 14, 2023
1 parent dc02e04 commit 9520e1d
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 13 deletions.
14 changes: 10 additions & 4 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,19 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
fragment_context->set_is_report_success(request.query_options.is_report_success);
}

auto* desc_tbl = _query_ctx->desc_tbl;
_runtime_state->set_desc_tbl(desc_tbl);
if (request.is_simplified_param) {
_desc_tbl = _query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(
DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
}
_runtime_state->set_desc_tbl(_desc_tbl);

// 2. Create ExecNode to build pipeline with PipelineFragmentContext
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(),
request.fragment.plan, *desc_tbl, &_root_plan));
request.fragment.plan, *_desc_tbl, &_root_plan));

// Set senders of exchange nodes before pipeline build
std::vector<ExecNode*> exch_nodes;
Expand Down Expand Up @@ -316,7 +322,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
_runtime_state.get(), &_sink, *desc_tbl));
_runtime_state.get(), &_sink, *_desc_tbl));
}

_root_pipeline = fragment_context->add_pipeline();
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
// profile reporting-related
report_status_callback _report_status_cb;

DescriptorTbl* _desc_tbl;

private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
std::vector<std::unique_ptr<PipelineTask>> _tasks;
Expand Down
14 changes: 10 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,14 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->set_load_job_id(request.load_job_id);
}

auto* desc_tbl = _query_ctx->desc_tbl;
_runtime_state->set_desc_tbl(desc_tbl);
if (request.is_simplified_param) {
_desc_tbl = _query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(
DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
}
_runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);

// 2. Build pipelines with operators in this fragment.
Expand All @@ -217,7 +223,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
request, root_pipeline->output_row_desc(), _runtime_state.get(), *desc_tbl,
request, root_pipeline->output_row_desc(), _runtime_state.get(), *_desc_tbl,
root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));
Expand Down Expand Up @@ -407,7 +413,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_load_job_id(request.load_job_id);
}

_runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
_runtime_states[i]->set_desc_tbl(_desc_tbl);
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
Expand Down
14 changes: 10 additions & 4 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,19 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
}

// set up desc tbl
DescriptorTbl* desc_tbl = _query_ctx->desc_tbl;
_runtime_state->set_desc_tbl(desc_tbl);
if (request.is_simplified_param) {
_desc_tbl = _query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(
DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
}
_runtime_state->set_desc_tbl(_desc_tbl);

// set up plan
DCHECK(request.__isset.fragment);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
_runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan));
_runtime_state.get(), obj_pool(), request.fragment.plan, *_desc_tbl, &_plan));

// set #senders of exchange nodes before calling Prepare()
std::vector<ExecNode*> exch_nodes;
Expand Down Expand Up @@ -222,7 +228,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params,
row_desc(), runtime_state(), &_sink, *desc_tbl));
row_desc(), runtime_state(), &_sink, *_desc_tbl));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));

RuntimeProfile* sink_profile = _sink->profile();
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ class PlanFragmentExecutor {

bool _group_commit = false;

DescriptorTbl* _desc_tbl;

ObjectPool* obj_pool() { return _runtime_state->obj_pool(); }

// typedef for TPlanFragmentExecParams.per_node_scan_ranges
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"}
routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"}
routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"}
14 changes: 13 additions & 1 deletion regression-test/data/load_p0/routine_load/test_routine_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -986,4 +986,16 @@
49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}

-- !sql_multi_table_one_data --
8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N
8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N

-- !sql_multi_table --
49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {}

-- !sql_multi_table --
49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {}

-- !sql_multi_table --
8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N

-- !sql_multi_table --
8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ suite("test_routine_load","p0") {
"dup_tbl_basic_multi_table",
]

def multiTables1 = [
"dup_tbl_basic",
"uniq_tbl_basic",
]

def jobs = [
"dup_tbl_basic_job",
"uniq_tbl_basic_job",
Expand Down Expand Up @@ -148,6 +153,11 @@ suite("test_routine_load","p0") {
"multi_table_json",
]

def multiTableJobName1 = [
"multi_table_csv1",
"multi_table_json1",
]

def formats = [
"csv",
"json",
Expand Down Expand Up @@ -1030,4 +1040,83 @@ suite("test_routine_load","p0") {
j++
}
}

// Regression check for the multi-table routine load fix: for each format
// (csv/json) create one routine load job that fans rows out into multiple
// target tables, then verify every table eventually becomes visible.
if (enabled != null && enabled.equalsIgnoreCase("true")) {
    def j = 0
    for (String jobName in multiTableJobName1) {
        try {
            // Recreate every target table so each job starts from an empty state.
            for (String tableName in multiTables1) {
                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
                sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
            }

            sql """
                CREATE ROUTINE LOAD ${jobName}
                COLUMNS TERMINATED BY "|"
                PROPERTIES
                (
                    "max_batch_interval" = "5",
                    "format" = "${formats[j]}",
                    "max_batch_rows" = "300000",
                    "max_batch_size" = "209715200"
                )
                FROM KAFKA
                (
                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
                    "kafka_topic" = "${jobName}",
                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                );
            """
            sql "sync"

            i = 0
            for (String tableName in multiTables1) {
                // Wait for the job to leave NEED_SCHEDULE; once scheduled it
                // must be RUNNING, otherwise the job failed to start.
                while (true) {
                    sleep(1000)
                    def res = sql "show routine load for ${jobName}"
                    def state = res[0][8].toString()
                    if (state == "NEED_SCHEDULE") {
                        continue;
                    }
                    // assertEquals takes (expected, actual) — keeping that order
                    // makes the failure message report the real observed state.
                    assertEquals("RUNNING", state)
                    break;
                }

                def count = 0
                def tableName1 = "routine_load_" + tableName
                // Poll until at least one row is visible, up to 120 * 5s.
                while (true) {
                    def res = sql "select count(*) from ${tableName1}"
                    def state = sql "show routine load for ${jobName}"
                    log.info("routine load state: ${state[0][8].toString()}".toString())
                    log.info("routine load statistic: ${state[0][14].toString()}".toString())
                    log.info("reason of state changed: ${state[0][17].toString()}".toString())
                    if (res[0][0] > 0) {
                        break
                    }
                    if (count >= 120) {
                        log.error("routine load can not visible for long time")
                        // Deliberately-failing assertion: aborts the test and
                        // surfaces the observed row count in the failure output.
                        assertEquals(20, res[0][0])
                        break
                    }
                    sleep(5000)
                    count++
                }

                // NOTE(review): multiTables1 has 2 entries, so i <= 3 always
                // holds here; branch kept to mirror the sibling multi-table loop.
                if (i <= 3) {
                    qt_sql_multi_table "select * from ${tableName1} order by k00,k01"
                } else {
                    qt_sql_multi_table "select * from ${tableName1} order by k00"
                }

                i++
            }
        } finally {
            // Always stop the job and drop the tables, even on assertion failure,
            // so subsequent jobs/tests start clean.
            sql "stop routine load for ${jobName}"
            for (String tableName in multiTables1) {
                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
            }
        }
        j++
    }
}
}

0 comments on commit 9520e1d

Please sign in to comment.