diff --git a/plugin/speedb/memtable/hash_spd_rep.cc b/plugin/speedb/memtable/hash_spd_rep.cc index f3f11d5ee5..93950c88a9 100644 --- a/plugin/speedb/memtable/hash_spd_rep.cc +++ b/plugin/speedb/memtable/hash_spd_rep.cc @@ -84,8 +84,8 @@ struct BucketHeader { BucketHeader() { items_.clear(); } - bool InternalContains(const MemTableRep::KeyComparator& comparator, - const char* check_key) { + bool InternalContains(const char* check_key, + const MemTableRep::KeyComparator& comparator) { if (items_.size() == 0) { return false; } @@ -102,17 +102,17 @@ struct BucketHeader { return false; } - bool Contains(const MemTableRep::KeyComparator& comparator, - const char* check_key) { + bool Contains(const char* check_key, + const MemTableRep::KeyComparator& comparator) { MutexLock l(&mutex_); - return InternalContains(comparator, check_key); + return InternalContains(check_key, comparator); } bool Add(SortItem* sort_item, const MemTableRep::KeyComparator& comparator, bool check_exist) { MutexLock l(&mutex_); if (check_exist) { - if (InternalContains(comparator, sort_item->key_)) return false; + if (InternalContains(sort_item->key_, comparator)) return false; } items_.push_front(sort_item); @@ -456,7 +456,7 @@ class SortVectorContainer { void Insert(SortItem* new_item); - void InitIterator(IterSortSettingInfo* sort_set_info); + void InitIterator(std::shared_ptr sort_set_info); void SeekIter(std::list>::iterator iter_anchor, IterHeapInfo* iter_heap_info, const char* seek_key, @@ -517,7 +517,7 @@ void SortVectorContainer::Insert(SortItem* new_item) { } } -void SortVectorContainer::InitIterator(IterSortSettingInfo* sort_set_info) { +void SortVectorContainer::InitIterator(std::shared_ptr sort_set_info) { SortItem* sort_item; bool immutable = false; @@ -530,7 +530,7 @@ void SortVectorContainer::InitIterator(IterSortSettingInfo* sort_set_info) { sort_set_info->iter_sort_vector_ = std::make_shared(switch_vector_limit_); sort_item = sort_set_info->iter_sort_vector_->GetIterPoint(); - 
sort_item->SetSortSetInfo(static_cast(sort_set_info)); + sort_item->SetSortSetInfo(sort_set_info.get()); SortItem* prev_item = last_item_.exchange(sort_item); prev_item->SetNext(sort_item); { @@ -621,7 +621,7 @@ void SortVectorContainer::SortThread() { AdvanceAndSort(sort_set_info->iter_sort_vector_); } else { // need to add to the empty_iter_sort_vectors_ - // TBD AYELET + // TBD AYELET - do we really need that? empty_iter_sort_vectors_.push_back( std::make_shared(switch_vector_limit_)); } @@ -703,19 +703,37 @@ class HashLocklessRep : public MemTableRep { std::shared_ptr sort_vectors_cont_; - size_t GetHash(const char* key) const { - Slice slice = UserKey(key); - return MurmurHash(slice.data(), static_cast(slice.size()), 0) % - bucket_size_; + static size_t GetHash(const Slice& user_key_without_ts) { + return MurmurHash(user_key_without_ts.data(), + static_cast(user_key_without_ts.size()), 0); } - bool InsertWithCheck(KeyHandle handle); - BucketHeader* GetBucket(size_t i) const { return &buckets_.get()[i]; } + static Slice UserKeyWithoutTimestamp( + const Slice internal_key, const MemTableRep::KeyComparator& compare) { +#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG) + auto key_comparator = + dynamic_cast(&compare); + assert(key_comparator != nullptr); +#else + auto key_comparator = static_cast(&compare); +#endif + const Comparator* user_comparator = + key_comparator->comparator.user_comparator(); + const size_t ts_sz = user_comparator->timestamp_size(); + return ExtractUserKeyAndStripTimestamp(internal_key, ts_sz); + } - BucketHeader* GetBucket(const char* key) const { - return GetBucket(GetHash(key)); + BucketHeader* GetBucket(const char* key, + const MemTableRep::KeyComparator& comparator) const { + const size_t hash = GetHash( + UserKeyWithoutTimestamp(comparator.decode_key(key), comparator)); + BucketHeader* bucket = + const_cast(&buckets_[hash % bucket_size_]); + return bucket; } + bool InsertWithCheck(KeyHandle handle); + class Iterator : public
MemTableRep::Iterator { public: // Initialize an iterator over the specified list. @@ -725,10 +743,9 @@ class HashLocklessRep : public MemTableRep { : sort_vectors_cont_(sort_vectors_cont), iter_heap_info_(comparator), up_iter_direction_(true) { - IterSortSettingInfo sort_set_info; - sort_vectors_cont_->InitIterator(&sort_set_info); - iter_anchor_ = sort_set_info.iter_anchor_; - iter_sort_items_num_ = sort_set_info.iter_size_; + sort_vectors_cont_->InitIterator(sort_set_info_); + iter_anchor_ = sort_set_info_->iter_anchor_; + iter_sort_items_num_ = sort_set_info_->iter_size_; // allocate iter_heap_info iter_heap_info_.Init(iter_sort_items_num_); } @@ -816,6 +833,7 @@ class HashLocklessRep : public MemTableRep { uint32_t iter_sort_items_num_; IterHeapInfo iter_heap_info_; bool up_iter_direction_; + std::shared_ptr sort_set_info_; protected: std::string tmp_; // For passing to EncodeKey @@ -879,7 +897,7 @@ KeyHandle HashLocklessRep::Allocate(const size_t len, char** buf) { void HashLocklessRep::Insert(KeyHandle handle) { SortItem* sort_item = static_cast(handle); - BucketHeader* bucket = GetBucket(sort_item->key_); + BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_); bucket->Add(sort_item, this->compare_, false); // insert to later sorter list sort_vectors_cont_->Insert(sort_item); @@ -889,7 +907,7 @@ void HashLocklessRep::Insert(KeyHandle handle) { bool HashLocklessRep::InsertWithCheck(KeyHandle handle) { SortItem* sort_item = static_cast(handle); - BucketHeader* bucket = GetBucket(sort_item->key_); + BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_); if (!bucket->Add(sort_item, this->compare_, true)) { return false; @@ -920,9 +938,9 @@ bool HashLocklessRep::InsertKeyConcurrently(KeyHandle handle) { } bool HashLocklessRep::Contains(const char* key) const { - BucketHeader* bucket = GetBucket(key); + BucketHeader* bucket = GetBucket(key, this->compare_); - return bucket->Contains(this->compare_, key); + return bucket->Contains(key, 
this->compare_); } void HashLocklessRep::MarkReadOnly() { sort_vectors_cont_->Immutable(); } @@ -934,7 +952,7 @@ size_t HashLocklessRep::ApproximateMemoryUsage() { void HashLocklessRep::Get(const LookupKey& k, void* callback_args, bool (*callback_func)(void* arg, const char* entry)) { - BucketHeader* bucket = GetBucket(k.memtable_key().data()); + BucketHeader* bucket = GetBucket(k.memtable_key().data(), this->compare_); MutexLock l(&bucket->mutex_); for (auto iter = bucket->items_.begin(); iter != bucket->items_.end(); @@ -947,7 +965,6 @@ void HashLocklessRep::Get(const LookupKey& k, void* callback_args, MemTableRep::Iterator* HashLocklessRep::GetIterator(Arena* arena) { bool empty_iter = (sort_vectors_cont_->items_count_.load() == 0); - if (!sort_vectors_cont_->immutable_.load()) empty_iter = true; if (arena != nullptr) { void* mem; if (empty_iter) { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b3bda51e5c..c530d69b91 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -310,7 +310,7 @@ def random_distribution(cuts_count): "experimental_mempurge_threshold": lambda: 10.0*random.random(), "max_background_compactions": 1, "max_bytes_for_level_base": 67108864, - "memtablerep": "skip_list", + "memtablerep": lambda: random.choice(["skip_list", "speedb.HashSpdRepFactory"]), "target_file_size_base": 16777216, "target_file_size_multiplier": 1, "test_batches_snapshots": 0, @@ -537,8 +537,9 @@ def finalize_and_sanitize(src_params, counter): if dest_params.get("compression_type") != "zstd": dest_params["compression_zstd_max_train_bytes"] = 0 if dest_params.get("allow_concurrent_memtable_write", 1) == 1: - # TODO: yuval- add hash_spd memtable - dest_params["memtablerep"] = "skip_list" + dest_params["memtablerep"] = random.choice( + ["skip_list", "speedb.HashSpdRepFactory"] + ) if dest_params["mmap_read"] == 1: dest_params["use_direct_io_for_flush_and_compaction"] = 0 dest_params["use_direct_reads"] = 0