diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 513d4a3e4f8743..d6cfda3be24f22 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -146,6 +146,8 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle footer_size, opts.file_reader->path().native()); } *body = Slice(page_slice.data, page_slice.size - 4 - footer_size); + // If the page is read from cache, it should also be recorded in the uncompressed bytes read counter. + opts.stats->uncompressed_bytes_read += body->size; return Status::OK(); } diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index b82b39e4c7d1db..c0c0660b4c08d2 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -43,6 +43,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, MetricUnit::REQUES "Total fragment requests received."); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, MetricUnit::MICROSECONDS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_local, MetricUnit::BYTES); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_remote, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_count, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_success_total, MetricUnit::REQUESTS, "", @@ -247,6 +249,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_from_local); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
query_scan_bytes_from_remote); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_success_total); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index f4641d466af11f..10f1ff0fca25f3 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -50,6 +50,8 @@ class DorisMetrics { IntCounter* fragment_requests_total = nullptr; IntCounter* fragment_request_duration_us = nullptr; IntCounter* query_scan_bytes = nullptr; + IntCounter* query_scan_bytes_from_local = nullptr; + IntCounter* query_scan_bytes_from_remote = nullptr; IntCounter* query_scan_rows = nullptr; IntCounter* push_requests_success_total = nullptr; diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 274f7b31802006..d1c0fcb51a35d4 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -525,7 +525,6 @@ Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed); *eof = false; } - _update_realtime_counters(); return Status::OK(); } @@ -537,17 +536,45 @@ Status OlapScanner::close(RuntimeState* state) { return Status::OK(); } -void OlapScanner::_update_realtime_counters() { +void OlapScanner::update_realtime_counters() { pipeline::OlapScanLocalState* local_state = static_cast(_local_state); const OlapReaderStatistics& stats = _tablet_reader->stats(); COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); - COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read); - _tablet_reader->mutable_stats()->compressed_bytes_read = 0; - + COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read); - // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash + + // 
Make sure the scan bytes and scan rows counter in audit log is the same as the counter in + // doris metrics. + // ScanBytes is the uncompressed bytes read from local + remote + // bytes_read_from_local is the compressed bytes read from local + // bytes_read_from_remote is the compressed bytes read from remote + // scan bytes > bytes_read_from_local + bytes_read_from_remote + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read); + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes( + stats.uncompressed_bytes_read); + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( + stats.file_cache_stats.bytes_read_from_local); + _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage( + stats.file_cache_stats.bytes_read_from_remote); + + // In case of no cache (both file cache counters are 0), count compressed bytes read as local. + if (stats.file_cache_stats.bytes_read_from_local == 0 && + stats.file_cache_stats.bytes_read_from_remote == 0) { + DorisMetrics::instance()->query_scan_bytes_from_local->increment( + stats.compressed_bytes_read); + } else { + DorisMetrics::instance()->query_scan_bytes_from_local->increment( + stats.file_cache_stats.bytes_read_from_local); + DorisMetrics::instance()->query_scan_bytes_from_remote->increment( + stats.file_cache_stats.bytes_read_from_remote); + } + + _tablet_reader->mutable_stats()->compressed_bytes_read = 0; + _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0; _tablet_reader->mutable_stats()->raw_rows_read = 0; + _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_local = 0; + _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_remote = 0; } void OlapScanner::_collect_profile_before_close() { @@ -566,7 +593,7 @@ void OlapScanner::_collect_profile_before_close() { auto* local_state = (pipeline::OlapScanLocalState*)_local_state; 
COUNTER_UPDATE(local_state->_io_timer, stats.io_ns); COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); - COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read); + COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns); COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read); COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns); @@ -701,16 +728,12 @@ void OlapScanner::_collect_profile_before_close() { // Update metrics DorisMetrics::instance()->query_scan_bytes->increment( - local_state->_read_compressed_counter->value()); + local_state->_read_uncompressed_counter->value()); DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value()); auto& tablet = _tablet_reader_params.tablet; - tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value()); + tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value()); tablet->query_scan_rows->increment(local_state->_scan_rows->value()); tablet->query_scan_count->increment(1); - _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( - stats.file_cache_stats.bytes_read_from_local); - _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage( - stats.file_cache_stats.bytes_read_from_remote); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/olap_scanner.h b/be/src/vec/exec/scan/olap_scanner.h index 9058ec237910c1..64b62b64052240 100644 --- a/be/src/vec/exec/scan/olap_scanner.h +++ b/be/src/vec/exec/scan/olap_scanner.h @@ -77,13 +77,13 @@ class OlapScanner : public Scanner { doris::TabletStorageType get_storage_type() override; + void update_realtime_counters() override; + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; void 
_collect_profile_before_close() override; private: - void _update_realtime_counters(); - Status _init_tablet_reader_params(const std::vector& key_ranges, const std::vector>& filters, const pipeline::FilterPredicates& filter_predicates, diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp index 150d683c5b9cb1..c3ba75e0193f09 100644 --- a/be/src/vec/exec/scan/scanner.cpp +++ b/be/src/vec/exec/scan/scanner.cpp @@ -101,10 +101,6 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) { } } -#ifndef BE_TEST - int64_t old_scan_rows = _num_rows_read; - int64_t old_scan_bytes = _num_byte_read; -#endif { do { // if step 2 filter all rows of block, and block will be reused to get next rows, @@ -136,13 +132,6 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_read < rows_read_threshold); } -#ifndef BE_TEST - _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(_num_rows_read - - old_scan_rows); - _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(_num_byte_read - - old_scan_bytes); -#endif - if (state->is_cancelled()) { // TODO: Should return the specific ErrorStatus instead of just Cancelled. return Status::Cancelled("cancelled"); diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h index c682dd7d7583d5..f5973c87502e08 100644 --- a/be/src/vec/exec/scan/scanner.h +++ b/be/src/vec/exec/scan/scanner.h @@ -136,6 +136,10 @@ class Scanner { void update_scan_cpu_timer(); + // Some counters need to be updated in real time, for example, workload group policy needs + // scan bytes to cancel queries that exceed the limit. 
+ virtual void update_realtime_counters() {} + RuntimeState* runtime_state() { return _state; } bool is_open() { return _is_open; } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index fe3fd5b0fe8c18..3d430e2d564504 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -364,6 +364,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, scan_task->set_status(status); eos = true; } + // WorkloadGroup policy checks cpu time in real time, so the counters should be updated + // as soon as possible; they cannot wait until close. + scanner->update_scan_cpu_timer(); + scanner->update_realtime_counters(); if (eos) { scanner->mark_to_need_to_close();