Avoid recompressing cold block in CompressedSecondaryCache #10527
Closed
Changes from all commits (56 commits)
88c4053 Avoid recompressing cold block
437d31e update
98b36e8 update
0c5ff08 remove comments.
8994688 update
63f8bb8 This is a commit without bugs reported by db_stress.
920d991 polishing.
9d3a72c update
a900e7f workable commit.
ac32842 update unit tests.
48b6135 update unit tests.
1cfc7ce Add use_compressed_secondary_cache_ , standalone_pool_ratio, etc.
da46184 update Lookup and Promote.
d71a192 update
b715163 update
087f7f1 update
597b180 update
07e45eb update
59ced4b update
ded5759 update
b9bde8a Update Lookup in fault_injection_secondary_cache.cc
c14a653 fix one Release issue in CompressedSecondaryCache::Lookup
9d87893 this is a workable commit with extra logs.
8c34ff0 remove some extra comments.
3069c5d update
941eacc remove cout.
7501a84 use mutext for updating standalone_pool_usage_
b94da9b update
5bf17e3 update the mutex use.
0073eee update
7368125 fix a double to size_t conversion warning.
5d4763e update parameter comments.
e363895 check Status.
3331694 fix lint issues.
b79c6d5 fix a lint issue.
d8d2fda Avoid insert a block into sec cache if it is evicted for the first time.
88cc137 update 0.3 to 0.2.
aaff78b update
b66459b update unit tests.
6f7a477 update blob_source_test.
e0ed88f update db_blob_basic_test
9556d50 remove standalone pool.
befc26d update
9a1b93b update blob_source_test
d1621ab update malloc_bin_sizes_.
17f2eaa fix unit tests.
cf9c869 avoid split and merge.
0b136a7 add HISTORY.md
778717f address the comments.
22170f3 fix a crash issue.
92e5a50 merge main.
b6543b2 update
4ed4448 rename erase_handle to advise_erase.
f7dca22 address comments.
d39ae15 fix a character issue.
3c82ce3 address a comment.
cache/compressed_secondary_cache.cc
@@ -22,9 +22,9 @@ CompressedSecondaryCache::CompressedSecondaryCache(
     CacheMetadataChargePolicy metadata_charge_policy,
     CompressionType compression_type, uint32_t compress_format_version)
     : cache_options_(capacity, num_shard_bits, strict_capacity_limit,
-                     high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
-                     metadata_charge_policy, compression_type,
-                     compress_format_version, low_pri_pool_ratio) {
+                     high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator,
+                     use_adaptive_mutex, metadata_charge_policy,
+                     compression_type, compress_format_version) {
   cache_ =
       NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
                   high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
@@ -35,58 +35,79 @@ CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); }

 std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
     const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/,
-    bool& is_in_sec_cache) {
+    bool advise_erase, bool& is_in_sec_cache) {
   std::unique_ptr<SecondaryCacheResultHandle> handle;
   is_in_sec_cache = false;
   Cache::Handle* lru_handle = cache_->Lookup(key);
   if (lru_handle == nullptr) {
-    return handle;
+    return nullptr;
   }

-  CacheValueChunk* handle_value =
-      reinterpret_cast<CacheValueChunk*>(cache_->Value(lru_handle));
-  size_t handle_value_charge{0};
-  CacheAllocationPtr merged_value =
-      MergeChunksIntoValue(handle_value, handle_value_charge);
+  void* handle_value = cache_->Value(lru_handle);
+  if (handle_value == nullptr) {
+    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
+    return nullptr;

Review comment on the line above: Should we release the handle in this case?

+  }
+
+  CacheAllocationPtr* ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value);
+
   Status s;
   void* value{nullptr};
   size_t charge{0};
   if (cache_options_.compression_type == kNoCompression) {
-    s = create_cb(merged_value.get(), handle_value_charge, &value, &charge);
+    s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge);
   } else {
     UncompressionContext uncompression_context(cache_options_.compression_type);
     UncompressionInfo uncompression_info(uncompression_context,
                                          UncompressionDict::GetEmptyDict(),
                                          cache_options_.compression_type);

     size_t uncompressed_size{0};
-    CacheAllocationPtr uncompressed;
-    uncompressed = UncompressData(uncompression_info, (char*)merged_value.get(),
-                                  handle_value_charge, &uncompressed_size,
-                                  cache_options_.compress_format_version,
-                                  cache_options_.memory_allocator.get());
+    CacheAllocationPtr uncompressed = UncompressData(
+        uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle),
+        &uncompressed_size, cache_options_.compress_format_version,
+        cache_options_.memory_allocator.get());

     if (!uncompressed) {
-      cache_->Release(lru_handle, /* erase_if_last_ref */ true);
-      return handle;
+      cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
+      return nullptr;
     }
     s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge);
   }

   if (!s.ok()) {
-    cache_->Release(lru_handle, /* erase_if_last_ref */ true);
-    return handle;
+    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
+    return nullptr;
   }

-  cache_->Release(lru_handle, /* erase_if_last_ref */ true);
+  if (advise_erase) {
+    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
+    // Insert a dummy handle.
+    cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, DeletionCallback)
+        .PermitUncheckedError();
+  } else {
+    is_in_sec_cache = true;
+    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
+  }
   handle.reset(new CompressedSecondaryCacheResultHandle(value, charge));

   return handle;
 }

 Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
                                         const Cache::CacheItemHelper* helper) {
   if (value == nullptr) {
     return Status::InvalidArgument();
   }

+  Cache::Handle* lru_handle = cache_->Lookup(key);
+  if (lru_handle == nullptr) {
+    // Insert a dummy handle if the handle is evicted for the first time.
+    return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0,
+                          DeletionCallback);
+  } else {
+    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
+  }
+
   size_t size = (*helper->size_cb)(value);
   CacheAllocationPtr ptr =
       AllocateBlock(size, cache_options_.memory_allocator.get());
@@ -115,12 +136,14 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
     }

     val = Slice(compressed_val);
     size = compressed_val.size();
     ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
     memcpy(ptr.get(), compressed_val.data(), size);
   }

-  size_t charge{0};
-  CacheValueChunk* value_chunks_head =
-      SplitValueIntoChunks(val, cache_options_.compression_type, charge);
-  return cache_->Insert(key, value_chunks_head, charge, DeletionCallback);
+  CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
+
+  return cache_->Insert(key, buf, size, DeletionCallback);
 }

 void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
@@ -212,22 +235,16 @@ CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue(

 void CompressedSecondaryCache::DeletionCallback(const Slice& /*key*/,
                                                 void* obj) {
-  CacheValueChunk* chunks_head = reinterpret_cast<CacheValueChunk*>(obj);
-  while (chunks_head != nullptr) {
-    CacheValueChunk* tmp_chunk = chunks_head;
-    chunks_head = chunks_head->next;
-    tmp_chunk->Free();
-  }
+  delete reinterpret_cast<CacheAllocationPtr*>(obj);
   obj = nullptr;
 }

 std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
     size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-    double high_pri_pool_ratio,
+    double high_pri_pool_ratio, double low_pri_pool_ratio,
     std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
     CacheMetadataChargePolicy metadata_charge_policy,
-    CompressionType compression_type, uint32_t compress_format_version,
-    double low_pri_pool_ratio) {
+    CompressionType compression_type, uint32_t compress_format_version) {
   return std::make_shared<CompressedSecondaryCache>(
       capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
       low_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
@@ -240,9 +257,9 @@ std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
   assert(opts.secondary_cache == nullptr);
   return NewCompressedSecondaryCache(
       opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit,
-      opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex,
-      opts.metadata_charge_policy, opts.compression_type,
-      opts.compress_format_version, opts.low_pri_pool_ratio);
+      opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator,
+      opts.use_adaptive_mutex, opts.metadata_charge_policy,
+      opts.compression_type, opts.compress_format_version);
 }

 }  // namespace ROCKSDB_NAMESPACE
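
For readers skimming the Lookup hunk above, here is a minimal standalone sketch of the new admission decision at the end of Lookup: on a hit, the entry is either erased and replaced by a zero-charge dummy (when advise_erase is true, i.e. the caller intends to promote the block into the primary cache), or left in place with is_in_sec_cache set. The SimpleCache map and Entry struct are hypothetical stand-ins for RocksDB's internal LRU cache, not its real API, and the sketch skips compression and reference counting.

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the secondary cache's internal LRU cache.
// A nullptr value models the zero-charge dummy entry.
struct Entry {
  std::shared_ptr<std::string> value;
  size_t charge = 0;
};
using SimpleCache = std::unordered_map<std::string, Entry>;

// Sketch of the advise_erase decision made at the end of Lookup.
std::optional<std::string> Lookup(SimpleCache& cache, const std::string& key,
                                  bool advise_erase, bool& is_in_sec_cache) {
  is_in_sec_cache = false;
  auto it = cache.find(key);
  if (it == cache.end() || it->second.value == nullptr) {
    return std::nullopt;  // miss, or only a dummy entry is present
  }
  std::string result = *it->second.value;  // copy the block out for the caller
  if (advise_erase) {
    // Caller will promote the block to the primary cache: drop the real entry
    // and leave a zero-charge dummy behind so a later re-insertion is admitted.
    cache.erase(it);
    cache[key] = Entry{nullptr, 0};
  } else {
    // Keep the block; tell the caller it still lives in the secondary cache.
    is_in_sec_cache = true;
  }
  return result;
}

int main() {
  SimpleCache cache;
  cache["k"] = Entry{std::make_shared<std::string>("block-bytes"), 11};
  bool in_sec = false;
  auto v = Lookup(cache, "k", /*advise_erase=*/true, in_sec);
  std::cout << (v ? *v : "<miss>") << " in_sec=" << in_sec
            << " dummy_left=" << (cache["k"].value == nullptr) << "\n";
}
```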
cache/compressed_secondary_cache.h
@@ -45,7 +45,21 @@ class CompressedSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
 // The CompressedSecondaryCache is a concrete implementation of
 // rocksdb::SecondaryCache.
 //
-// Users can also cast a pointer to it and call methods on
+// When a block is found from CompressedSecondaryCache::Lookup, we check whether
+// there is a dummy block with the same key in the primary cache.
+// 1. If the dummy block exists, we erase the block from
+//    CompressedSecondaryCache and insert it into the primary cache.
+// 2. If not, we just insert a dummy block into the primary cache
+//    (charging the actual size of the block) and do not erase the block from
+//    CompressedSecondaryCache. A standalone handle is returned to the caller.
+//
+// When a block is evicted from the primary cache, we check whether
+// there is a dummy block with the same key in CompressedSecondaryCache.
+// 1. If the dummy block exists, the block is inserted into
+//    CompressedSecondaryCache.
+// 2. If not, we just insert a dummy block (size 0) in CompressedSecondaryCache.
+//
+// Users can also cast a pointer to CompressedSecondaryCache and call methods on
 // it directly, especially custom methods that may be added
 // in the future. For example -
 //   std::unique_ptr<rocksdb::SecondaryCache> cache =
@@ -72,7 +86,9 @@ class CompressedSecondaryCache : public SecondaryCache {

   std::unique_ptr<SecondaryCacheResultHandle> Lookup(
       const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/,
-      bool& is_in_sec_cache) override;
+      bool advise_erase, bool& is_in_sec_cache) override;

Review comment on the line above: Need comment here to explain the requirement for the Lookup() implementation by

+  bool SupportForceErase() const override { return true; }

   void Erase(const Slice& key) override;
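
The new header comment above describes the eviction-side half of the dummy-block protocol: a block evicted from the primary cache is only admitted into the compressed secondary cache on its second eviction. Below is a small self-contained sketch of that rule; SecCache, SecEntry, and InsertOnEviction are hypothetical stand-ins chosen for illustration, not RocksDB types, and the real Insert additionally compresses the block before storing it.

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the compressed secondary cache's internal LRU
// cache; a nullptr value models the zero-charge dummy entry.
struct SecEntry {
  std::shared_ptr<std::string> value;
  size_t charge = 0;
};
using SecCache = std::unordered_map<std::string, SecEntry>;

// Sketch of the eviction path: the first time a key is evicted from the
// primary cache only a dummy is recorded; the block's bytes are stored only
// if a dummy for that key is already present.
void InsertOnEviction(SecCache& sec, const std::string& key,
                      const std::string& block_bytes) {
  auto it = sec.find(key);
  if (it == sec.end()) {
    sec[key] = SecEntry{nullptr, 0};  // first eviction: dummy only
    return;
  }
  // Second eviction (dummy found): admit the real block.
  sec[key] = SecEntry{std::make_shared<std::string>(block_bytes),
                      block_bytes.size()};
}

int main() {
  SecCache sec;
  InsertOnEviction(sec, "k", "block-bytes");  // records a dummy
  std::cout << "after 1st eviction, stored="
            << (sec["k"].value != nullptr) << "\n";
  InsertOnEviction(sec, "k", "block-bytes");  // now stores the block
  std::cout << "after 2nd eviction, stored="
            << (sec["k"].value != nullptr) << "\n";
}
```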
Review comment: Is the change of removing MergeChunksIntoValue() related?
Reply: Yes. The myshadow A/B tests showed consistently better metrics (cache hit rate, CPU, and mem_rss) without the current split and merge functions, so I removed them for now.
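
To make the removal of split/merge concrete: after this change the value is handed to the LRU cache as a single heap-allocated smart pointer behind a void*, and the deletion callback simply deletes that one wrapper instead of walking a chunk list. The sketch below shows that ownership pattern with a plain std::unique_ptr and a function-pointer deleter; the Slot struct and names are hypothetical illustrations, not RocksDB's actual Cache interface.

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>

// Minimal model of a cache slot that only understands void* plus a deleter,
// mirroring how the LRU cache stores values and later calls DeletionCallback.
using Deleter = void (*)(void* obj);

struct Slot {
  void* obj = nullptr;
  size_t charge = 0;
  Deleter deleter = nullptr;
  ~Slot() {
    if (obj != nullptr && deleter != nullptr) deleter(obj);
  }
};

// Analogous to CompressedSecondaryCache::DeletionCallback after this PR:
// the stored object is one heap-allocated smart pointer to delete, not a
// linked list of chunks to walk and free.
void DeletionCallback(void* obj) {
  delete static_cast<std::unique_ptr<std::string>*>(obj);
}

int main() {
  // Analogous to Insert: wrap the (compressed) buffer in a single allocation
  // and hand the raw pointer plus its charge to the cache slot.
  auto value = std::make_unique<std::string>("compressed-bytes");
  size_t charge = value->size();
  Slot slot;
  slot.obj = new std::unique_ptr<std::string>(std::move(value));
  slot.charge = charge;
  slot.deleter = &DeletionCallback;

  // Analogous to Lookup: cast the stored void* back to the wrapper type.
  auto* stored = static_cast<std::unique_ptr<std::string>*>(slot.obj);
  std::cout << "charge=" << slot.charge << " bytes=" << **stored << "\n";
}  // Slot's destructor invokes DeletionCallback, freeing the wrapper.
```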