Skip to content

Commit

Permalink
Storage: Add the time cost about stream in local/remote (#8676)
Browse files Browse the repository at this point in the history
ref #8675
  • Loading branch information
JaySon-Huang authored Mar 7, 2024
1 parent 07a9a21 commit ee471e9
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 9 deletions.
39 changes: 37 additions & 2 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ namespace DB
{
String TableScanDetail::toJson() const
{
return fmt::format(R"({{"is_local":{},"packets":{},"bytes":{}}})", is_local, packets, bytes);
auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0;
auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0;
return fmt::format(
R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})",
is_local,
packets,
bytes,
max_cost_ms,
min_cost_ms);
}

void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
Expand Down Expand Up @@ -57,12 +65,21 @@ void TableScanStatistics::collectExtraRuntimeDetail()
{
/// remote read
updateTableScanDetail(cop_stream->getConnectionProfileInfos());
// TODO: Can not get the execution time of remote read streams?
}
else if (const auto * local_stream = dynamic_cast<const IProfilingBlockInputStream *>(&stream);
local_stream)
{
/// local read input stream also is IProfilingBlockInputStream
local_table_scan_detail.bytes += local_stream->getProfileInfo().bytes;
const auto & prof = local_stream->getProfileInfo();
local_table_scan_detail.bytes += prof.bytes;
const double this_execution_time = prof.execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
Expand All @@ -73,9 +90,27 @@ void TableScanStatistics::collectExtraRuntimeDetail()
case ExecutionMode::Pipeline:
transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) {
if (profile_info.is_local)
{
local_table_scan_detail.bytes += profile_info.operator_info->bytes;
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
updateTableScanDetail(profile_info.connection_profile_infos);
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns < this_execution_time)
remote_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns > this_execution_time)
remote_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
});
break;
}
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Statistics/TableScanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ namespace DB
{
struct TableScanDetail : public ConnectionProfileInfo
{
bool is_local;
const bool is_local;
double min_stream_cost_ns = -1.0;
double max_stream_cost_ns = -1.0;

explicit TableScanDetail(bool is_local_)
: is_local(is_local_)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Operators/IOProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct IOProfileInfo

OperatorProfileInfoPtr operator_info;

bool is_local;
const bool is_local;
std::vector<ConnectionProfileInfo> connection_profile_infos{};
RemoteExecutionSummary remote_execution_summary{};
};
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/ScanContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ String ScanContext::toJson() const
json->set("dmfile_skip_rows", total_dmfile_skipped_rows.load());
json->set("dmfile_read_time", fmt::format("{:.3f}ms", total_dmfile_read_time_ns.load() / NS_TO_MS_SCALE));

json->set("remote_region_num", total_remote_region_num.load());
json->set("local_region_num", total_remote_region_num.load());
json->set("num_remote_region", total_remote_region_num.load());
json->set("num_local_region", total_remote_region_num.load());
json->set("num_stale_read", num_stale_read.load());

json->set("read_bytes", user_read_bytes.load());

Expand All @@ -56,6 +57,7 @@ String ScanContext::toJson() const

json->set("learner_read_time", fmt::format("{:.3f}ms", learner_read_ns.load() / NS_TO_MS_SCALE));
json->set("create_snapshot_time", fmt::format("{:.3f}ms", create_snapshot_time_ns.load() / NS_TO_MS_SCALE));
json->set("build_stream_time", fmt::format("{:.3f}ms", build_inputstream_time_ns.load() / NS_TO_MS_SCALE));
json->set("build_bitmap_time", fmt::format("{:.3f}ms", build_bitmap_time_ns.load() / NS_TO_MS_SCALE));

std::stringstream buf;
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ScanContext

std::atomic<uint64_t> total_remote_region_num{0};
std::atomic<uint64_t> total_local_region_num{0};
std::atomic<uint64_t> num_stale_read{0};

// the read bytes from delta layer and stable layer (in-mem, decompressed)
std::atomic<uint64_t> user_read_bytes{0};
Expand Down Expand Up @@ -82,6 +83,7 @@ class ScanContext
std::atomic<uint64_t> learner_read_ns{0};
// Create snapshot from PageStorage
std::atomic<uint64_t> create_snapshot_time_ns{0};
std::atomic<uint64_t> build_inputstream_time_ns{0};
// Building bitmap
std::atomic<uint64_t> build_bitmap_time_ns{0};

Expand Down Expand Up @@ -138,12 +140,10 @@ class ScanContext
total_dmfile_skipped_rows += other.total_dmfile_skipped_rows;
total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns;
total_dmfile_read_time_ns += other.total_dmfile_read_time_ns;
create_snapshot_time_ns += other.create_snapshot_time_ns;

total_local_region_num += other.total_local_region_num;
total_remote_region_num += other.total_remote_region_num;
user_read_bytes += other.user_read_bytes;
learner_read_ns += other.learner_read_ns;
disagg_read_cache_hit_size += other.disagg_read_cache_hit_size;
disagg_read_cache_miss_size += other.disagg_read_cache_miss_size;

Expand All @@ -157,6 +157,12 @@ class ScanContext
mvcc_input_rows += other.mvcc_input_rows;
mvcc_input_bytes += other.mvcc_input_bytes;
mvcc_output_rows += other.mvcc_output_rows;
late_materialization_skip_rows += other.late_materialization_skip_rows;

learner_read_ns += other.learner_read_ns;
create_snapshot_time_ns += other.create_snapshot_time_ns;
build_inputstream_time_ns += other.build_inputstream_time_ns;
build_bitmap_time_ns += other.build_bitmap_time_ns;
}

void merge(const tipb::TiFlashScanContext & other)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,8 @@ BlockInputStreamPtr Segment::getInputStream(
UInt64 max_version,
size_t expected_block_size)
{
Stopwatch sw;
SCOPE_EXIT({ dm_context.scan_context->build_inputstream_time_ns += sw.elapsed(); });
auto clipped_block_rows = clipBlockRows( //
dm_context.global_context,
expected_block_size,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Read/LearnerRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ LearnerReadSnapshot doLearnerRead(

if (auto * dag_context = context.getDAGContext())
{
// TODO(observability): add info about the number of stale read regions
mvcc_query_info.scan_context->num_stale_read = worker.getStats().num_stale_read;
dag_context->has_read_wait_index = true;
dag_context->read_wait_index_start_timestamp = start_time;
dag_context->read_wait_index_end_timestamp = end_time;
Expand Down

0 comments on commit ee471e9

Please sign in to comment.