Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ CONF_Int64(index_stream_cache_capacity, "10737418240");

// Size limit of the storage page cache
CONF_String(storage_page_cache_limit, "20%");
// Percentage of the storage page cache reserved for index pages.
// The storage page cache is divided into data_page_cache and index_page_cache.
CONF_Int32(index_page_cache_percentage, "10");
// whether to disable page cache feature in storage
CONF_Bool(disable_storage_page_cache, "false");

Expand Down
33 changes: 23 additions & 10 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,48 @@ namespace doris {

StoragePageCache* StoragePageCache::_s_instance = nullptr;

void StoragePageCache::create_global_cache(size_t capacity) {
void StoragePageCache::create_global_cache(size_t capacity, int32_t index_cache_percentage) {
DCHECK(_s_instance == nullptr);
static StoragePageCache instance(capacity);
static StoragePageCache instance(capacity, index_cache_percentage);
_s_instance = &instance;
}

StoragePageCache::StoragePageCache(size_t capacity)
: _cache(new_lru_cache("StoragePageCache", capacity)) {}
// Build the page cache, splitting `capacity` between a data page cache and an
// index page cache according to `index_cache_percentage`:
//   0        -> only the data page cache is created, with the whole capacity
//   100      -> only the index page cache is created, with the whole capacity
//   1..99    -> both caches are created; each share uses integer division
//   otherwise-> CHECK failure ("invalid index page cache percentage")
StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percentage)
        : _index_cache_percentage(index_cache_percentage) {
    // Reject values outside [0, 100] before touching either cache.
    const bool out_of_range = index_cache_percentage < 0 || index_cache_percentage > 100;
    if (out_of_range) {
        CHECK(false) << "invalid index page cache percentage";
        return;
    }

    // Degenerate splits: a single cache receives the full capacity and the
    // other one stays nullptr.
    if (index_cache_percentage == 0) {
        _data_page_cache = std::unique_ptr<Cache>(new_lru_cache("DataPageCache", capacity));
        return;
    }
    if (index_cache_percentage == 100) {
        _index_page_cache = std::unique_ptr<Cache>(new_lru_cache("IndexPageCache", capacity));
        return;
    }

    // Regular split: compute both shares, then create the data cache first and
    // the index cache second.
    const size_t data_share = capacity * (100 - index_cache_percentage) / 100;
    const size_t index_share = capacity * index_cache_percentage / 100;
    _data_page_cache = std::unique_ptr<Cache>(new_lru_cache("DataPageCache", data_share));
    _index_page_cache = std::unique_ptr<Cache>(new_lru_cache("IndexPageCache", index_share));
}

bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle) {
auto lru_handle = _cache->lookup(key.encode());
bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle, segment_v2::PageTypePB page_type) {
auto cache = _get_page_cache(page_type);
auto lru_handle = cache->lookup(key.encode());
if (lru_handle == nullptr) {
return false;
}
*handle = PageCacheHandle(_cache.get(), lru_handle);
*handle = PageCacheHandle(cache, lru_handle);
return true;
}

void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle,
bool in_memory) {
segment_v2::PageTypePB page_type, bool in_memory) {
auto deleter = [](const doris::CacheKey& key, void* value) { delete[](uint8_t*) value; };

CachePriority priority = CachePriority::NORMAL;
if (in_memory) {
priority = CachePriority::DURABLE;
}

auto lru_handle = _cache->insert(key.encode(), data.data, data.size, deleter, priority);
*handle = PageCacheHandle(_cache.get(), lru_handle);
auto cache = _get_page_cache(page_type);
auto lru_handle = cache->insert(key.encode(), data.data, data.size, deleter, priority);
*handle = PageCacheHandle(cache, lru_handle);
}

} // namespace doris
33 changes: 28 additions & 5 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "gutil/macros.h" // for DISALLOW_COPY_AND_ASSIGN
#include "olap/lru_cache.h"
#include "gen_cpp/segment_v2.pb.h" // for cache allocation

