Skip to content

Commit

Permalink
639: move hashSpdb memtable from plugin to main code
Browse files Browse the repository at this point in the history
  • Loading branch information
ayulas committed Aug 16, 2023
1 parent a53ad27 commit 8cceec7
Show file tree
Hide file tree
Showing 17 changed files with 111 additions and 847 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ set(SOURCES
memory/memory_allocator.cc
memtable/alloc_tracker.cc
memtable/hash_linklist_rep.cc
memtable/hash_spdb_rep.cc
memtable/hash_skiplist_rep.cc
memtable/skiplistrep.cc
memtable/vectorrep.cc
Expand Down
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Speedb Change Log

## Unreleased
* move hashSpdb memtable from plugin to main code (#639)

### New Features
* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations.
Expand Down Expand Up @@ -64,7 +65,6 @@ Also switch to waiting a sec on the CV each time. This is required since a bg er
* db_bench: Create a WBM once for all db-s regardless of their use in different groups (#550)
* Tompstone unit test faiure (#560)
* build: Remove unused variables in unit tests (#581)

### Miscellaneous
* disable failing unit tests and paired bloom filter stress testing
* version: update Speedb patch version to 2.4.1 (#503)
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"memtable/alloc_tracker.cc",
"memtable/hash_linklist_rep.cc",
"memtable/hash_skiplist_rep.cc",
"memtable/hash_spdb_rep.cc",
"memtable/skiplistrep.cc",
"memtable/vectorrep.cc",
"memtable/write_buffer_manager.cc",
Expand Down Expand Up @@ -493,6 +494,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"memtable/alloc_tracker.cc",
"memtable/hash_linklist_rep.cc",
"memtable/hash_skiplist_rep.cc",
"memtable/hash_spdb_rep.cc",
"memtable/skiplistrep.cc",
"memtable/vectorrep.cc",
"memtable/write_buffer_manager.cc",
Expand Down
3 changes: 3 additions & 0 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,4 +519,7 @@ extern MemTableRepFactory* NewHashLinkListRepFactory(
bool if_log_bucket_dist_when_flash = true,
uint32_t threshold_use_skiplist = 256);

// The factory is to create memtables based on a sorted hash table - spdb hash:
extern MemTableRepFactory* NewHashSpdbRepFactory(size_t bucket_count = 1000000);

} // namespace ROCKSDB_NAMESPACE
124 changes: 78 additions & 46 deletions plugin/speedb/memtable/hash_spd_rep.cc → memtable/hash_spdb_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "plugin/speedb/memtable/hash_spd_rep.h"

#include <algorithm>
#include <atomic>
#include <list>
#include <vector>

#include "db/memtable.h"
#include "memory/arena.h"
#include "memtable/spdb_sorted_vector.h"
#include "memtable/stl_wrappers.h"
#include "monitoring/histogram.h"
#include "plugin/speedb/memtable/spdb_sorted_vector.h"
#include "port/port.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice.h"
Expand Down Expand Up @@ -398,13 +396,13 @@ void SpdbVectorContainer::SortThread() {
}
}

class HashSpdRep : public MemTableRep {
class HashSpdbRep : public MemTableRep {
public:
HashSpdRep(const MemTableRep::KeyComparator& compare, Allocator* allocator,
size_t bucket_size, bool use_seek_parallel_threshold = false);
HashSpdbRep(const MemTableRep::KeyComparator& compare, Allocator* allocator,
size_t bucket_size, bool use_seek_parallel_threshold = false);

HashSpdRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parallel_threshold = false);
HashSpdbRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parallel_threshold = false);
void PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator);

Expand Down Expand Up @@ -435,7 +433,7 @@ class HashSpdRep : public MemTableRep {
void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) override;

~HashSpdRep() override;
~HashSpdbRep() override;

MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;

Expand All @@ -449,32 +447,32 @@ class HashSpdRep : public MemTableRep {
std::shared_ptr<SpdbVectorContainer> spdb_vectors_cont_ = nullptr;
};

HashSpdRep::HashSpdRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator, size_t bucket_size,
bool use_seek_parallel_threshold)
: HashSpdRep(allocator, bucket_size, use_seek_parallel_threshold) {
HashSpdbRep::HashSpdbRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator, size_t bucket_size,
bool use_seek_parallel_threshold)
: HashSpdbRep(allocator, bucket_size, use_seek_parallel_threshold) {
spdb_vectors_cont_ = std::make_shared<SpdbVectorContainer>(compare);
}

HashSpdRep::HashSpdRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parallel_threshold)
HashSpdbRep::HashSpdbRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parallel_threshold)
: MemTableRep(allocator),
spdb_hash_table_(bucket_size),
use_seek_parallel_threshold_(use_seek_parallel_threshold) {}

