Skip to content

Commit

Permalink
NED
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Jan 26, 2025
1 parent 5a9e77c commit af636d2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 5 deletions.
8 changes: 8 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::stri
scanner_push_back_scan_task_costs_0->set_value(scanner_push_back_scan_task_stats_0->mean());
scanner_push_back_scan_task_costs_1->set_value(scanner_push_back_scan_task_stats_1->mean());
scanner_get_block_for_loop_costs->set_value(scanner_get_block_for_loop_stats->mean());

scanner_get_block_stats->reduce_size(1);
scan_operator_get_block_from_queue_stats->reduce_size(1);
scanner_do_real_task_stats->reduce_size(1);
scanner_merge_block_costs_stat->reduce_size(1);
scanner_push_back_scan_task_stats_0->reduce_size(1);
scanner_push_back_scan_task_stats_1->reduce_size(1);
scanner_get_block_for_loop_stats->reduce_size(1);
});
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/util/interval_histogram.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ T IntervalHistogramStat<T>::min() {
return *std::min_element(window.begin(), window.end());
}

template <typename T>
void IntervalHistogramStat<T>::reduce_size(size_t n) {
std::unique_lock<std::shared_mutex> lock(mutex);
for (size_t i = 0; i < n && !window.empty(); ++i) {
window.pop_front();
}
}

template class doris::IntervalHistogramStat<int64>;
template class doris::IntervalHistogramStat<int32>;

Expand Down
2 changes: 2 additions & 0 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IntervalHistogramStat {
T median();
T max();
T min();

void reduce_size(size_t n); // New method to reduce the size of the window

private:
boost::circular_buffer<T> window;
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ using namespace std::chrono_literals;

ScanTask::~ScanTask() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
VLOG_DEBUG << fmt::format("Destroy scan task, cacheblock cnt {}", cached_blocks.size());
DorisMetrics::instance()->scanner_context_cached_block_cnt->increment(-cached_blocks.size());
for (auto& block : cached_blocks) {
VLOG_DEBUG << fmt::format("Decrease block size {}, {}", block.first->allocated_bytes(),
Expand Down Expand Up @@ -276,7 +277,6 @@ Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
}

void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {

MonotonicStopWatch watch0;
watch0.start();
if (scan_task->status_ok()) {
Expand Down Expand Up @@ -342,6 +342,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
if (!scan_task->cached_blocks.empty()) {
auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
scan_task->cached_blocks.pop_front();
VLOG_DEBUG << fmt::format(
"ScannerContext pop front cached block, size {}, {}",
current_block->allocated_bytes(), block_size);
DorisMetrics::instance()->scanner_context_cached_block_size->increment(-block_size);
DorisMetrics::instance()->scanner_context_cached_block_cnt->increment(-1);
_block_memory_usage -= block_size;
Expand Down
11 changes: 7 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
// Projection will truncate useless columns, makes block size change.
auto free_block_bytes = free_block->allocated_bytes();
DorisMetrics::instance()->scanner_context_cached_block_size->increment(
free_block_bytes);
raw_bytes_read += free_block_bytes;

if (!scan_task->cached_blocks.empty() &&
Expand All @@ -295,13 +293,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// Return block succeed or not, this free_block is not used by this scan task any more.
// If block can be reused, its memory usage will be added back.
ctx->return_free_block(std::move(free_block));
ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
block_size);
size_t block_usage_delta =
scan_task->cached_blocks.back().first->allocated_bytes() - block_size;
ctx->inc_block_usage(block_usage_delta);
DorisMetrics::instance()->scanner_context_cached_block_size->increment(
block_usage_delta);
DorisMetrics::instance()->scanner_merge_block_costs_stat->add(
watch2.elapsed_time());
} else {
ctx->inc_block_usage(free_block_bytes);
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
DorisMetrics::instance()->scanner_context_cached_block_size->increment(
free_block_bytes);
DorisMetrics::instance()->scanner_context_cached_block_cnt->increment(1);
}

Expand Down

0 comments on commit af636d2

Please sign in to comment.