Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/file_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ FileHandler::FileHandler() :
static std::once_flag once_flag;
#ifdef BE_TEST
std::call_once(once_flag, [] {
_s_fd_cache = new_lru_cache(config::file_descriptor_cache_capacity);
_s_fd_cache = new_lru_cache("FileHandlerCacheTest", config::file_descriptor_cache_capacity);
});
#else
// storage engine may not be opened when doris try to read and write
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/fs/file_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ FileBlockManager::FileBlockManager(Env* env, BlockManagerOptions opts) :
}

#ifdef BE_TEST
_file_cache.reset(new FileCache<RandomAccessFile>("Readable file cache", config::file_descriptor_cache_capacity));
_file_cache.reset(new FileCache<RandomAccessFile>("Readable_file_cache", config::file_descriptor_cache_capacity));
#else
_file_cache.reset(new FileCache<RandomAccessFile>("Readable file cache", StorageEngine::instance()->file_cache()));
_file_cache.reset(new FileCache<RandomAccessFile>("Readable_file_cache", StorageEngine::instance()->file_cache()));
#endif
}

Expand Down
121 changes: 51 additions & 70 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
#include "olap/olap_index.h"
#include "olap/row_block.h"
#include "olap/utils.h"
#include "util/doris_metrics.h"

using std::string;
using std::stringstream;

