From ee471e9e751fd984d7a118b181473e2876cade1a Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 7 Mar 2024 18:12:36 +0800 Subject: [PATCH] Storage: Add the time cost about stream in local/remote (#8676) ref pingcap/tiflash#8675 --- dbms/src/Flash/Statistics/TableScanImpl.cpp | 39 ++++++++++++++++++- dbms/src/Flash/Statistics/TableScanImpl.h | 4 +- dbms/src/Operators/IOProfileInfo.h | 2 +- dbms/src/Storages/DeltaMerge/ScanContext.cpp | 6 ++- dbms/src/Storages/DeltaMerge/ScanContext.h | 10 ++++- dbms/src/Storages/DeltaMerge/Segment.cpp | 2 + .../src/Storages/KVStore/Read/LearnerRead.cpp | 2 +- 7 files changed, 56 insertions(+), 9 deletions(-) diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 585ba703ae3..7316164a686 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -21,7 +21,15 @@ namespace DB { String TableScanDetail::toJson() const { - return fmt::format(R"({{"is_local":{},"packets":{},"bytes":{}}})", is_local, packets, bytes); + auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0; + auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0; + return fmt::format( + R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})", + is_local, + packets, + bytes, + max_cost_ms, + min_cost_ms); } void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const @@ -57,12 +65,21 @@ void TableScanStatistics::collectExtraRuntimeDetail() { /// remote read updateTableScanDetail(cop_stream->getConnectionProfileInfos()); + // TODO: Can not get the execution time of remote read streams? 
} else if (const auto * local_stream = dynamic_cast(&stream); local_stream) { /// local read input stream also is IProfilingBlockInputStream - local_table_scan_detail.bytes += local_stream->getProfileInfo().bytes; + const auto & prof = local_stream->getProfileInfo(); + local_table_scan_detail.bytes += prof.bytes; + const double this_execution_time = prof.execution_time * 1.0; + if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.max_stream_cost_ns < this_execution_time) + local_table_scan_detail.max_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.min_stream_cost_ns > this_execution_time) + local_table_scan_detail.min_stream_cost_ns = this_execution_time; } else { @@ -73,9 +90,27 @@ void TableScanStatistics::collectExtraRuntimeDetail() case ExecutionMode::Pipeline: transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) { if (profile_info.is_local) + { local_table_scan_detail.bytes += profile_info.operator_info->bytes; + const double this_execution_time = profile_info.operator_info->execution_time * 1.0; + if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.max_stream_cost_ns < this_execution_time) + local_table_scan_detail.max_stream_cost_ns = this_execution_time; + if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited + || local_table_scan_detail.min_stream_cost_ns > this_execution_time) + local_table_scan_detail.min_stream_cost_ns = this_execution_time; + } else + { updateTableScanDetail(profile_info.connection_profile_infos); + const double this_execution_time = profile_info.operator_info->execution_time * 1.0; + if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited + || remote_table_scan_detail.max_stream_cost_ns < this_execution_time) + remote_table_scan_detail.max_stream_cost_ns = this_execution_time; + if 
(remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited + || remote_table_scan_detail.min_stream_cost_ns > this_execution_time) + remote_table_scan_detail.min_stream_cost_ns = this_execution_time; + } }); break; } diff --git a/dbms/src/Flash/Statistics/TableScanImpl.h b/dbms/src/Flash/Statistics/TableScanImpl.h index a4840429f46..d1f408bf12e 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.h +++ b/dbms/src/Flash/Statistics/TableScanImpl.h @@ -22,7 +22,9 @@ namespace DB { struct TableScanDetail : public ConnectionProfileInfo { - bool is_local; + const bool is_local; + double min_stream_cost_ns = -1.0; + double max_stream_cost_ns = -1.0; explicit TableScanDetail(bool is_local_) : is_local(is_local_) diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h index 2c8312fda1a..036a33fb66d 100644 --- a/dbms/src/Operators/IOProfileInfo.h +++ b/dbms/src/Operators/IOProfileInfo.h @@ -46,7 +46,7 @@ struct IOProfileInfo OperatorProfileInfoPtr operator_info; - bool is_local; + const bool is_local; std::vector connection_profile_infos{}; RemoteExecutionSummary remote_execution_summary{}; }; diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.cpp b/dbms/src/Storages/DeltaMerge/ScanContext.cpp index 99ce5c99daa..fce3291c882 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.cpp +++ b/dbms/src/Storages/DeltaMerge/ScanContext.cpp @@ -30,8 +30,9 @@ String ScanContext::toJson() const json->set("dmfile_skip_rows", total_dmfile_skipped_rows.load()); json->set("dmfile_read_time", fmt::format("{:.3f}ms", total_dmfile_read_time_ns.load() / NS_TO_MS_SCALE)); - json->set("remote_region_num", total_remote_region_num.load()); - json->set("local_region_num", total_remote_region_num.load()); + json->set("num_remote_region", total_remote_region_num.load()); + json->set("num_local_region", total_local_region_num.load()); + json->set("num_stale_read", num_stale_read.load()); json->set("read_bytes", user_read_bytes.load()); @@ -56,6 +57,7 @@ String 
ScanContext::toJson() const json->set("learner_read_time", fmt::format("{:.3f}ms", learner_read_ns.load() / NS_TO_MS_SCALE)); json->set("create_snapshot_time", fmt::format("{:.3f}ms", create_snapshot_time_ns.load() / NS_TO_MS_SCALE)); + json->set("build_stream_time", fmt::format("{:.3f}ms", build_inputstream_time_ns.load() / NS_TO_MS_SCALE)); json->set("build_bitmap_time", fmt::format("{:.3f}ms", build_bitmap_time_ns.load() / NS_TO_MS_SCALE)); std::stringstream buf; diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index ae7c5130db1..c9909fbb352 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -49,6 +49,7 @@ class ScanContext std::atomic total_remote_region_num{0}; std::atomic total_local_region_num{0}; + std::atomic num_stale_read{0}; // the read bytes from delta layer and stable layer (in-mem, decompressed) std::atomic user_read_bytes{0}; @@ -82,6 +83,7 @@ class ScanContext std::atomic learner_read_ns{0}; // Create snapshot from PageStorage std::atomic create_snapshot_time_ns{0}; + std::atomic build_inputstream_time_ns{0}; // Building bitmap std::atomic build_bitmap_time_ns{0}; @@ -138,12 +140,10 @@ class ScanContext total_dmfile_skipped_rows += other.total_dmfile_skipped_rows; total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns; total_dmfile_read_time_ns += other.total_dmfile_read_time_ns; - create_snapshot_time_ns += other.create_snapshot_time_ns; total_local_region_num += other.total_local_region_num; total_remote_region_num += other.total_remote_region_num; user_read_bytes += other.user_read_bytes; - learner_read_ns += other.learner_read_ns; disagg_read_cache_hit_size += other.disagg_read_cache_hit_size; disagg_read_cache_miss_size += other.disagg_read_cache_miss_size; @@ -157,6 +157,12 @@ class ScanContext mvcc_input_rows += other.mvcc_input_rows; mvcc_input_bytes += other.mvcc_input_bytes; mvcc_output_rows 
+= other.mvcc_output_rows; + late_materialization_skip_rows += other.late_materialization_skip_rows; + + learner_read_ns += other.learner_read_ns; + create_snapshot_time_ns += other.create_snapshot_time_ns; + build_inputstream_time_ns += other.build_inputstream_time_ns; + build_bitmap_time_ns += other.build_bitmap_time_ns; } void merge(const tipb::TiFlashScanContext & other) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 017abf647ba..04c6bf67e2b 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -801,6 +801,8 @@ BlockInputStreamPtr Segment::getInputStream( UInt64 max_version, size_t expected_block_size) { + Stopwatch sw; + SCOPE_EXIT({ dm_context.scan_context->build_inputstream_time_ns += sw.elapsed(); }); auto clipped_block_rows = clipBlockRows( // dm_context.global_context, expected_block_size, diff --git a/dbms/src/Storages/KVStore/Read/LearnerRead.cpp b/dbms/src/Storages/KVStore/Read/LearnerRead.cpp index 9b183857117..619a838de4e 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerRead.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerRead.cpp @@ -61,7 +61,7 @@ LearnerReadSnapshot doLearnerRead( if (auto * dag_context = context.getDAGContext()) { - // TODO(observability): add info about the number of stale read regions + mvcc_query_info.scan_context->num_stale_read = worker.getStats().num_stale_read; dag_context->has_read_wait_index = true; dag_context->read_wait_index_start_timestamp = start_time; dag_context->read_wait_index_end_timestamp = end_time;