Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,21 +972,21 @@ bool operator!=(const TabletMeta& a, const TabletMeta& b) {
return !(a == b);
}

DeleteBitmap::DeleteBitmap(int64_t tablet_id) : _tablet_id(tablet_id) {
// The default delete bitmap cache is set to 100MB,
// which can be insufficient and cause performance issues when the amount of user data is large.
// To mitigate the problem of an inadequate cache,
// we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size.
bool is_percent = false;
int64_t delete_bitmap_agg_cache_cache_limit =
ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
_agg_cache.reset(new AggCache(delete_bitmap_agg_cache_cache_limit >
config::delete_bitmap_agg_cache_capacity
? delete_bitmap_agg_cache_cache_limit
: config::delete_bitmap_agg_cache_capacity));
DeleteBitmapAggCache::DeleteBitmapAggCache(size_t capacity)
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE, capacity,
LRUCacheType::SIZE,
config::delete_bitmap_agg_cache_stale_sweep_time_sec, 256) {}

DeleteBitmapAggCache* DeleteBitmapAggCache::instance() {
return ExecEnv::GetInstance()->delete_bitmap_agg_cache();
}

DeleteBitmapAggCache* DeleteBitmapAggCache::create_instance(size_t capacity) {
return new DeleteBitmapAggCache(capacity);
}

DeleteBitmap::DeleteBitmap(int64_t tablet_id) : _tablet_id(tablet_id) {}

DeleteBitmap::DeleteBitmap(const DeleteBitmap& o) {
delete_bitmap = o.delete_bitmap; // just copy data
_tablet_id = o._tablet_id;
Expand Down Expand Up @@ -1201,16 +1201,16 @@ static std::string agg_cache_key(int64_t tablet_id, const DeleteBitmap::BitmapKe
std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) const {
std::string key_str = agg_cache_key(_tablet_id, bmk); // Cache key container
CacheKey key(key_str);
Cache::Handle* handle = _agg_cache->repr()->lookup(key);
Cache::Handle* handle = DeleteBitmapAggCache::instance()->lookup(key);

AggCache::Value* val =
handle == nullptr
? nullptr
: reinterpret_cast<AggCache::Value*>(_agg_cache->repr()->value(handle));
DeleteBitmapAggCache::Value* val =
handle == nullptr ? nullptr
: reinterpret_cast<DeleteBitmapAggCache::Value*>(
DeleteBitmapAggCache::instance()->value(handle));
// FIXME: do we need a mutex here to get rid of duplicated initializations
// of cache entries in some cases?
if (val == nullptr) { // Renew if needed, put a new Value to cache
val = new AggCache::Value();
val = new DeleteBitmapAggCache::Value();
{
std::shared_lock l(lock);
DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
Expand All @@ -1223,17 +1223,16 @@ std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) co
val->bitmap |= bm;
}
}
size_t charge = val->bitmap.getSizeInBytes() + sizeof(AggCache::Value);
handle = _agg_cache->repr()->insert(key, val, charge, charge, CachePriority::NORMAL);
size_t charge = val->bitmap.getSizeInBytes() + sizeof(DeleteBitmapAggCache::Value);
handle = DeleteBitmapAggCache::instance()->insert(key, val, charge, charge,
CachePriority::NORMAL);
}

// It is natural for the cache to reclaim the underlying memory
return std::shared_ptr<roaring::Roaring>(
&val->bitmap, [this, handle](...) { _agg_cache->repr()->release(handle); });
&val->bitmap, [handle](...) { DeleteBitmapAggCache::instance()->release(handle); });
}

std::atomic<DeleteBitmap::AggCachePolicy*> DeleteBitmap::AggCache::s_repr {nullptr};

