Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into pick-37511
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Jul 13, 2024
2 parents fef9384 + ec8467f commit 6b5860a
Show file tree
Hide file tree
Showing 108 changed files with 4,631 additions and 2,843 deletions.
23 changes: 12 additions & 11 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/memory_reclamation.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/cpu_info.h"
Expand Down Expand Up @@ -192,7 +193,7 @@ void Daemon::memory_maintenance_thread() {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache();
doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();

// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
Expand Down Expand Up @@ -229,38 +230,38 @@ void Daemon::memory_gc_thread() {
if (config::disable_memory_gc) {
continue;
}
auto sys_mem_available = doris::MemInfo::sys_mem_available();
auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available();
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();

// GC excess memory for resource groups that do not enable overcommit
auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc();
auto tg_free_mem = doris::MemoryReclamation::tg_disable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
process_memory_usage -= tg_free_mem;

if (memory_full_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
process_memory_usage >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
std::string mem_info =
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str();
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format(
"[MemoryGC] start full GC, {}.",
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
if (doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
process_memory_usage >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
std::string mem_info =
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format(
"[MemoryGC] start minor GC, {}.",
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
if (doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else {
Expand Down
39 changes: 39 additions & 0 deletions be/src/http/action/clear_cache_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/clear_cache_action.h"

#include <sstream>
#include <string>

#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/memory/cache_manager.h"

namespace doris {

const static std::string HEADER_JSON = "application/json";

// Handles a clear-cache HTTP request: asks the CacheManager singleton to clear
// its caches once, then answers 200 OK with an empty body.
void ClearDataCacheAction::handle(HttpRequest* req) {
    // The reply carries no payload; only the status code is reported back.
    const std::string empty_body;

    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");

    auto* cache_manager = CacheManager::instance();
    cache_manager->clear_once();

    HttpChannel::send_reply(req, HttpStatus::OK, empty_body);
}

} // end namespace doris
35 changes: 35 additions & 0 deletions be/src/http/action/clear_cache_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "http/http_handler.h"

namespace doris {

class HttpRequest;

// HTTP handler that clears the backend's caches when its endpoint is requested.
// The handler itself holds no state; all work happens in handle().
class ClearDataCacheAction : public HttpHandler {
public:
ClearDataCacheAction() = default;

~ClearDataCacheAction() override = default;

// Processes one HTTP request `req` and sends the reply on it.
void handle(HttpRequest* req) override;
};

} // end namespace doris
5 changes: 5 additions & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "gutil/strings/substitute.h"
#include "http/action/tablets_info_action.h"
#include "http/web_page_handler.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/easy_json.h"
Expand Down Expand Up @@ -155,6 +156,10 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
MemTrackerLimiter::Type::SCHEMA_CHANGE);
} else if (iter->second == "other") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER);
} else if (iter->second == "reserved_memory") {
GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots);
} else if (iter->second == "all") {
MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots);
}
} else {
(*output) << "<h4>*Notice:</h4>\n";
Expand Down
15 changes: 7 additions & 8 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
_log_timer.reset();
// if not exist load task, this log should not be printed.
if (_mem_usage != 0) {
LOG(INFO) << ss.str() << ", process mem: " << PerfCounters::get_vm_rss_str()
<< " (without allocator cache: "
<< PrettyPrinter::print_bytes(GlobalMemoryArbitrator::process_memory_usage())
<< "), load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
LOG(INFO) << fmt::format(
"{}, {}, load mem: {}, memtable writers num: {} (active: {}, write: {}, flush: {})",
ss.str(), GlobalMemoryArbitrator::process_memory_used_details_str(),
PrettyPrinter::print_bytes(_mem_tracker->consumption()), _writers.size(),
PrettyPrinter::print_bytes(_active_mem_usage),
PrettyPrinter::print_bytes(_write_mem_usage),
PrettyPrinter::print_bytes(_flush_mem_usage));
}
}

Expand Down
12 changes: 5 additions & 7 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2901,22 +2901,20 @@ Status Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& output
vectorized::MutableBlock mutable_output_block =
vectorized::MutableBlock::build_mutable_block(&output_block);

std::vector<RowInBlock*> _row_in_blocks;
_row_in_blocks.reserve(in_block.rows());

std::shared_ptr<RowInBlockComparator> vec_row_comparator =
std::make_shared<RowInBlockComparator>(_tablet_meta->tablet_schema().get());
vec_row_comparator->set_block(&mutable_input_block);

std::vector<RowInBlock*> row_in_blocks;
std::vector<std::unique_ptr<RowInBlock>> row_in_blocks;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_in_blocks.reserve(in_block.rows());
for (size_t i = 0; i < in_block.rows(); ++i) {
row_in_blocks.emplace_back(new RowInBlock {i});
row_in_blocks.emplace_back(std::make_unique<RowInBlock>(i));
}
std::sort(row_in_blocks.begin(), row_in_blocks.end(),
[&](const RowInBlock* l, const RowInBlock* r) -> bool {
auto value = (*vec_row_comparator)(l, r);
[&](const std::unique_ptr<RowInBlock>& l,
const std::unique_ptr<RowInBlock>& r) -> bool {
auto value = (*vec_row_comparator)(l.get(), r.get());
DCHECK(value != 0) << "value equel when sort block, l_pos: " << l->_row_pos
<< " r_pos: " << r->_row_pos;
return value < 0;
Expand Down
42 changes: 36 additions & 6 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TTransportException.h>

#include <algorithm>
#include <atomic>

#include "common/status.h"
Expand Down Expand Up @@ -1173,21 +1174,50 @@ void FragmentMgr::cancel_worker() {
continue;
}

auto itr = running_fes.find(q.second->coord_addr);
auto query_context = q.second;

auto itr = running_fes.find(query_context->coord_addr);
if (itr != running_fes.end()) {
if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
if (query_context->get_fe_process_uuid() == itr->second.info.process_uuid ||
itr->second.info.process_uuid == 0) {
continue;
} else {
LOG_WARNING("Coordinator of query {} restarted, going to cancel it.",
print_id(q.second->query_id()));
// In some rare cases, the rpc port of follower is not updated in time,
// then the port of this follower will be zero, but actually it is still running,
// and be has already received the query from follower.
// So we need to check if host is in running_fes.
bool fe_host_is_standing = std::any_of(
running_fes.begin(), running_fes.end(),
[query_context](const auto& fe) {
return fe.first.hostname ==
query_context->coord_addr.hostname &&
fe.first.port == 0;
});
if (fe_host_is_standing) {
LOG_WARNING(
"Coordinator {}:{} is not found, but its host is still "
"running with an unstable brpc port, not going to cancel "
"it.",
query_context->coord_addr.hostname,
query_context->coord_addr.port,
print_id(query_context->query_id()));
continue;
} else {
LOG_WARNING(
"Could not find target coordinator {}:{} of query {}, "
"going to "
"cancel it.",
query_context->coord_addr.hostname,
query_context->coord_addr.port,
print_id(query_context->query_id()));
}
}
} else {
LOG_WARNING(
"Could not find target coordinator {}:{} of query {}, going to "
"cancel it.",
q.second->coord_addr.hostname, q.second->coord_addr.port,
print_id(q.second->query_id()));
query_context->coord_addr.hostname, query_context->coord_addr.port,
print_id(query_context->query_id()));
}

// Coordinator of this query is already dead.
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/memory/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) {
return 0;
}

void CacheManager::clear_once() {
std::lock_guard<std::mutex> l(_caches_lock);
for (const auto& pair : _caches) {
pair.second->prune_all(true);
}
}

void CacheManager::clear_once(CachePolicy::CacheType type) {
std::lock_guard<std::mutex> l(_caches_lock);
_caches[type]->prune_all(true); // will print log
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/memory/cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CacheManager {

int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr);

void clear_once();
void clear_once(CachePolicy::CacheType type);

bool need_prune(int64_t* last_timestamp, const std::string& type) {
Expand Down
50 changes: 49 additions & 1 deletion be/src/runtime/memory/global_memory_arbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,64 @@

#include <bvar/bvar.h>

#include "runtime/thread_context.h"

namespace doris {

std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock;
std::unordered_map<std::string, MemTracker::MemCounter> GlobalMemoryArbitrator::_reserved_trackers;

bvar::PassiveStatus<int64_t> g_vm_rss_sub_allocator_cache(
"meminfo_vm_rss_sub_allocator_cache",
[](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr);
bvar::PassiveStatus<int64_t> g_process_memory_usage(
"meminfo_process_memory_usage",
[](void*) { return GlobalMemoryArbitrator::process_memory_usage(); }, nullptr);
bvar::PassiveStatus<int64_t> g_sys_mem_avail(
"meminfo_sys_mem_avail", [](void*) { return GlobalMemoryArbitrator::sys_mem_available(); },
nullptr);

std::atomic<int64_t> GlobalMemoryArbitrator::_s_vm_rss_sub_allocator_cache = -1;
std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;

// Tries to reserve `bytes` of process memory.
//
// Returns false, reserving nothing, when either
//   * taking `bytes` out of the available system memory would fall below
//     MemInfo::sys_mem_available_low_water_mark(), or
//   * vm_rss_sub_allocator_cache() + refresh_interval_memory_growth + the new
//     reserved total would reach or exceed MemInfo::mem_limit().
//
// On success the process-wide reserved counter is increased by `bytes`, the
// same amount is added to the per-label counter of the current thread's mem
// tracker, and true is returned.
bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
    const bool below_low_water_mark =
            sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark();
    if (below_low_water_mark) {
        return false;
    }

    // Lock-free update of the reserved counter: recompute the target and
    // re-check the limit on every retry, because a failed compare-exchange
    // refreshes `expected_reserved` with the latest value.
    int64_t expected_reserved = _s_process_reserved_memory.load(std::memory_order_relaxed);
    for (;;) {
        const int64_t desired_reserved = expected_reserved + bytes;
        if (UNLIKELY(vm_rss_sub_allocator_cache() +
                             refresh_interval_memory_growth.load(std::memory_order_relaxed) +
                             desired_reserved >=
                     MemInfo::mem_limit())) {
            return false;
        }
        if (_s_process_reserved_memory.compare_exchange_weak(expected_reserved, desired_reserved,
                                                             std::memory_order_relaxed)) {
            break;
        }
    }

    // Book the reservation under the label of the current thread's mem tracker.
    std::lock_guard<std::mutex> guard(_reserved_trackers_lock);
    _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes);
    return true;
}

// Releases `bytes` of previously reserved process memory.
//
// The process-wide reserved counter is decreased first. Then, under
// _reserved_trackers_lock, the per-label counter for the current thread's mem
// tracker is decreased by the same amount and removed from the map once its
// current value reaches zero.
//
// If no counter exists for the label, a DCHECK fires in debug builds and the
// function returns without touching the map.
void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) {
    _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);

    std::lock_guard<std::mutex> l(_reserved_trackers_lock);
    // Copy the label so it does not depend on the lifetime of whatever
    // thread_mem_tracker() returns.
    auto label = doris::thread_context()->thread_mem_tracker()->label();
    auto it = _reserved_trackers.find(label);
    if (it == _reserved_trackers.end()) {
        DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes;
        return;
    }
    // Reuse the iterator from find() instead of hashing the label again with
    // operator[]; this also keeps erase(it) tied to the entry being updated.
    auto& counter = it->second;
    counter.sub(bytes);
    if (counter.current_value() == 0) {
        _reserved_trackers.erase(it);
    }
}

} // namespace doris
Loading

0 comments on commit 6b5860a

Please sign in to comment.