From 3b83a342e5cb1cc1f638aa375bcbdc0499e219c0 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 3 Sep 2024 20:56:53 +0800 Subject: [PATCH] [fix](pipelinex) fix fragment instance reports --- be/src/runtime/fragment_mgr.cpp | 14 ++++++++---- .../java/org/apache/doris/qe/Coordinator.java | 22 ++++++++++++++----- gensrc/thrift/FrontendService.thrift | 8 +++++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e93f8beec07c39..91eb10f3653a4a 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -393,7 +393,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { int64_t num_rows_load_success = 0; int64_t num_rows_load_filtered = 0; int64_t num_rows_load_unselected = 0; - int64_t num_finished_ranges = 0; if (req.runtime_state->num_rows_load_total() > 0 || req.runtime_state->num_rows_load_filtered() > 0 || req.runtime_state->num_finished_range() > 0) { @@ -402,7 +401,11 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { num_rows_load_success = req.runtime_state->num_rows_load_success(); num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); - num_finished_ranges = req.runtime_state->num_finished_range(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); + t.__set_num_finished_range(req.runtime_state->num_finished_range()); + params.fragment_instance_reports.emplace_back(t); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || @@ -411,11 +414,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { num_rows_load_success += rs->num_rows_load_success(); num_rows_load_filtered += rs->num_rows_load_filtered(); num_rows_load_unselected += rs->num_rows_load_unselected(); - num_finished_ranges += rs->num_finished_range(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(rs->fragment_instance_id()); + t.__set_num_finished_range(rs->num_finished_range()); + params.fragment_instance_reports.push_back(t); } } } - params.__set_finished_scan_ranges(num_finished_ranges); params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0cbe3eebbab9f1..3cee5dd9308e2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -93,6 +93,7 @@ import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFragmentInstanceReport; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TIcebergCommitData; import org.apache.doris.thrift.TNetworkAddress; @@ -2448,11 +2449,22 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { } if (params.isSetLoadedRows() && jobId != -1) { - Env.getCurrentEnv().getLoadManager().updateJobProgress( - jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), - params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); - Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), - params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges()); + LOG.info("[debug][DORIS-12389] got fragment instance reports: {}", params); + if (params.isSetFragmentInstanceReports()) { + for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) { + Env.getCurrentEnv().getLoadManager().updateJobProgress( + jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(), + params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); + Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), + params.getQueryId(), report.getFragmentInstanceId(), report.getFragmentInstanceReports()); + } + } else { + Env.getCurrentEnv().getLoadManager().updateJobProgress( + jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), + params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); + Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), + params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges()); + } } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 765e7790e90781..5a93427f3c2b15 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -434,6 +434,12 @@ struct TQueryProfile { 5: optional list load_channel_profiles } +struct TFragmentInstanceReport { + 1: optional Types.TUniqueId fragment_instance_id; + 2: optional i32 num_finished_range; +} + + // The results of an INSERT query, sent to the coordinator as part of // TReportExecStatusParams struct TReportExecStatusParams { @@ -509,6 +515,8 @@ struct TReportExecStatusParams { 29: optional i64 txn_id 30: optional string label + + 31: optional list fragment_instance_reports; } struct TFeResult {