diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 449f43f0d69..e207bfecf64 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -134,6 +134,8 @@ namespace ServerSetting extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_size; extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_free_size; extern const ServerSettingsUInt64 prefixes_deserialization_thread_pool_thread_pool_queue_size; + extern const ServerSettingsUInt64 memory_worker_period_ms; + extern const ServerSettingsBool memory_worker_correct_memory_tracker; } namespace ErrorCodes @@ -236,6 +238,15 @@ void LocalServer::initialize(Poco::Util::Application & self) }); #endif +#if defined(OS_LINUX) + memory_worker = std::make_unique( + server_settings[ServerSetting::memory_worker_period_ms], + server_settings[ServerSetting::memory_worker_correct_memory_tracker], + /* use_cgroup */ true, + nullptr); + memory_worker->start(); +#endif + getIOThreadPool().initialize( server_settings[ServerSetting::max_io_thread_pool_size], server_settings[ServerSetting::max_io_thread_pool_free_size], diff --git a/programs/local/LocalServer.h b/programs/local/LocalServer.h index 3cc42364a3b..f0e2ca56f89 100644 --- a/programs/local/LocalServer.h +++ b/programs/local/LocalServer.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -91,6 +92,8 @@ class LocalServer : public ClientApplicationBase, public Loggers private: void cleanStreamingQuery(); + + std::unique_ptr memory_worker; }; } diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index 16fe99c9a36..02a9f4d0b8d 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -10,9 +10,41 @@ #include "PythonTableCache.h" #endif +extern thread_local bool chdb_destructor_cleanup_in_progress; + namespace CHDB { +/** + * RAII guard for accurate memory tracking in chDB external interfaces + * + * When Python (or other programming language) 
threads call chDB-provided interfaces + * such as chdb_destroy_query_result, the memory released cannot be accurately tracked + * by ClickHouse's MemoryTracker, which may lead to false reports of insufficient memory. + * + * Therefore, for all externally exposed chDB interfaces, ChdbDestructorGuard must be + * used at the beginning of execution to provide thread marking, enabling MemoryTracker + * to accurately track memory changes. + */ +class ChdbDestructorGuard +{ +public: + ChdbDestructorGuard() + { + chdb_destructor_cleanup_in_progress = true; + } + + ~ChdbDestructorGuard() + { + chdb_destructor_cleanup_in_progress = false; + } + + ChdbDestructorGuard(const ChdbDestructorGuard &) = delete; + ChdbDestructorGuard & operator=(const ChdbDestructorGuard &) = delete; + ChdbDestructorGuard(ChdbDestructorGuard &&) = delete; + ChdbDestructorGuard & operator=(ChdbDestructorGuard &&) = delete; +}; + static std::shared_mutex global_connection_mutex; static std::mutex CHDB_MUTEX; chdb_conn * global_conn_ptr = nullptr; @@ -222,9 +254,6 @@ static std::pair createQueryResult(DB::LocalServer * serve else if (!req.isIteration()) { server->streaming_query_context = std::make_shared(); - /// TODO: support memory tracker for streaming query - server->streaming_query_context->limit = total_memory_tracker.getHardLimit(); - total_memory_tracker.setHardLimit(0); query_result = createStreamingQueryResult(server, req); is_end = !query_result->getError().empty(); @@ -246,9 +275,6 @@ static std::pair createQueryResult(DB::LocalServer * serve { if (server->streaming_query_context) { - total_memory_tracker.resetCounters(); - MemoryTracker::updateRSS(0); - total_memory_tracker.setHardLimit(server->streaming_query_context->limit); server->streaming_query_context.reset(); } #if USE_PYTHON @@ -429,6 +455,8 @@ using namespace CHDB; local_result * query_stable(int argc, char ** argv) { + ChdbDestructorGuard guard; + auto query_result = pyEntryClickHouseLocal(argc, argv); if 
(!query_result->getError().empty() || query_result->result_buffer == nullptr) return nullptr; @@ -445,6 +473,8 @@ local_result * query_stable(int argc, char ** argv) void free_result(local_result * result) { + ChdbDestructorGuard guard; + if (!result) { return; @@ -460,6 +490,8 @@ void free_result(local_result * result) local_result_v2 * query_stable_v2(int argc, char ** argv) { + ChdbDestructorGuard guard; + // pyEntryClickHouseLocal may throw some serious exceptions, although it's not likely // to happen in the context of clickhouse-local. we catch them here and return an error local_result_v2 * res = nullptr; @@ -489,6 +521,8 @@ local_result_v2 * query_stable_v2(int argc, char ** argv) void free_result_v2(local_result_v2 * result) { + ChdbDestructorGuard guard; + if (!result) return; @@ -693,6 +727,8 @@ void close_conn(chdb_conn ** conn) struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) { + ChdbDestructorGuard guard; + // Add connection validity check under global lock std::shared_lock global_lock(global_connection_mutex); @@ -707,6 +743,8 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const chdb_streaming_result * query_conn_streaming(chdb_conn * conn, const char * query, const char * format) { + ChdbDestructorGuard guard; + // Add connection validity check under global lock std::shared_lock global_lock(global_connection_mutex); @@ -744,6 +782,8 @@ const char * chdb_streaming_result_error(chdb_streaming_result * result) local_result_v2 * chdb_streaming_fetch_result(chdb_conn * conn, chdb_streaming_result * result) { + ChdbDestructorGuard guard; + // Add connection validity check under global lock std::shared_lock global_lock(global_connection_mutex); @@ -758,6 +798,8 @@ local_result_v2 * chdb_streaming_fetch_result(chdb_conn * conn, chdb_streaming_r void chdb_streaming_cancel_query(chdb_conn * conn, chdb_streaming_result * result) { + ChdbDestructorGuard guard; + // Add connection 
validity check under global lock std::shared_lock global_lock(global_connection_mutex); @@ -766,15 +808,19 @@ void chdb_streaming_cancel_query(chdb_conn * conn, chdb_streaming_result * resul auto * queue = static_cast(conn->queue); auto query_result = executeQueryRequest(queue, nullptr, nullptr, CHDB::QueryType::TYPE_STREAMING_ITER, result, true); + query_result.reset(); } void chdb_destroy_result(chdb_streaming_result * result) { + ChdbDestructorGuard guard; + if (!result) return; auto stream_query_result = reinterpret_cast(result); + delete stream_query_result; } @@ -799,6 +845,8 @@ void chdb_close_conn(chdb_connection * conn) chdb_result * chdb_query(chdb_connection conn, const char * query, const char * format) { + ChdbDestructorGuard guard; + std::shared_lock global_lock(global_connection_mutex); if (!conn) @@ -823,6 +871,8 @@ chdb_result * chdb_query(chdb_connection conn, const char * query, const char * chdb_result * chdb_query_cmdline(int argc, char ** argv) { + ChdbDestructorGuard guard; + MaterializedQueryResult * result = nullptr; try { @@ -844,6 +894,8 @@ chdb_result * chdb_query_cmdline(int argc, char ** argv) chdb_result * chdb_stream_query(chdb_connection conn, const char * query, const char * format) { + ChdbDestructorGuard guard; + std::shared_lock global_lock(global_connection_mutex); if (!conn) @@ -873,6 +925,8 @@ chdb_result * chdb_stream_query(chdb_connection conn, const char * query, const chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_result * result) { + ChdbDestructorGuard guard; + std::shared_lock global_lock(global_connection_mutex); if (!conn) @@ -903,6 +957,8 @@ chdb_result * chdb_stream_fetch_result(chdb_connection conn, chdb_result * resul void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result) { + ChdbDestructorGuard guard; + std::shared_lock global_lock(global_connection_mutex); if (!result || !conn) @@ -919,6 +975,8 @@ void chdb_stream_cancel_query(chdb_connection conn, chdb_result * result) 
void chdb_destroy_query_result(chdb_result * result) { + ChdbDestructorGuard guard; + if (!result) return; diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 743f8efb2c7..cb3e0a939ac 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -84,7 +84,6 @@ struct StreamingQueryContext ASTPtr parsed_query; void * streaming_result = nullptr; bool is_streaming_query = true; - Int64 limit = 0; StreamingQueryContext() = default; }; diff --git a/src/Common/CurrentMemoryTracker.cpp b/src/Common/CurrentMemoryTracker.cpp index e0aca0ecf15..e590b451f22 100644 --- a/src/Common/CurrentMemoryTracker.cpp +++ b/src/Common/CurrentMemoryTracker.cpp @@ -9,6 +9,8 @@ thread_local bool memory_tracker_always_throw_logical_error_on_allocation = false; #endif +thread_local bool chdb_destructor_cleanup_in_progress = false; + namespace DB { namespace ErrorCodes @@ -31,6 +33,9 @@ MemoryTracker * getMemoryTracker() if (DB::MainThreadStatus::get()) return &total_memory_tracker; + if (chdb_destructor_cleanup_in_progress) + return &total_memory_tracker; + return nullptr; } @@ -70,18 +75,23 @@ AllocationTrace CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory } else { - Int64 will_be = current_thread->untracked_memory + size; - - if (will_be > current_thread->untracked_memory_limit) + Int64 previous_untracked_memory = current_thread->untracked_memory; + current_thread->untracked_memory += size; + if (current_thread->untracked_memory > current_thread->untracked_memory_limit) { - auto res = memory_tracker->allocImpl(will_be, throw_if_memory_exceeded); + Int64 current_untracked_memory = current_thread->untracked_memory; current_thread->untracked_memory = 0; - return res; - } - /// Update after successful allocations, - /// since failed allocations should not be take into account. - current_thread->untracked_memory = will_be; + try + { + return memory_tracker->allocImpl(current_untracked_memory, throw_if_memory_exceeded); + } + catch (...) 
+ { + current_thread->untracked_memory += previous_untracked_memory; + throw; + } + } } return AllocationTrace(memory_tracker->getSampleProbability(size)); diff --git a/src/Common/MemoryWorker.cpp b/src/Common/MemoryWorker.cpp index a4a25594f26..b57d6f5c2b3 100644 --- a/src/Common/MemoryWorker.cpp +++ b/src/Common/MemoryWorker.cpp @@ -236,7 +236,7 @@ MemoryWorker::MemoryWorker(uint64_t period_ms_, bool correct_tracker_, bool use_ static constexpr uint64_t cgroups_memory_usage_tick_ms{50}; const auto [cgroup_path, version] = getCgroupsPath(); - LOG_INFO( + LOG_TRACE( getLogger("CgroupsReader"), "Will create cgroup reader from '{}' (cgroups version: {})", cgroup_path, @@ -275,7 +275,7 @@ void MemoryWorker::start() if (source == MemoryUsageSource::None) return; - LOG_INFO( + LOG_TRACE( getLogger("MemoryWorker"), "Starting background memory thread with period of {}ms, using {} as source", period_ms, diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index b7e60ae10e4..bf1a4e4ff7e 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -229,8 +229,10 @@ void ThreadStatus::flushUntrackedMemory() if (untracked_memory == 0) return; - memory_tracker.adjustWithUntrackedMemory(untracked_memory); + /// Snapshot this object's own counter (the one tested above and reset below) rather than current_thread's. + Int64 current_untracked_memory = untracked_memory; untracked_memory = 0; + memory_tracker.adjustWithUntrackedMemory(current_untracked_memory); } bool ThreadStatus::isQueryCanceled() const diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp index 9297ec3212a..ad9270a2304 100644 --- a/src/Core/BackgroundSchedulePool.cpp +++ b/src/Core/BackgroundSchedulePool.cpp @@ -287,6 +287,8 @@ void BackgroundSchedulePool::threadFunction() { TaskInfoPtr task; + current_thread->flushUntrackedMemory(); + { UniqueLock tasks_lock(tasks_mutex); @@ -306,6 +308,8 @@ void BackgroundSchedulePool::threadFunction() if (task) task->execute(); + + current_thread->flushUntrackedMemory(); } } diff --git
a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp index 4c1d775488d..325e2051d0d 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp @@ -346,6 +346,8 @@ void MergeTreeBackgroundExecutor::threadFunction() { setThreadName(name.c_str()); + current_thread->flushUntrackedMemory(); + DENY_ALLOCATIONS_IN_SCOPE; while (true) @@ -373,6 +375,8 @@ void MergeTreeBackgroundExecutor::threadFunction() tryLogCurrentException(__PRETTY_FUNCTION__); }); } + + current_thread->flushUntrackedMemory(); } }