Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,10 @@ void StorageEngine::stop() {
_cold_data_compaction_thread_pool->shutdown();
}

if (_cooldown_thread_pool) {
_cooldown_thread_pool->shutdown();
}

_memtable_flush_executor.reset(nullptr);
_calc_delete_bitmap_executor.reset(nullptr);

Expand Down
34 changes: 9 additions & 25 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,30 +168,6 @@ void set_last_failure_time(Tablet* tablet, const Compaction& compaction, int64_t

} // namespace

struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors(size_t executor_nums = 5);

static WriteCooldownMetaExecutors* get_instance() {
static WriteCooldownMetaExecutors instance;
return &instance;
}

void submit(TabletSharedPtr tablet);
size_t _get_executor_pos(int64_t tablet_id) const {
return std::hash<int64_t>()(tablet_id) % _executor_nums;
};
// Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
// FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
// We use PriorityThreadPool since it would call status inside it's `shutdown` function.
// Consider one situation where the StackTraceCache's singleton is detructed before
// this WriteCooldownMetaExecutors's singleton, then invoking the status would also call
// StackTraceCache which would then result in heap use after free like #23834
std::vector<std::unique_ptr<PriorityThreadPool>> _executors;
std::unordered_set<int64_t> _pending_tablets;
std::mutex _latch;
size_t _executor_nums;
};

WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
: _executor_nums(executor_nums) {
for (size_t i = 0; i < _executor_nums; i++) {
Expand All @@ -205,6 +181,14 @@ WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
}
}

void WriteCooldownMetaExecutors::stop() {
for (auto& pool_ptr : _executors) {
if (pool_ptr) {
pool_ptr->shutdown();
}
}
}

void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletSharedPtr tablet) {
auto tablet_id = tablet->tablet_id();

Expand Down Expand Up @@ -1960,7 +1944,7 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas
// It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes
// one tablet would at most have one async task to be done
void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) {
WriteCooldownMetaExecutors::get_instance()->submit(std::move(tablet));
ExecEnv::GetInstance()->write_cooldown_meta_executors()->submit(std::move(tablet));
}

bool Tablet::update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) {
Expand Down
25 changes: 21 additions & 4 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -84,6 +80,27 @@ enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_R

static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = std::chrono::seconds(1);

struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors(size_t executor_nums = 5);

void stop();

void submit(TabletSharedPtr tablet);
size_t _get_executor_pos(int64_t tablet_id) const {
return std::hash<int64_t>()(tablet_id) % _executor_nums;
};
// Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
// FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
// We use PriorityThreadPool since it would call status inside it's `shutdown` function.
// Consider one situation where the StackTraceCache's singleton is detructed before
// this WriteCooldownMetaExecutors's singleton, then invoking the status would also call
// StackTraceCache which would then result in heap use after free like #23834
std::vector<std::unique_ptr<PriorityThreadPool>> _executors;
std::unordered_set<int64_t> _pending_tablets;
std::mutex _latch;
size_t _executor_nums;
};

class Tablet final : public BaseTablet {
public:
Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "olap/tablet_manager.h"
#include "runtime/fragment_mgr.h"
#include "runtime/frontend_info.h"
#include "time.h"
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
Expand All @@ -48,6 +47,9 @@ ExecEnv::~ExecEnv() {
void ExecEnv::set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine) {
_storage_engine = std::move(engine);
}
void ExecEnv::set_write_cooldown_meta_executors() {
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
}
#endif // BE_TEST

Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id) {
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,16 @@

#include <common/multi_version.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "io/cache/fs_file_cache_storage.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/tablet_fwd.h"
Expand All @@ -53,6 +49,7 @@ class BlockedTaskScheduler;
struct RuntimeFilterTimerQueue;
} // namespace pipeline
class WorkloadGroupMgr;
struct WriteCooldownMetaExecutors;
namespace io {
class FileCacheFactory;
class FDCache;
Expand Down Expand Up @@ -232,6 +229,9 @@ class ExecEnv {
MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); }
WalManager* wal_mgr() { return _wal_manager.get(); }
DNSCache* dns_cache() { return _dns_cache; }
WriteCooldownMetaExecutors* write_cooldown_meta_executors() {
return _write_cooldown_meta_executors.get();
}

#ifdef BE_TEST
void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs> tmp_file_dirs) {
Expand Down Expand Up @@ -262,6 +262,7 @@ class ExecEnv {
void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();

#endif
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }
Expand Down Expand Up @@ -397,6 +398,7 @@ class ExecEnv {
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;

std::mutex _frontends_lock;
// ip:brpc_port -> frontend_indo
Expand Down
8 changes: 3 additions & 5 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
#include <cstdlib>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "cloud/cloud_storage_engine.h"
Expand All @@ -40,11 +37,9 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/fs/file_meta_cache.h"
#include "io/fs/s3_file_bufferpool.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
Expand Down Expand Up @@ -250,6 +245,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
Expand Down Expand Up @@ -597,6 +593,7 @@ void ExecEnv::destroy() {
_delta_writer_v2_pool.reset();
_load_stream_map_pool.reset();
_file_cache_open_fd_cache.reset();
SAFE_STOP(_write_cooldown_meta_executors);

// StorageEngine must be destoried before _page_no_cache_mem_tracker.reset and _cache_manager destory
// shouldn't use SAFE_STOP. otherwise will lead to twice stop.
Expand Down Expand Up @@ -658,6 +655,7 @@ void ExecEnv::destroy() {
_s3_file_upload_thread_pool.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
_file_cache_open_fd_cache.reset(nullptr);
_write_cooldown_meta_executors.reset(nullptr);

SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
Expand Down
1 change: 1 addition & 0 deletions be/test/olap/tablet_cooldown_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class TabletCooldownTest : public testing::Test {
st = engine->open();
EXPECT_TRUE(st.ok()) << st.to_string();
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_write_cooldown_meta_executors(); // default cons
exec_env->set_storage_engine(std::move(engine));
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
}
Expand Down