std::string tablet_state_name(TabletState state) {
switch (state) {
case TABLET_NOTREADY:
Expand Down
53 changes: 14 additions & 39 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,20 @@ class TabletMeta : public MetadataAdder<TabletMeta> {
mutable std::shared_mutex _meta_lock;
};

class DeleteBitmapAggCache : public LRUCachePolicyTrackingManual {
public:
DeleteBitmapAggCache(size_t capacity);

static DeleteBitmapAggCache* instance();

static DeleteBitmapAggCache* create_instance(size_t capacity);

class Value : public LRUCacheValueBase {
public:
roaring::Roaring bitmap;
};
};

/**
* Wraps multiple bitmaps for recording rows (row id) that are deleted or
* overwritten. For now, it's only used when unique key merge-on-write property
Expand Down Expand Up @@ -515,45 +529,6 @@ class DeleteBitmap {
const std::function<void(const DeleteBitmap&, const RowsetId& rowsetId)>& func) const;
uint64_t count_key_with_rowset_id_unlocked(const RowsetId& rowset_id) const;

class AggCachePolicy : public LRUCachePolicyTrackingManual {
public:
AggCachePolicy(size_t capacity)
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::DELETE_BITMAP_AGG_CACHE,
capacity, LRUCacheType::SIZE,
config::delete_bitmap_agg_cache_stale_sweep_time_sec,
256) {}
};

class AggCache {
public:
class Value : public LRUCacheValueBase {
public:
roaring::Roaring bitmap;
};

AggCache(size_t size_in_bytes) {
static std::once_flag once;
std::call_once(once, [size_in_bytes] {
auto* tmp = new AggCachePolicy(size_in_bytes);
AggCache::s_repr.store(tmp, std::memory_order_release);

// release the sigleton instance at program exit
std::atexit([] {
auto* ptr = AggCache::s_repr.exchange(nullptr, std::memory_order_acquire);
delete ptr;
});
});

while (!s_repr.load(std::memory_order_acquire)) {
}
}

static LRUCachePolicy* repr() { return s_repr.load(std::memory_order_acquire); }
static std::atomic<AggCachePolicy*> s_repr;
};

private:
mutable std::shared_ptr<AggCache> _agg_cache;
int64_t _tablet_id;
};

Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class CacheManager;
class HeapProfiler;
class WalManager;
class DNSCache;
class DeleteBitmapAggCache;

inline bool k_doris_exit = false;

Expand Down Expand Up @@ -275,6 +276,7 @@ class ExecEnv {
void set_storage_engine(StorageEngine* se) { this->_storage_engine = se; }
void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; }
void set_tablet_schema_cache(TabletSchemaCache* c) { this->_tablet_schema_cache = c; }
void set_delete_bitmap_agg_cache(DeleteBitmapAggCache* c) { _delete_bitmap_agg_cache = c; }
void set_tablet_column_object_pool(TabletColumnObjectPool* c) {
this->_tablet_column_object_pool = c;
}
Expand Down Expand Up @@ -337,6 +339,8 @@ class ExecEnv {
orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }

DeleteBitmapAggCache* delete_bitmap_agg_cache() { return _delete_bitmap_agg_cache; }

private:
ExecEnv();

Expand Down Expand Up @@ -466,6 +470,7 @@ class ExecEnv {
std::shared_ptr<pipeline::BlockedTaskScheduler> _global_block_scheduler;
// used for query without workload group
std::shared_ptr<pipeline::BlockedTaskScheduler> _without_group_block_scheduler;
DeleteBitmapAggCache* _delete_bitmap_agg_cache {nullptr};

pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;

Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "olap/segment_loader.h"
#include "olap/storage_engine.h"
#include "olap/tablet_column_object_pool.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema_cache.h"
#include "olap/wal/wal_manager.h"
#include "pipeline/pipeline_tracing.h"
Expand Down Expand Up @@ -521,6 +522,16 @@ Status ExecEnv::_init_mem_env() {
_orc_memory_pool = new doris::vectorized::ORCMemoryPool();
_arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();

// The default delete bitmap cache is set to 100MB,
// which can be insufficient and cause performance issues when the amount of user data is large.
// To mitigate the problem of an inadequate cache,
// we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size.
int64_t delete_bitmap_agg_cache_cache_limit =
ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
_delete_bitmap_agg_cache = DeleteBitmapAggCache::create_instance(std::max(
delete_bitmap_agg_cache_cache_limit, config::delete_bitmap_agg_cache_capacity));

return Status::OK();
}

Expand Down Expand Up @@ -633,6 +644,7 @@ void ExecEnv::destroy() {
// StorageEngine must be destoried before _page_no_cache_mem_tracker.reset
// StorageEngine must be destoried before _cache_manager destory
SAFE_DELETE(_storage_engine);
SAFE_DELETE(_delete_bitmap_agg_cache);

// Free resource after threads are stopped.
// Some threads are still running, like threads created by _new_load_stream_mgr ...
Expand Down
4 changes: 4 additions & 0 deletions be/test/testutil/run_all_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "olap/page_cache.h"
#include "olap/segment_loader.h"
#include "olap/tablet_column_object_pool.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema_cache.h"
#include "runtime/exec_env.h"
#include "runtime/memory/cache_manager.h"
Expand Down Expand Up @@ -57,6 +58,9 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->set_tablet_schema_cache(
doris::TabletSchemaCache::create_global_schema_cache(
doris::config::tablet_schema_cache_capacity));
doris::ExecEnv::GetInstance()->set_delete_bitmap_agg_cache(
doris::DeleteBitmapAggCache::create_instance(
doris::config::delete_bitmap_agg_cache_capacity));
doris::ExecEnv::GetInstance()->set_tablet_column_object_pool(
doris::TabletColumnObjectPool::create_global_column_cache(
doris::config::tablet_schema_cache_capacity));
Expand Down
Loading