diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 004ebfbc84a683..0e3d891aa07b12 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -147,6 +147,8 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle footer_size, opts.file_reader->path().native()); } *body = Slice(page_slice.data, page_slice.size - 4 - footer_size); + // If read from cache, then should also recorded in uncompressed bytes read counter. + opts.stats->uncompressed_bytes_read += body->size; return Status::OK(); } diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e7e700bfaf3b3c..7dee4e2a9fe08f 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -43,6 +43,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, MetricUnit::REQUES "Total fragment requests received."); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, MetricUnit::MICROSECONDS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_local, MetricUnit::BYTES); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_remote, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_count, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_success_total, MetricUnit::REQUESTS, "", @@ -238,6 +240,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_from_local); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_from_remote); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_success_total); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index d95eee6800e9f7..94e13554eb7654 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -50,6 +50,8 @@ class DorisMetrics { IntCounter* fragment_requests_total = nullptr; IntCounter* fragment_request_duration_us = nullptr; IntCounter* query_scan_bytes = nullptr; + IntCounter* query_scan_bytes_from_local = nullptr; + IntCounter* query_scan_bytes_from_remote = nullptr; IntCounter* query_scan_rows = nullptr; IntCounter* push_requests_success_total = nullptr; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index ba5330d166cbb0..4a5883da55f1f4 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -524,7 +524,6 @@ Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed); *eof = false; } - _update_realtime_counters(); return Status::OK(); } @@ -537,17 +536,42 @@ Status NewOlapScanner::close(RuntimeState* state) { return Status::OK(); } -void NewOlapScanner::_update_realtime_counters() { +void NewOlapScanner::update_realtime_counters() { pipeline::OlapScanLocalState* local_state = static_cast(_local_state); const OlapReaderStatistics& stats = _tablet_reader->stats(); COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); - COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read); - _tablet_reader->mutable_stats()->compressed_bytes_read = 0; - + COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read); - // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash + + // Make sure the scan bytes and scan rows counter in audit log is the same as the counter in + // doris metrics. + // ScanBytes is the uncompressed bytes read from local + remote + // bytes_read_from_local is the compressed bytes read from local + // bytes_read_from_remote is the compressed bytes read from remote + // scan bytes > bytes_read_from_local + bytes_read_from_remote + if (_query_statistics) { + _query_statistics->add_scan_rows(stats.raw_rows_read); + _query_statistics->add_scan_bytes(stats.uncompressed_bytes_read); + } + + // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote + if (stats.file_cache_stats.bytes_read_from_local == 0 && + stats.file_cache_stats.bytes_read_from_remote == 0) { + DorisMetrics::instance()->query_scan_bytes_from_local->increment( + stats.compressed_bytes_read); + } else { + DorisMetrics::instance()->query_scan_bytes_from_local->increment( + stats.file_cache_stats.bytes_read_from_local); + DorisMetrics::instance()->query_scan_bytes_from_remote->increment( + stats.file_cache_stats.bytes_read_from_remote); + } + + _tablet_reader->mutable_stats()->compressed_bytes_read = 0; + _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0; _tablet_reader->mutable_stats()->raw_rows_read = 0; + _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_local = 0; + _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_remote = 0; } void NewOlapScanner::_collect_profile_before_close() { @@ -563,7 +587,7 @@ void NewOlapScanner::_collect_profile_before_close() { #define INCR_COUNTER(Parent) \ COUNTER_UPDATE(Parent->_io_timer, stats.io_ns); \ COUNTER_UPDATE(Parent->_read_compressed_counter, stats.compressed_bytes_read); \ - COUNTER_UPDATE(Parent->_scan_bytes, stats.compressed_bytes_read); \ + COUNTER_UPDATE(Parent->_scan_bytes, stats.uncompressed_bytes_read); \ COUNTER_UPDATE(Parent->_decompressor_timer, stats.decompress_ns); \ COUNTER_UPDATE(Parent->_read_uncompressed_counter, stats.uncompressed_bytes_read); \ COUNTER_UPDATE(Parent->_block_load_timer, stats.block_load_ns); \ @@ -710,10 +734,10 @@ void NewOlapScanner::_collect_profile_before_close() { // Update metrics DorisMetrics::instance()->query_scan_bytes->increment( - local_state->_read_compressed_counter->value()); + local_state->_read_uncompressed_counter->value()); DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value()); auto& tablet = _tablet_reader_params.tablet; - tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value()); + tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value()); tablet->query_scan_rows->increment(local_state->_scan_rows->value()); tablet->query_scan_count->increment(1); if (_query_statistics) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index fd1246b120ba77..73e867ac4ce641 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -77,13 +77,13 @@ class NewOlapScanner : public VScanner { doris::TabletStorageType get_storage_type() override; + void update_realtime_counters() override; + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; void _collect_profile_before_close() override; private: - void _update_realtime_counters(); - Status _init_tablet_reader_params(const std::vector& key_ranges, const std::vector& filters, const pipeline::FilterPredicates& filter_predicates, diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 1b14d172790671..f3326d3841abe1 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -313,8 +313,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, scan_task->set_status(status); eos = true; } - + // WorkloadGroup Policy will check cputime realtime, so that should update the counter + // as soon as possible, could not update it on close. scanner->update_scan_cpu_timer(); + scanner->update_realtime_counters(); + if (eos) { scanner->mark_to_need_to_close(); } diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 0087a19d92f54a..96a384177deb06 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -103,8 +103,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } } - int64_t old_scan_rows = _num_rows_read; - int64_t old_scan_bytes = _num_byte_read; { do { // if step 2 filter all rows of block, and block will be reused to get next rows, @@ -136,11 +134,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_read < rows_read_threshold); } - if (_query_statistics) { - _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows); - _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes); - } - if (state->is_cancelled()) { // TODO: Should return the specific ErrorStatus instead of just Cancelled. return Status::Cancelled("cancelled"); diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index bb68055e1f07a3..ac6371c3abce93 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -128,6 +128,10 @@ class VScanner { void update_scan_cpu_timer(); + // Some counters need to be updated realtime, for example, workload group policy need + // scan bytes to cancel the query exceed limit. + virtual void update_realtime_counters() {} + RuntimeState* runtime_state() { return _state; } bool is_open() { return _is_open; }