namespace doris {

Expand Down Expand Up @@ -53,36 +54,58 @@ class StoragePageCache {
};

// Create global instance of this class
static void create_global_cache(size_t capacity);
static void create_global_cache(size_t capacity, int32_t index_cache_percentage);

// Return global instance.
// Client should call create_global_cache before.
static StoragePageCache* instance() { return _s_instance; }

StoragePageCache(size_t capacity);
StoragePageCache(size_t capacity, int32_t index_cache_percentage);

// Lookup the given page in the cache.
//
// If the page is found, the cache entry will be written into handle.
// PageCacheHandle will release cache entry to cache when it
// destructs.
//
// Cache type selection is determined by page_type argument
//
// Return true if entry is found, otherwise return false.
bool lookup(const CacheKey& key, PageCacheHandle* handle);
bool lookup(const CacheKey& key, PageCacheHandle* handle, segment_v2::PageTypePB page_type);

// Insert a page with key into this cache.
// The given handle will be set to a valid reference to the cached entry.
// The target cache is selected by the page_type argument.
// This function is thread-safe: when two clients insert the same key
// concurrently, only one page is cached.
// An in_memory page is given higher priority.
void insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle,
bool in_memory = false);
segment_v2::PageTypePB page_type, bool in_memory = false);

// Tell whether a cache exists for pages of the given type.
// Returns false when the matching cache was not allocated (index percentage
// set to 0 leaves the index cache empty, 100 leaves the data cache empty) or
// when page_type is neither DATA_PAGE nor INDEX_PAGE.
bool is_cache_available(segment_v2::PageTypePB page_type) {
    Cache* const selected = _get_page_cache(page_type);
    return selected != nullptr;
}

private:
StoragePageCache();
static StoragePageCache* _s_instance;

std::unique_ptr<Cache> _cache = nullptr;
int32_t _index_cache_percentage = 0;
std::unique_ptr<Cache> _data_page_cache = nullptr;
std::unique_ptr<Cache> _index_page_cache = nullptr;

// Map a page type to the cache that stores it.
// Returns a non-owning pointer; the result is nullptr when the matching cache
// was never allocated or when page_type is neither DATA_PAGE nor INDEX_PAGE.
Cache* _get_page_cache(segment_v2::PageTypePB page_type) {
    if (page_type == segment_v2::DATA_PAGE) {
        return _data_page_cache.get();
    }
    if (page_type == segment_v2::INDEX_PAGE) {
        return _index_page_cache.get();
    }
    return nullptr;
}
};

// A handle for StoragePageCache entry. This class make it easy to handle
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
opts.verify_checksum = _opts.verify_checksum;
opts.use_page_cache = iter_opts.use_page_cache;
opts.kept_in_memory = _opts.kept_in_memory;
opts.type = iter_opts.type;

