Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle
footer_size, opts.file_reader->path().native());
}
*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
// If read from cache, then should also recorded in uncompressed bytes read counter.
opts.stats->uncompressed_bytes_read += body->size;
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, MetricUnit::REQUES
"Total fragment requests received.");
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, MetricUnit::MICROSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_local, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_remote, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_success_total, MetricUnit::REQUESTS, "",
Expand Down Expand Up @@ -247,6 +249,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_from_local);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_from_remote);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_success_total);
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class DorisMetrics {
IntCounter* fragment_requests_total = nullptr;
IntCounter* fragment_request_duration_us = nullptr;
IntCounter* query_scan_bytes = nullptr;
IntCounter* query_scan_bytes_from_local = nullptr;
IntCounter* query_scan_bytes_from_remote = nullptr;
IntCounter* query_scan_rows = nullptr;

IntCounter* push_requests_success_total = nullptr;
Expand Down
49 changes: 36 additions & 13 deletions be/src/vec/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof
_tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed);
*eof = false;
}
_update_realtime_counters();
return Status::OK();
}

Expand All @@ -537,17 +536,45 @@ Status OlapScanner::close(RuntimeState* state) {
return Status::OK();
}

void OlapScanner::_update_realtime_counters() {
void OlapScanner::update_realtime_counters() {
pipeline::OlapScanLocalState* local_state =
static_cast<pipeline::OlapScanLocalState*>(_local_state);
const OlapReaderStatistics& stats = _tablet_reader->stats();
COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read);
_tablet_reader->mutable_stats()->compressed_bytes_read = 0;

COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
// if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash

// Make sure the scan bytes and scan rows counter in audit log is the same as the counter in
// doris metrics.
// ScanBytes is the uncompressed bytes read from local + remote
// bytes_read_from_local is the compressed bytes read from local
// bytes_read_from_remote is the compressed bytes read from remote
// scan bytes > bytes_read_from_local + bytes_read_from_remote
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
stats.uncompressed_bytes_read);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
stats.file_cache_stats.bytes_read_from_local);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage(
stats.file_cache_stats.bytes_read_from_remote);

// In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote
if (stats.file_cache_stats.bytes_read_from_local == 0 &&
stats.file_cache_stats.bytes_read_from_remote == 0) {
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
stats.compressed_bytes_read);
} else {
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
stats.file_cache_stats.bytes_read_from_local);
DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
stats.file_cache_stats.bytes_read_from_remote);
}

_tablet_reader->mutable_stats()->compressed_bytes_read = 0;
_tablet_reader->mutable_stats()->uncompressed_bytes_read = 0;
_tablet_reader->mutable_stats()->raw_rows_read = 0;
_tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_local = 0;
_tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_remote = 0;
}

void OlapScanner::_collect_profile_before_close() {
Expand All @@ -566,7 +593,7 @@ void OlapScanner::_collect_profile_before_close() {
auto* local_state = (pipeline::OlapScanLocalState*)_local_state;
COUNTER_UPDATE(local_state->_io_timer, stats.io_ns);
COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read);
COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns);
COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns);
Expand Down Expand Up @@ -701,16 +728,12 @@ void OlapScanner::_collect_profile_before_close() {

// Update metrics
DorisMetrics::instance()->query_scan_bytes->increment(
local_state->_read_compressed_counter->value());
local_state->_read_uncompressed_counter->value());
DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value());
auto& tablet = _tablet_reader_params.tablet;
tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value());
tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value());
tablet->query_scan_rows->increment(local_state->_scan_rows->value());
tablet->query_scan_count->increment(1);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
stats.file_cache_stats.bytes_read_from_local);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_remote_storage(
stats.file_cache_stats.bytes_read_from_remote);
}

} // namespace doris::vectorized
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ class OlapScanner : public Scanner {

doris::TabletStorageType get_storage_type() override;

void update_realtime_counters() override;

protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
void _collect_profile_before_close() override;

private:
void _update_realtime_counters();

Status _init_tablet_reader_params(const std::vector<OlapScanRange*>& key_ranges,
const std::vector<FilterOlapParam<TCondition>>& filters,
const pipeline::FilterPredicates& filter_predicates,
Expand Down
11 changes: 0 additions & 11 deletions be/src/vec/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}
}

#ifndef BE_TEST
int64_t old_scan_rows = _num_rows_read;
int64_t old_scan_bytes = _num_byte_read;
#endif
{
do {
// if step 2 filter all rows of block, and block will be reused to get next rows,
Expand Down Expand Up @@ -136,13 +132,6 @@ Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) {
_num_rows_read < rows_read_threshold);
}

#ifndef BE_TEST
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(_num_rows_read -
old_scan_rows);
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(_num_byte_read -
old_scan_bytes);
#endif

if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just Cancelled.
return Status::Cancelled("cancelled");
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class Scanner {

void update_scan_cpu_timer();

// Some counters need to be updated realtime, for example, workload group policy need
// scan bytes to cancel the query exceed limit.
virtual void update_realtime_counters() {}

RuntimeState* runtime_state() { return _state; }

bool is_open() { return _is_open; }
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->set_status(status);
eos = true;
}
// WorkloadGroup Policy will check cputime realtime, so that should update the counter
// as soon as possible, could not update it on close.
scanner->update_scan_cpu_timer();
scanner->update_realtime_counters();

if (eos) {
scanner->mark_to_need_to_close();
Expand Down
Loading