Skip to content

Commit

Permalink
[fix](pipelinex) fix fragment instance reports
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Sep 3, 2024
1 parent 000fc9e commit aadc13b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
16 changes: 11 additions & 5 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
int64_t num_rows_load_success = 0;
int64_t num_rows_load_filtered = 0;
int64_t num_rows_load_unselected = 0;
int64_t num_finished_ranges = 0;
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
Expand All @@ -402,7 +401,11 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
num_rows_load_success = req.runtime_state->num_rows_load_success();
num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
num_finished_ranges = req.runtime_state->num_finished_range();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(req.runtime_state->num_finished_range());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
Expand All @@ -411,11 +414,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
num_rows_load_unselected += rs->num_rows_load_unselected();
num_finished_ranges += rs->num_finished_range();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
params.fragment_instance_reports.push_back(t);
}
}
}
params.__set_finished_scan_ranges(num_finished_ranges);
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
Expand Down Expand Up @@ -528,7 +534,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
TReportExecStatusResult res;
Status rpc_status;

VLOG_DEBUG << "reportExecStatus params is "
LOG(INFO) << "[debug][DORIS-12389] reportExecStatus params is "
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.msg()
Expand Down
22 changes: 17 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFragmentInstanceReport;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TNetworkAddress;
Expand Down Expand Up @@ -2448,11 +2449,22 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
}

if (params.isSetLoadedRows() && jobId != -1) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
LOG.info("[debug][DORIS-12389] got fragment instance reports: {}", params);
if (params.isSetFragmentInstanceReports()) {
for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
}
} else {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ struct TQueryProfile {
5: optional list<RuntimeProfile.TRuntimeProfileTree> load_channel_profiles
}

struct TFragmentInstanceReport {
1: optional Types.TUniqueId fragment_instance_id;
2: optional i32 num_finished_range;
}


// The results of an INSERT query, sent to the coordinator as part of
// TReportExecStatusParams
struct TReportExecStatusParams {
Expand Down Expand Up @@ -509,6 +515,8 @@ struct TReportExecStatusParams {

29: optional i64 txn_id
30: optional string label

31: optional list<TFragmentInstanceReport> fragment_instance_reports;
}

struct TFeResult {
Expand Down

0 comments on commit aadc13b

Please sign in to comment.