Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hash spd bucket get - use key without timestamp #142

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions plugin/speedb/memtable/hash_spd_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ struct BucketHeader {

BucketHeader() { items_.clear(); }

bool InternalContains(const MemTableRep::KeyComparator& comparator,
const char* check_key) {
bool InternalContains(const char* check_key,
const MemTableRep::KeyComparator& comparator) {
if (items_.size() == 0) {
return false;
}
Expand All @@ -102,17 +102,17 @@ struct BucketHeader {
return false;
}

bool Contains(const MemTableRep::KeyComparator& comparator,
const char* check_key) {
bool Contains(const char* check_key,
const MemTableRep::KeyComparator& comparator) {
MutexLock l(&mutex_);
return InternalContains(comparator, check_key);
return InternalContains(check_key, comparator);
}

bool Add(SortItem* sort_item, const MemTableRep::KeyComparator& comparator,
bool check_exist) {
MutexLock l(&mutex_);
if (check_exist) {
if (InternalContains(comparator, sort_item->key_)) return false;
if (InternalContains(sort_item->key_, comparator)) return false;
}

items_.push_front(sort_item);
Expand Down Expand Up @@ -456,7 +456,7 @@ class SortVectorContainer {

void Insert(SortItem* new_item);

void InitIterator(IterSortSettingInfo* sort_set_info);
void InitIterator(std::shared_ptr<IterSortSettingInfo> sort_set_info);

void SeekIter(std::list<std::shared_ptr<SortVector>>::iterator iter_anchor,
IterHeapInfo* iter_heap_info, const char* seek_key,
Expand Down Expand Up @@ -517,7 +517,7 @@ void SortVectorContainer::Insert(SortItem* new_item) {
}
}

void SortVectorContainer::InitIterator(IterSortSettingInfo* sort_set_info) {
void SortVectorContainer::InitIterator(std::shared_ptr<IterSortSettingInfo> sort_set_info) {
SortItem* sort_item;
bool immutable = false;

Expand All @@ -530,7 +530,7 @@ void SortVectorContainer::InitIterator(IterSortSettingInfo* sort_set_info) {
sort_set_info->iter_sort_vector_ =
std::make_shared<SortVector>(switch_vector_limit_);
sort_item = sort_set_info->iter_sort_vector_->GetIterPoint();
sort_item->SetSortSetInfo(static_cast<void*>(sort_set_info));
sort_item->SetSortSetInfo(sort_set_info.get());
SortItem* prev_item = last_item_.exchange(sort_item);
prev_item->SetNext(sort_item);
{
Expand Down Expand Up @@ -621,7 +621,7 @@ void SortVectorContainer::SortThread() {
AdvanceAndSort(sort_set_info->iter_sort_vector_);
} else {
// need to add to the empty_iter_sort_vectors_
// TBD AYELET
// TBD AYELET - od we really need that?
empty_iter_sort_vectors_.push_back(
std::make_shared<SortVector>(switch_vector_limit_));
}
Expand Down Expand Up @@ -703,19 +703,37 @@ class HashLocklessRep : public MemTableRep {

std::shared_ptr<SortVectorContainer> sort_vectors_cont_;

size_t GetHash(const char* key) const {
Slice slice = UserKey(key);
return MurmurHash(slice.data(), static_cast<int>(slice.size()), 0) %
bucket_size_;
static size_t GetHash(const Slice& user_key_without_ts) {
return MurmurHash(user_key_without_ts.data(),
static_cast<int>(user_key_without_ts.size()), 0);
}
bool InsertWithCheck(KeyHandle handle);

BucketHeader* GetBucket(size_t i) const { return &buckets_.get()[i]; }
static Slice UserKeyWithoutTimestamp(
const Slice internal_key, const MemTableRep::KeyComparator& compare) {
#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
auto key_comparator =
dynamic_cast<const MemTable::KeyComparator*>(&compare);
assert(key_comparator != nullptr);
#else
auto key_comparator = static_cast<const MemTable::KeyComparator*>(&compare);
ayulas marked this conversation as resolved.
Show resolved Hide resolved
#endif
const Comparator* user_comparator =
key_comparator->comparator.user_comparator();
const size_t ts_sz = user_comparator->timestamp_size();
return ExtractUserKeyAndStripTimestamp(internal_key, ts_sz);
}

BucketHeader* GetBucket(const char* key) const {
return GetBucket(GetHash(key));
BucketHeader* GetBucket(const char* key,
const MemTableRep::KeyComparator& comparator) const {
const size_t hash = GetHash(
UserKeyWithoutTimestamp(comparator.decode_key(key), comparator));
BucketHeader* bucket =
const_cast<BucketHeader*>(&buckets_[hash % bucket_size_]);
return bucket;
}

bool InsertWithCheck(KeyHandle handle);

class Iterator : public MemTableRep::Iterator {
public:
// Initialize an iterator over the specified list.
Expand All @@ -725,10 +743,9 @@ class HashLocklessRep : public MemTableRep {
: sort_vectors_cont_(sort_vectors_cont),
iter_heap_info_(comparator),
up_iter_direction_(true) {
IterSortSettingInfo sort_set_info;
sort_vectors_cont_->InitIterator(&sort_set_info);
iter_anchor_ = sort_set_info.iter_anchor_;
iter_sort_items_num_ = sort_set_info.iter_size_;
sort_vectors_cont_->InitIterator(sort_set_info_);
iter_anchor_ = sort_set_info_->iter_anchor_;
iter_sort_items_num_ = sort_set_info_->iter_size_;
// allocate iter_heap_info
iter_heap_info_.Init(iter_sort_items_num_);
}
Expand Down Expand Up @@ -816,6 +833,7 @@ class HashLocklessRep : public MemTableRep {
uint32_t iter_sort_items_num_;
IterHeapInfo iter_heap_info_;
bool up_iter_direction_;
std::shared_ptr<IterSortSettingInfo> sort_set_info_;

protected:
std::string tmp_; // For passing to EncodeKey
Expand Down Expand Up @@ -879,7 +897,7 @@ KeyHandle HashLocklessRep::Allocate(const size_t len, char** buf) {

void HashLocklessRep::Insert(KeyHandle handle) {
SortItem* sort_item = static_cast<SortItem*>(handle);
BucketHeader* bucket = GetBucket(sort_item->key_);
BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_);
bucket->Add(sort_item, this->compare_, false);
// insert to later sorter list
sort_vectors_cont_->Insert(sort_item);
Expand All @@ -889,7 +907,7 @@ void HashLocklessRep::Insert(KeyHandle handle) {

bool HashLocklessRep::InsertWithCheck(KeyHandle handle) {
SortItem* sort_item = static_cast<SortItem*>(handle);
BucketHeader* bucket = GetBucket(sort_item->key_);
BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_);

if (!bucket->Add(sort_item, this->compare_, true)) {
return false;
Expand Down Expand Up @@ -920,9 +938,9 @@ bool HashLocklessRep::InsertKeyConcurrently(KeyHandle handle) {
}

bool HashLocklessRep::Contains(const char* key) const {
BucketHeader* bucket = GetBucket(key);
BucketHeader* bucket = GetBucket(key, this->compare_);

return bucket->Contains(this->compare_, key);
return bucket->Contains(key, this->compare_);
}

void HashLocklessRep::MarkReadOnly() { sort_vectors_cont_->Immutable(); }
Expand All @@ -934,7 +952,7 @@ size_t HashLocklessRep::ApproximateMemoryUsage() {

void HashLocklessRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
BucketHeader* bucket = GetBucket(k.memtable_key().data());
BucketHeader* bucket = GetBucket(k.memtable_key().data(), this->compare_);
MutexLock l(&bucket->mutex_);

for (auto iter = bucket->items_.begin(); iter != bucket->items_.end();
Expand All @@ -947,7 +965,6 @@ void HashLocklessRep::Get(const LookupKey& k, void* callback_args,

MemTableRep::Iterator* HashLocklessRep::GetIterator(Arena* arena) {
bool empty_iter = (sort_vectors_cont_->items_count_.load() == 0);
if (!sort_vectors_cont_->immutable_.load()) empty_iter = true;
if (arena != nullptr) {
void* mem;
if (empty_iter) {
Expand Down
7 changes: 4 additions & 3 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def random_distribution(cuts_count):
"experimental_mempurge_threshold": lambda: 10.0*random.random(),
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",
"memtablerep": lambda: random.choice(["skip_list", "speedb.HashSpdRepFactory"]),
"target_file_size_base": 16777216,
"target_file_size_multiplier": 1,
"test_batches_snapshots": 0,
Expand Down Expand Up @@ -537,8 +537,9 @@ def finalize_and_sanitize(src_params, counter):
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
# TODO: yuval- add hash_spd memtable
dest_params["memtablerep"] = "skip_list"
dest_params["memtablerep"] = random.choice(
["skip_list", "speedb.HashSpdRepFactory"]
)
if dest_params["mmap_read"] == 1:
dest_params["use_direct_io_for_flush_and_compaction"] = 0
dest_params["use_direct_reads"] = 0
Expand Down