namespace doris {

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(usage, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(usage_ratio, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(lookup_count, MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hit_count, MetricUnit::OPERATIONS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hit_ratio, MetricUnit::NOUNIT);

uint32_t CacheKey::hash(const char* data, size_t n, uint32_t seed) const {
// Similar to murmur hash
const uint32_t m = 0xc6a4a793;
Expand Down Expand Up @@ -82,9 +90,7 @@ LRUHandle* HandleTable::insert(LRUHandle* h) {
if (_elems > _length) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
if (!_resize()) {
return NULL;
}
_resize();
}
}

Expand Down Expand Up @@ -114,29 +120,21 @@ LRUHandle** HandleTable::_find_pointer(const CacheKey& key, uint32_t hash) {
return ptr;
}

bool HandleTable::_resize() {
void HandleTable::_resize() {
uint32_t new_length = 4;

while (new_length < _elems) {
new_length *= 2;
}

LRUHandle** new_list = new(std::nothrow) LRUHandle*[new_length];

if (NULL == new_list) {
LOG(FATAL) << "failed to malloc new hash list. new_length=" << new_length;
return false;
}

memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;

for (uint32_t i = 0; i < _length; i++) {
LRUHandle* h = _list[i];

while (h != NULL) {
LRUHandle* next = h->next_hash;
CacheKey key = h->key();
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
Expand All @@ -146,20 +144,13 @@ bool HandleTable::_resize() {
}
}

if (_elems != count) {
delete [] new_list;
LOG(FATAL) << "_elems not match new count. elems=" << _elems
<< ", count=" << count;
return false;
}

DCHECK_EQ(_elems, count);
delete [] _list;
_list = new_list;
_length = new_length;
return true;
}

LRUCache::LRUCache() : _usage(0), _last_id(0), _lookup_count(0),
LRUCache::LRUCache() : _usage(0), _lookup_count(0),
_hit_count(0) {
// Make empty circular linked list
_lru.next = &_lru;
Expand Down Expand Up @@ -376,15 +367,29 @@ uint32_t ShardedLRUCache::_shard(uint32_t hash) {
return hash >> (32 - kNumShardBits);
}

ShardedLRUCache::ShardedLRUCache(size_t capacity)
: _last_id(0) {
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;

for (int s = 0; s < kNumShards; s++) {
_shards[s].set_capacity(per_shard);
}
ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity)
: _name(name), _last_id(1) {
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s].set_capacity(per_shard);
}

_entity = DorisMetrics::instance()->metric_registry()
->register_entity(std::string("lru_cache:") + name, {{"name", name}});
_entity->register_hook(name, std::bind(&ShardedLRUCache::update_cache_metrics, this));
INT_GAUGE_METRIC_REGISTER(_entity, capacity);
INT_GAUGE_METRIC_REGISTER(_entity, usage);
INT_DOUBLE_METRIC_REGISTER(_entity, usage_ratio);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, lookup_count);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, hit_count);
INT_DOUBLE_METRIC_REGISTER(_entity, hit_ratio);
}

ShardedLRUCache::~ShardedLRUCache() {
_entity->deregister_hook(_name);
DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
}

Cache::Handle* ShardedLRUCache::insert(
const CacheKey& key,
void* value,
Expand Down Expand Up @@ -420,8 +425,7 @@ Slice ShardedLRUCache::value_slice(Handle* handle) {
}

uint64_t ShardedLRUCache::new_id() {
MutexLock l(&_id_mutex);
return ++(_last_id);
return _last_id.fetch_add(1, std::memory_order_relaxed);
}

void ShardedLRUCache::prune() {
Expand All @@ -432,51 +436,28 @@ void ShardedLRUCache::prune() {
VLOG(7) << "Successfully prune cache, clean " << num_prune << " entries.";
}

size_t ShardedLRUCache::get_memory_usage() {
void ShardedLRUCache::update_cache_metrics() const {
size_t total_capacity = 0;
size_t total_usage = 0;
for (int s = 0; s < kNumShards; s++) {
total_usage += _shards[s].get_usage();
}
return total_usage;
}

void ShardedLRUCache::get_cache_status(rapidjson::Document* document) {
size_t shard_count = sizeof(_shards) / sizeof(LRUCache);

for (uint32_t i = 0; i < shard_count; ++i) {
size_t capacity = _shards[i].get_capacity();
size_t usage = _shards[i].get_usage();
rapidjson::Value shard_info(rapidjson::kObjectType);
shard_info.AddMember("capacity", static_cast<double>(capacity), document->GetAllocator());
shard_info.AddMember("usage", static_cast<double>(usage), document->GetAllocator());

float usage_ratio = 0.0f;

if (0 != capacity) {
usage_ratio = static_cast<float>(usage) / static_cast<float>(capacity);
}

shard_info.AddMember("usage_ratio", usage_ratio, document->GetAllocator());

size_t lookup_count = _shards[i].get_lookup_count();
size_t hit_count = _shards[i].get_hit_count();
shard_info.AddMember("lookup_count", static_cast<double>(lookup_count), document->GetAllocator());
shard_info.AddMember("hit_count", static_cast<double>(hit_count), document->GetAllocator());

float hit_ratio = 0.0f;

if (0 != lookup_count) {
hit_ratio = static_cast<float>(hit_count) / static_cast<float>(lookup_count);
}

shard_info.AddMember("hit_ratio", hit_ratio, document->GetAllocator());
document->PushBack(shard_info, document->GetAllocator());
size_t total_lookup_count = 0;
size_t total_hit_count = 0;
for (int i = 0; i < kNumShards; i++) {
total_capacity += _shards[i].get_capacity();
total_usage += _shards[i].get_usage();
total_lookup_count += _shards[i].get_lookup_count();
total_hit_count += _shards[i].get_hit_count();
}

capacity->set_value(total_capacity);
usage->set_value(total_usage);
lookup_count->set_value(total_lookup_count);
hit_count->set_value(total_hit_count);
usage_ratio->set_value(total_capacity == 0 ? 0 : (total_usage / total_capacity));
hit_ratio->set_value(total_lookup_count == 0 ? 0 : (total_hit_count / total_lookup_count));
}

Cache* new_lru_cache(size_t capacity) {
return new ShardedLRUCache(capacity);
Cache* new_lru_cache(const std::string& name, size_t capacity) {
return new ShardedLRUCache(name, capacity);
}

} // namespace doris
46 changes: 24 additions & 22 deletions be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <rapidjson/document.h>

#include "olap/olap_common.h"
#include "util/metrics.h"
#include "util/mutex.h"
#include "util/slice.h"

Expand Down Expand Up @@ -46,9 +47,9 @@ namespace doris {
class Cache;
class CacheKey;

// Create a new cache with a fixed size capacity. This implementation
// Create a new cache with a specified name and a fixed size capacity. This implementation
// of Cache uses a least-recently-used eviction policy.
extern Cache* new_lru_cache(size_t capacity);
extern Cache* new_lru_cache(const std::string& name, size_t capacity);

class CacheKey {
public:
Expand Down Expand Up @@ -221,11 +222,6 @@ namespace doris {
// leveldb may change prune() to a pure abstract method.
virtual void prune() {}

// 获取运行统计项,包括内存占用
virtual size_t get_memory_usage() = 0;
// cache命中率统计
virtual void get_cache_status(rapidjson::Document* document) = 0;

private:
DISALLOW_COPY_AND_ASSIGN(Cache);
};
Expand All @@ -235,9 +231,9 @@ namespace doris {
typedef struct LRUHandle {
void* value;
void (*deleter)(const CacheKey&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
LRUHandle* next_hash; // next entry in hash table
LRUHandle* next; // next entry in lru list
LRUHandle* prev; // previous entry in lru list
size_t charge;
size_t key_length;
bool in_cache; // Whether entry is in the cache.
Expand Down Expand Up @@ -296,7 +292,7 @@ namespace doris {
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** _find_pointer(const CacheKey& key, uint32_t hash);
bool _resize();
void _resize();
};

// A single shard of sharded cache.
Expand All @@ -323,16 +319,16 @@ namespace doris {
void erase(const CacheKey& key, uint32_t hash);
int prune();

uint64_t get_lookup_count() {
uint64_t get_lookup_count() const {
return _lookup_count;
}
uint64_t get_hit_count() {
uint64_t get_hit_count() const {
return _hit_count;
}
size_t get_usage() {
size_t get_usage() const {
return _usage;
}
size_t get_capacity() {
size_t get_capacity() const {
return _capacity;
}

Expand All @@ -349,7 +345,6 @@ namespace doris {
// _mutex protects the following state.
Mutex _mutex;
size_t _usage;
uint64_t _last_id;

// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
Expand All @@ -367,9 +362,9 @@ namespace doris {

class ShardedLRUCache : public Cache {
public:
explicit ShardedLRUCache(size_t capacity);
explicit ShardedLRUCache(const std::string& name, size_t total_capacity);
// TODO(fdy): 析构时清除所有cache元素
virtual ~ShardedLRUCache() {}
virtual ~ShardedLRUCache();
virtual Handle* insert(
const CacheKey& key,
void* value,
Expand All @@ -383,16 +378,23 @@ namespace doris {
Slice value_slice(Handle* handle) override;
virtual uint64_t new_id();
virtual void prune();
virtual size_t get_memory_usage();
virtual void get_cache_status(rapidjson::Document* document);
void update_cache_metrics() const;

private:
static inline uint32_t _hash_slice(const CacheKey& s);
static uint32_t _shard(uint32_t hash);

std::string _name;
LRUCache _shards[kNumShards];
Mutex _id_mutex;
uint64_t _last_id;
std::atomic<uint64_t> _last_id;

std::shared_ptr<MetricEntity> _entity = nullptr;
IntGauge* capacity = nullptr;
IntGauge* usage = nullptr;
DoubleGauge* usage_ratio = nullptr;
IntAtomicCounter* lookup_count = nullptr;
IntAtomicCounter* hit_count = nullptr;
DoubleGauge* hit_ratio = nullptr;
};

} // namespace doris
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void StoragePageCache::create_global_cache(size_t capacity) {
_s_instance = &instance;
}

StoragePageCache::StoragePageCache(size_t capacity) : _cache(new_lru_cache(capacity)) {
StoragePageCache::StoragePageCache(size_t capacity) : _cache(new_lru_cache("StoragePageCache", capacity)) {
}

bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle) {
Expand Down
8 changes: 2 additions & 6 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ Status StorageEngine::_open() {

RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed");

_index_stream_lru_cache = new_lru_cache(config::index_stream_cache_capacity);
_index_stream_lru_cache = new_lru_cache("SegmentIndexCache", config::index_stream_cache_capacity);

_file_cache.reset(new_lru_cache(config::file_descriptor_cache_capacity));
_file_cache.reset(new_lru_cache("FileHandlerCache", config::file_descriptor_cache_capacity));

auto dirs = get_stores<false>();
load_data_dirs(dirs);
Expand Down Expand Up @@ -633,10 +633,6 @@ void StorageEngine::_perform_base_compaction(TabletSharedPtr best_tablet) {
best_tablet->set_last_base_compaction_failure_time(0);
}

void StorageEngine::get_cache_status(rapidjson::Document* document) const {
return _index_stream_lru_cache->get_cache_status(document);
}

OLAPStatus StorageEngine::_start_trash_sweep(double* usage) {
OLAPStatus res = OLAP_SUCCESS;
LOG(INFO) << "start trash and snapshot sweep.";
Expand Down
4 changes: 0 additions & 4 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ class StorageEngine {
void clear_transaction_task(const TTransactionId transaction_id,
const std::vector<TPartitionId>& partition_ids);

// 获取cache的使用情况信息
void get_cache_status(rapidjson::Document* document) const;

// Note: 这里只能reload原先已经存在的root path,即re-load启动时就登记的root path
// 是允许的,但re-load全新的path是不允许的,因为此处没有彻底更新ce调度器信息
void load_data_dirs(const std::vector<DataDir*>& stores);
Expand Down Expand Up @@ -283,7 +280,6 @@ class StorageEngine {
int32_t _effective_cluster_id;
bool _is_all_cluster_id_exist;

Cache* _file_descriptor_lru_cache;
Cache* _index_stream_lru_cache;

// _file_cache is a lru_cache for file descriptors of files opened by doris,
Expand Down
Loading