void HashSpdRep::PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator) {
void HashSpdbRep::PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator) {
allocator_ = allocator;
spdb_vectors_cont_ = std::make_shared<SpdbVectorContainer>(compare);
}

HashSpdRep::~HashSpdRep() {
HashSpdbRep::~HashSpdbRep() {
if (spdb_vectors_cont_) {
MarkReadOnly();
}
}

KeyHandle HashSpdRep::Allocate(const size_t len, char** buf) {
KeyHandle HashSpdbRep::Allocate(const size_t len, char** buf) {
// constexpr size_t kInlineDataSize =
// sizeof(SpdbKeyHandle) - offsetof(SpdbKeyHandle, key_);

Expand All @@ -488,7 +486,7 @@ KeyHandle HashSpdRep::Allocate(const size_t len, char** buf) {
return h;
}

bool HashSpdRep::InsertKey(KeyHandle handle) {
bool HashSpdbRep::InsertKey(KeyHandle handle) {
SpdbKeyHandle* spdb_handle = static_cast<SpdbKeyHandle*>(handle);
if (!spdb_hash_table_.Add(spdb_handle, GetComparator())) {
return false;
Expand All @@ -498,31 +496,31 @@ bool HashSpdRep::InsertKey(KeyHandle handle) {
return true;
}

bool HashSpdRep::Contains(const char* key) const {
bool HashSpdbRep::Contains(const char* key) const {
if (spdb_vectors_cont_->IsEmpty()) {
return false;
}
return spdb_hash_table_.Contains(key, GetComparator(),
!spdb_vectors_cont_->IsReadOnly());
}

void HashSpdRep::MarkReadOnly() { spdb_vectors_cont_->MarkReadOnly(); }
void HashSpdbRep::MarkReadOnly() { spdb_vectors_cont_->MarkReadOnly(); }

size_t HashSpdRep::ApproximateMemoryUsage() {
size_t HashSpdbRep::ApproximateMemoryUsage() {
// Memory is always allocated from the allocator.
return 0;
}

void HashSpdRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
void HashSpdbRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
if (spdb_vectors_cont_->IsEmpty()) {
return;
}
spdb_hash_table_.Get(k, GetComparator(), callback_args, callback_func,
!spdb_vectors_cont_->IsReadOnly());
}

MemTableRep::Iterator* HashSpdRep::GetIterator(Arena* arena) {
MemTableRep::Iterator* HashSpdbRep::GetIterator(Arena* arena) {
const bool empty_iter =
spdb_vectors_cont_->IsEmpty() ||
(use_seek_parallel_threshold_ && !spdb_vectors_cont_->IsReadOnly());
Expand All @@ -542,6 +540,11 @@ MemTableRep::Iterator* HashSpdRep::GetIterator(Arena* arena) {
return new SpdbVectorIterator(spdb_vectors_cont_, GetComparator());
}
}
struct HashSpdbRepOptions {
static const char* kName() { return "HashSpdbRepOptions"; }
size_t hash_bucket_count;
bool use_seek_parallel_threshold;
};

static std::unordered_map<std::string, OptionTypeInfo> hash_spdb_factory_info =
{
Expand All @@ -554,39 +557,68 @@ static std::unordered_map<std::string, OptionTypeInfo> hash_spdb_factory_info =
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
} // namespace

// HashSpdRepFactory
class HashSpdbRepFactory : public MemTableRepFactory {
public:
explicit HashSpdbRepFactory(size_t hash_bucket_count = 1000000) {
options_.hash_bucket_count = hash_bucket_count;
options_.use_seek_parallel_threshold = false;

HashSpdRepFactory::HashSpdRepFactory(size_t hash_bucket_count) {
options_.hash_bucket_count = hash_bucket_count;
options_.use_seek_parallel_threshold = false;
if (hash_bucket_count == 0) {
options_.use_seek_parallel_threshold = true;
options_.hash_bucket_count = 1000000;
}
RegisterOptions(&options_, &hash_spdb_factory_info);
}

using MemTableRepFactory::CreateMemTableRep;
MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator,
const SliceTransform* transform,
Logger* logger) override;
bool IsInsertConcurrentlySupported() const override { return true; }
bool CanHandleDuplicatedKey() const override { return true; }
MemTableRep* PreCreateMemTableRep() override;
void PostCreateMemTableRep(MemTableRep* switch_mem,
const MemTableRep::KeyComparator& compare,
Allocator* allocator,
const SliceTransform* transform,
Logger* logger) override;

static const char* kClassName() { return "HashSpdbRepFactory"; }
const char* Name() const override { return kClassName(); }

if (hash_bucket_count == 0) {
options_.use_seek_parallel_threshold = true;
options_.hash_bucket_count = 1000000;
}
RegisterOptions(&options_, &hash_spdb_factory_info);
}
private:
HashSpdbRepOptions options_;
};

} // namespace

MemTableRep* HashSpdRepFactory::PreCreateMemTableRep() {
MemTableRep* hash_spd = new HashSpdRep(nullptr, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
return hash_spd;
// HashSpdbRepFactory

MemTableRep* HashSpdbRepFactory::PreCreateMemTableRep() {
MemTableRep* hash_spdb =
new HashSpdbRep(nullptr, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
return hash_spdb;
}

void HashSpdRepFactory::PostCreateMemTableRep(
void HashSpdbRepFactory::PostCreateMemTableRep(
MemTableRep* switch_mem, const MemTableRep::KeyComparator& compare,
Allocator* allocator, const SliceTransform* /*transform*/,
Logger* /*logger*/) {
static_cast<HashSpdRep*>(switch_mem)->PostCreate(compare, allocator);
static_cast<HashSpdbRep*>(switch_mem)->PostCreate(compare, allocator);
}

MemTableRep* HashSpdRepFactory::CreateMemTableRep(
MemTableRep* HashSpdbRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* /*transform*/, Logger* /*logger*/) {
return new HashSpdRep(compare, allocator, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
return new HashSpdbRep(compare, allocator, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
}

MemTableRepFactory* NewHashSpdbRepFactory(size_t bucket_count) {
return new HashSpdbRepFactory(bucket_count);
}

} // namespace ROCKSDB_NAMESPACE
5 changes: 4 additions & 1 deletion memtable/memtablerep_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ DEFINE_string(memtablerep, "skiplist",
"\tvector -- backed by an std::vector\n"
"\thashskiplist -- backed by a hash skip list\n"
"\thashlinklist -- backed by a hash linked list\n"
"\thashspdb -- backed by a hash spdb\n"
"\tcuckoo -- backed by a cuckoo hash table");

DEFINE_int64(bucket_count, 1000000,
"bucket_count parameter to pass into NewHashSkiplistRepFactory or "
"NewHashLinkListRepFactory");
"NewHashLinkListRepFactory NewHashSpdbRepFactory");

DEFINE_int32(
hashskiplist_height, 4,
Expand Down Expand Up @@ -595,6 +596,8 @@ int main(int argc, char** argv) {
FLAGS_if_log_bucket_dist_when_flash, FLAGS_threshold_use_skiplist));
options.prefix_extractor.reset(
ROCKSDB_NAMESPACE::NewFixedPrefixTransform(FLAGS_prefix_length));
} else if (FLAGS_memtablerep == "hashspdb") {
factory.reset(ROCKSDB_NAMESPACE::NewHashSpdbRepFactory(FLAGS_bucket_count));
} else {
ROCKSDB_NAMESPACE::ConfigOptions config_options;
config_options.ignore_unsupported_options = false;
Expand Down
File renamed without changes.
10 changes: 1 addition & 9 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -651,15 +651,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF(
if (prefix_extractor) {
memtable_factory.reset(NewHashSkipListRepFactory());
} else {
std::string memtablerep = "speedb.HashSpdRepFactory";
std::string memtable_opt;
memtable_opt = ":" + std::to_string(0);
std::unique_ptr<MemTableRepFactory> mem_factory_ptr;

auto s = MemTableRepFactory::CreateFromString(
ConfigOptions(), memtablerep + memtable_opt, &mem_factory_ptr);
assert(s.ok());
memtable_factory.reset(mem_factory_ptr.release());
memtable_factory.reset(NewHashSpdbRepFactory());
}
return this;
}
Expand Down
1 change: 0 additions & 1 deletion plugin/speedb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

set(speedb_SOURCES
speedb_registry.cc
memtable/hash_spd_rep.cc
paired_filter/speedb_paired_bloom.cc
paired_filter/speedb_paired_bloom_internal.cc
pinning_policy/scoped_pinning_policy.cc)
Expand Down
56 changes: 0 additions & 56 deletions plugin/speedb/memtable/hash_spd_rep.h

This file was deleted.

Loading

0 comments on commit 8cceec7

Please sign in to comment.