return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
}
Expand Down Expand Up @@ -569,6 +570,7 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
PageHandle handle;
Slice page_body;
PageFooterPB footer;
_opts.type = DATA_PAGE;
RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer));
// parse data page
RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(),
Expand All @@ -587,6 +589,7 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
// read dictionary page
Slice dict_data;
PageFooterPB dict_footer;
_opts.type = INDEX_PAGE;
RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(),
&_dict_page_handle, &dict_data, &dict_footer));
// ignore dict_footer.dict_page_footer().encoding() due to only
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ struct ColumnIteratorOptions {
// reader statistics
OlapReaderStatistics* stats = nullptr;
bool use_page_cache = false;
// Page type, used to select which page cache serves this read.
// Page types are divided into DATA_PAGE and INDEX_PAGE.
// INDEX_PAGE covers index pages, dict pages and short key pages.
PageTypePB type;

void sanity_check() const {
CHECK_NOTNULL(rblock);
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ Status IndexedColumnReader::load_index_page(fs::ReadableBlock* rblock, const Pag
PageHandle* handle, IndexPageReader* reader) {
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer));
RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE));
RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
return Status::OK();
}

Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePointer& pp,
PageHandle* handle, Slice* body, PageFooterPB* footer) const {
PageHandle* handle, Slice* body, PageFooterPB* footer, PageTypePB type) const {
PageReadOptions opts;
opts.rblock = rblock;
opts.page_pointer = pp;
Expand All @@ -87,6 +87,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
opts.stats = &tmp_stats;
opts.use_page_cache = _use_page_cache;
opts.kept_in_memory = _kept_in_memory;
opts.type = type;

return PageIO::read_and_decompress_page(opts, handle, body, footer);
}
Expand All @@ -97,7 +98,7 @@ Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
PageHandle handle;
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer));
RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE));
// parse data page
// note that page_index is not used in IndexedColumnIterator, so we pass 0
return ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class IndexedColumnReader {

// read a page specified by `pp' from `rblock' into `handle'
Status read_page(fs::ReadableBlock* rblock, const PagePointer& pp, PageHandle* handle,
Slice* body, PageFooterPB* footer) const;
Slice* body, PageFooterPB* footer, PageTypePB type) const;

int64_t num_values() const { return _num_values; }
const EncodingInfo* encoding_info() const { return _encoding_info; }
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Status OrdinalIndexReader::load(bool use_page_cache, bool kept_in_memory) {
opts.stats = &tmp_stats;
opts.use_page_cache = use_page_cache;
opts.kept_in_memory = kept_in_memory;
opts.type = INDEX_PAGE;

// read index page
PageHandle page_handle;
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
auto cache = StoragePageCache::instance();
PageCacheHandle cache_handle;
StoragePageCache::CacheKey cache_key(opts.rblock->path(), opts.page_pointer.offset);
if (opts.use_page_cache && cache->lookup(cache_key, &cache_handle)) {
if (opts.use_page_cache && cache->is_cache_available(opts.type) && cache->lookup(cache_key, &cache_handle, opts.type)) {
// we find page in cache, use it
*handle = PageHandle(std::move(cache_handle));
opts.stats->cached_pages_num++;
Expand Down Expand Up @@ -189,9 +189,9 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
}

*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
if (opts.use_page_cache) {
if (opts.use_page_cache && cache->is_cache_available(opts.type)) {
// insert this page into cache and return the cache handle
cache->insert(cache_key, page_slice, &cache_handle, opts.kept_in_memory);
cache->insert(cache_key, page_slice, &cache_handle, opts.type, opts.kept_in_memory);
*handle = PageHandle(std::move(cache_handle));
} else {
*handle = PageHandle(page_slice);
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/page_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ struct PageReadOptions {
// if true, use DURABLE CachePriority in page cache
// currently used for in memory olap table
bool kept_in_memory = false;
// Page type, used to select which page cache serves this read.
// Page types are divided into DATA_PAGE and INDEX_PAGE.
// INDEX_PAGE covers index pages, dict pages and short key pages.
PageTypePB type;

void sanity_check() const {
CHECK_NOTNULL(rblock);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ Status Segment::_load_index() {
opts.codec = nullptr; // short key index page uses NO_COMPRESSION for now
OlapReaderStatistics tmp_stats;
opts.stats = &tmp_stats;
opts.type = INDEX_PAGE;

Slice body;
PageFooterPB footer;
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ Status ExecEnv::_init_mem_tracker() {
LOG(WARNING) << "Config storage_page_cache_limit is greater than memory size, config="
<< config::storage_page_cache_limit << ", memory=" << MemInfo::physical_mem();
}
StoragePageCache::create_global_cache(storage_cache_limit);
int32_t index_page_cache_percentage = config::index_page_cache_percentage;
StoragePageCache::create_global_cache(storage_cache_limit, index_page_cache_percentage);

// TODO(zc): The current memory usage configuration is a bit confusing,
// we need to sort out the use of memory
Expand Down
Loading