From f7930d0a8dfc5e35396dcdde5abd5fab2f38f0f9 Mon Sep 17 00:00:00 2001
From: Haoran Xu <haoranxu510@gmail.com>
Date: Tue, 30 Jun 2020 20:39:18 -0700
Subject: [PATCH] New API for clean handling of variable-length keys

In this diff, we propose a new API for clean handling of variable-length keys.

Previously, handling variable-length keys was extremely tricky and fragile
(when it was possible at all), and required writing code that any project with
reasonable code-quality standards would not tolerate. This has even caused bugs
in our own codebase (e.g. the Compaction contexts; even the test that exercises
variable-length keys is itself buggy).

We propose a backward-compatible new API that handles variable-length keys
cleanly. We add a new concept -- ShallowKey. This class is required to provide
the same APIs as the Key class (size(), GetHash() and operator==()), but unlike
the Key class, whose raw contents (interpreted as a uint8_t* string) are written
directly into the log, a ShallowKey's internal representation does not matter.
In addition to the existing APIs, one new method with the prototype
void write_deep_key_at(Key* dst) const is required; it must write the
byte-stream representation of the Key at address 'dst'.

To use the new API, all you need to do is change the key() accessor in your
Context to return the ShallowKey type instead of the key_t type. Example:

    struct UpsertContext {
      using key_t = Key;
      // To use the new API, return a ShallowKey:
      //   const ShallowKey& key() const;
      // To use the old API, return the Key itself:
      //   const key_t& key() const;
    };
---
 cc/CMakeLists.txt | 2 +-
 cc/README.md | 6 +-
 cc/benchmark-dir/README.md | 29 ++++++
 cc/benchmark-dir/benchmark.cc | 8 +-
 cc/playground/sum_store-dir/sum_store.cc | 1 -
 cc/src/core/faster.h | 104 ++++++++++++--------
 cc/src/core/internal_contexts.h | 99 +++++++++++++++++--
 cc/src/core/record.h | 12 ++-
 cc/src/core/utility.h | 17 ++++
 cc/test/in_memory_test.cc | 119 ++++++++++++-----------
 cc/test/test_types.h | 22 +++++
 11 files changed, 304 insertions(+), 115 deletions(-)

diff --git a/cc/CMakeLists.txt b/cc/CMakeLists.txt
index 77fef02df..e6fdf4a4f 100644
--- a/cc/CMakeLists.txt
+++ b/cc/CMakeLists.txt
@@ -17,7 +17,7 @@ if (MSVC)
 else()
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
-  set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Og -g -D_DEBUG")
+  set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g -D_DEBUG")
   set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")
 endif()
diff --git a/cc/README.md b/cc/README.md
index 2f8036e7f..6ddd6b5d7 100644
--- a/cc/README.md
+++ b/cc/README.md
@@ -13,14 +13,14 @@ Create new directory "build" off the root directory (FASTER\cc).
 From the new "build" directory, execute:

 ```sh
-cmake .. -G "<MSVC compiler> Win64"
+cmake .. -G "<MSVC compiler>"
 ```

 To see a list of supported MSVC compiler versions, just run "cmake -G". As of
-this writing, we're using Visual Studio 2017, so you would execute:
+this writing, we're using Visual Studio 2019, so you would execute:

 ```sh
-cmake .. -G "Visual Studio 15 2017 Win64"
+cmake .. -G "Visual Studio 16 2019"
 ```

 That will create build scripts inside your new "build" directory, including
diff --git a/cc/benchmark-dir/README.md b/cc/benchmark-dir/README.md
index 3fb5f8e4d..d4d51ce64 100644
--- a/cc/benchmark-dir/README.md
+++ b/cc/benchmark-dir/README.md
@@ -15,3 +15,32 @@ file consists only of the 8-byte-integer portion of the key--e.g.:
 5575651532496486335

 To convert YCSB "basic" output to the format we expect, run "process_ycsb."
+ + +Running YCSB +============ + +1) 50:50 reads:updates, 72 threads, and YCSB zipf distribution files: + +``` +benchmark.exe 0 72 d:\ycsb_files\load_zipf_250M_raw.dat d:\ycsb_files\run_zipf_250M_1000M_raw.dat +``` + +2) 50:50 reads:updates, 72 threads, and YCSB uniform distribution files: + +``` +benchmark.exe 0 72 d:\ycsb_files\load_uniform_250M_raw.dat d:\ycsb_files\run_uniform_250M_1000M_raw.dat +``` + +3) 100% RMW, 72 threads, and YCSB zipf distribution files: + +``` +benchmark.exe 1 72 d:\ycsb_files\load_zipf_250M_raw.dat d:\ycsb_files\run_zipf_250M_1000M_raw.dat +``` + +4) 100% RMW, 72 threads, and YCSB uniform distribution files: + +``` +benchmark.exe 1 72 d:\ycsb_files\load_uniform_250M_raw.dat d:\ycsb_files\run_uniform_250M_1000M_raw.dat +``` + diff --git a/cc/benchmark-dir/benchmark.cc b/cc/benchmark-dir/benchmark.cc index 607566643..cef4011ca 100644 --- a/cc/benchmark-dir/benchmark.cc +++ b/cc/benchmark-dir/benchmark.cc @@ -47,8 +47,8 @@ static_assert(kCompletePendingInterval % kRefreshInterval == 0, static constexpr uint64_t kNanosPerSecond = 1000000000; static constexpr uint64_t kMaxKey = 268435456; -static constexpr uint64_t kRunSeconds = 360; -static constexpr uint64_t kCheckpointSeconds = 30; +static constexpr uint64_t kRunSeconds = 30; +static constexpr uint64_t kCheckpointSeconds = 0; aligned_unique_ptr_t<uint64_t> init_keys_; aligned_unique_ptr_t<uint64_t> txn_keys_; @@ -577,10 +577,10 @@ void run_benchmark(store_t* store, size_t num_threads) { last_checkpoint_time = current_time; } } - - done_ = true; } + done_ = true; + for(auto& thread : threads) { thread.join(); } diff --git a/cc/playground/sum_store-dir/sum_store.cc b/cc/playground/sum_store-dir/sum_store.cc index e375826b0..c8b930379 100644 --- a/cc/playground/sum_store-dir/sum_store.cc +++ b/cc/playground/sum_store-dir/sum_store.cc @@ -4,7 +4,6 @@ #include <cstdint> #include <cstdio> #include <cstdlib> -#include <experimental/filesystem> #include <string> #include "concurrent_recovery_test.h" diff --git a/cc/src/core/faster.h b/cc/src/core/faster.h index 652b9b3e1..39a4d84d3 100644 --- a/cc/src/core/faster.h +++ b/cc/src/core/faster.h @@ -3,6 +3,8 @@ #pragma once +#define _SILENCE_EXPERIMENTAL_FILESYSTEM_DEPRECATION_WARNING + #include <atomic> #include <cassert> #include <cinttypes> @@ -199,6 +201,9 @@ class FasterKv { // create a new entry. The caller can use the "expected_entry" to CAS its desired address into // the entry. inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry); + template<class C> + inline Address TraceBackForKeyMatchCtxt(const C& ctxt, Address from_address, + Address min_offset) const; inline Address TraceBackForKeyMatch(const key_t& key, Address from_address, Address min_offset) const; Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address, @@ -764,8 +769,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const const_cast<faster_t*>(this)->HeavyEnter(); } - const key_t& key = pending_context.key(); - KeyHash hash = key.GetHash(); + KeyHash hash = pending_context.get_key_hash(); HashBucketEntry entry; const AtomicHashBucketEntry* atomic_entry = FindEntry(hash, entry); if(!atomic_entry) { @@ -785,8 +789,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const // matches. 
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address)); latest_record_version = record->header.checkpoint_version; - if(key != record->key()) { - address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); + if(!pending_context.is_key_equal(record->key())) { + address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address); } } @@ -839,8 +843,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) { HeavyEnter(); } - const key_t& key = pending_context.key(); - KeyHash hash = key.GetHash(); + KeyHash hash = pending_context.get_key_hash(); HashBucketEntry expected_entry; AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry); @@ -855,8 +858,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) { // key that we might be able to update in place. record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); latest_record_version = record->header.checkpoint_version; - if(key != record->key()) { - address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); + if(!pending_context.is_key_equal(record->key())) { + address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address); } } @@ -946,14 +949,15 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) { // Create a record and attempt RCU. create_record: - uint32_t record_size = record_t::size(key, pending_context.value_size()); + uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size()); Address new_address = BlockAllocate(record_size); record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address)); new(record) record_t{ RecordInfo{ static_cast<uint16_t>(thread_ctx().version), true, false, false, - expected_entry.address() }, - key }; + expected_entry.address() } + }; + pending_context.write_deep_key_at(const_cast<key_t*>(&record->key())); pending_context.Put(record); HashBucketEntry updated_entry{ new_address, hash.tag(), false }; @@ -980,8 +984,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r HeavyEnter(); } - const key_t& key = pending_context.key(); - KeyHash hash = key.GetHash(); + KeyHash hash = pending_context.get_key_hash(); HashBucketEntry expected_entry; AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry); @@ -998,8 +1001,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r // key that we might be able to update in place. record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); latest_record_version = record->header.checkpoint_version; - if(key != record->key()) { - address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); + if(!pending_context.is_key_equal(record->key())) { + address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address); } } @@ -1129,8 +1132,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r } } uint32_t record_size = old_record != nullptr ? 
- record_t::size(key, pending_context.value_size(old_record)) : - record_t::size(key, pending_context.value_size()); + record_t::size(pending_context.key_size(), pending_context.value_size(old_record)) : + record_t::size(pending_context.key_size(), pending_context.value_size()); Address new_address = BlockAllocate(record_size); record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address)); @@ -1147,8 +1150,10 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r new(new_record) record_t{ RecordInfo{ static_cast<uint16_t>(version), true, false, false, - expected_entry.address() }, - key }; + expected_entry.address() } + }; + pending_context.write_deep_key_at(const_cast<key_t*>(&new_record->key())); + if(old_record == nullptr || address < hlog.begin_address.load()) { pending_context.RmwInitial(new_record); } else if(address >= head_address) { @@ -1199,8 +1204,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) { HeavyEnter(); } - const key_t& key = pending_context.key(); - KeyHash hash = key.GetHash(); + KeyHash hash = pending_context.get_key_hash(); HashBucketEntry expected_entry; AtomicHashBucketEntry* atomic_entry = const_cast<AtomicHashBucketEntry*>(FindEntry(hash, expected_entry)); if(!atomic_entry) { @@ -1217,8 +1221,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) { if(address >= head_address) { const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address)); latest_record_version = record->header.checkpoint_version; - if(key != record->key()) { - address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); + if(!pending_context.is_key_equal(record->key())) { + address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address); } } @@ -1291,14 +1295,15 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) { } create_record: - uint32_t record_size = record_t::size(key, pending_context.value_size()); + uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size()); Address new_address = BlockAllocate(record_size); record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address)); new(record) record_t{ RecordInfo{ static_cast<uint16_t>(thread_ctx().version), true, true, false, expected_entry.address() }, - key }; + }; + pending_context.write_deep_key_at(const_cast<key_t*>(&record->key())); HashBucketEntry updated_entry{ new_address, hash.tag(), false }; @@ -1313,8 +1318,24 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) { } template <class K, class V, class D> -inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address, +template<class C> +inline Address FasterKv<K, V, D>::TraceBackForKeyMatchCtxt(const C& ctxt, Address from_address, Address min_offset) const { + while(from_address >= min_offset) { + const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address)); + if(ctxt.is_key_equal(record->key())) { + return from_address; + } else { + from_address = record->header.previous_address(); + continue; + } + } + return from_address; +} + +template <class K, class V, class D> +inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address, + Address min_offset) const { while(from_address >= min_offset) { const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address)); if(key == record->key()) { @@ -1371,7 +1392,7 @@ 
inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx, if(thread_ctx().phase == Phase::PREPARE) { assert(pending_context.type == OperationType::RMW); // Can I be marking an operation again and again? - if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) { + if(!checkpoint_locks_.get_lock(pending_context.get_key_hash()).try_lock_old()) { return PivotAndRetry(ctx, pending_context, async); } } @@ -1381,16 +1402,16 @@ inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx, assert(pending_context.type == OperationType::Read || pending_context.type == OperationType::RMW); // Can I be marking an operation again and again? - if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) { + if(!checkpoint_locks_.get_lock(pending_context.get_key_hash()).try_lock_old()) { return PivotAndRetry(ctx, pending_context, async); } } return IssueAsyncIoRequest(ctx, pending_context, async); case OperationStatus::SUCCESS_UNMARK: - checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old(); + checkpoint_locks_.get_lock(pending_context.get_key_hash()).unlock_old(); return Status::Ok; case OperationStatus::NOT_FOUND_UNMARK: - checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old(); + checkpoint_locks_.get_lock(pending_context.get_key_hash()).unlock_old(); return Status::NotFound; case OperationStatus::CPR_SHIFT_DETECTED: return PivotAndRetry(ctx, pending_context, async); @@ -1442,7 +1463,7 @@ inline Status FasterKv<K, V, D>::IssueAsyncIoRequest(ExecutionContext& ctx, pending_context_t& pending_context, bool& async) { // Issue asynchronous I/O request uint64_t io_id = thread_ctx().io_id++; - thread_ctx().pending_ios.insert({ io_id, pending_context.key().GetHash() }); + thread_ctx().pending_ios.insert({ io_id, pending_context.get_key_hash() }); async = true; AsyncIOContext io_request{ this, pending_context.address, &pending_context, &thread_ctx().io_responses, io_id }; @@ -1517,7 +1538,7 @@ void FasterKv<K, V, D>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status res faster->AsyncGetFromDisk(context->address, record->disk_size(), AsyncGetFromDiskCallback, *context.get()); context.async = true; - } else if(pending_context->key() == record->key()) { + } else if(pending_context->is_key_equal(record->key())) { //The keys are same, so I/O is complete context->thread_io_responses->push(context.get()); } else { @@ -1563,8 +1584,7 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& io_context.caller_context); // Find a hash bucket entry to store the updated value in. - const key_t& key = pending_context->key(); - KeyHash hash = key.GetHash(); + KeyHash hash = pending_context->get_key_hash(); HashBucketEntry expected_entry; AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry); @@ -1575,8 +1595,8 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& // Make sure that atomic_entry is OK to update. 
if(address >= head_address) { record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); - if(key != record->key()) { - address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); + if(!pending_context->is_key_equal(record->key())) { + address = TraceBackForKeyMatchCtxt(*pending_context, record->header.previous_address(), head_address); } } @@ -1592,7 +1612,7 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& record_t* new_record; if(io_context.address < hlog.begin_address.load()) { // The on-disk trace back failed to find a key match. - uint32_t record_size = record_t::size(key, pending_context->value_size()); + uint32_t record_size = record_t::size(pending_context->key_size(), pending_context->value_size()); new_address = BlockAllocate(record_size); new_record = reinterpret_cast<record_t*>(hlog.Get(new_address)); @@ -1600,13 +1620,14 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& RecordInfo{ static_cast<uint16_t>(context.version), true, false, false, expected_entry.address() }, - key }; + }; + pending_context->write_deep_key_at(const_cast<key_t*>(&new_record->key())); pending_context->RmwInitial(new_record); } else { // The record we read from disk. const record_t* disk_record = reinterpret_cast<const record_t*>( io_context.record.GetValidPointer()); - uint32_t record_size = record_t::size(key, pending_context->value_size(disk_record)); + uint32_t record_size = record_t::size(pending_context->key_size(), pending_context->value_size(disk_record)); new_address = BlockAllocate(record_size); new_record = reinterpret_cast<record_t*>(hlog.Get(new_address)); @@ -1614,7 +1635,8 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& RecordInfo{ static_cast<uint16_t>(context.version), true, false, false, expected_entry.address() }, - key }; + }; + pending_context->write_deep_key_at(const_cast<key_t*>(&new_record->key())); pending_context->RmwCopy(disk_record, new_record); } @@ -2463,7 +2485,7 @@ void FasterKv<K, V, D>::MarkAllPendingRequests() { const pending_context_t* context = static_cast<const pending_context_t*>(ctxt); // We will succeed, since no other thread can currently advance the entry's version, since this // thread hasn't acked "PENDING" phase completion yet. - bool result = checkpoint_locks_.get_lock(context->key().GetHash()).try_lock_old(); + bool result = checkpoint_locks_.get_lock(context->get_key_hash()).try_lock_old(); assert(result); } for(const auto& pending_io : thread_ctx().pending_ios) { diff --git a/cc/src/core/internal_contexts.h b/cc/src/core/internal_contexts.h index 0d67282fb..869557240 100644 --- a/cc/src/core/internal_contexts.h +++ b/cc/src/core/internal_contexts.h @@ -17,6 +17,7 @@ #include "record.h" #include "state_transitions.h" #include "thread.h" +#include "key_hash.h" namespace FASTER { namespace core { @@ -89,7 +90,10 @@ class PendingContext : public IAsyncContext { entry = entry_; } - virtual const key_t& key() const = 0; + virtual uint32_t key_size() const = 0; + virtual void write_deep_key_at(key_t* dst) const = 0; + virtual KeyHash get_key_hash() const = 0; + virtual bool is_key_equal(const key_t& other) const = 0; /// Caller context. IAsyncContext* caller_context; @@ -109,6 +113,32 @@ class PendingContext : public IAsyncContext { HashBucketEntry entry; }; +// A helper class to copy the key into FASTER log. 
+// In old API, the Key provided is just the Key type, and we use in-place-new and copy constructor +// to copy the key into the log. In new API, the user provides a ShallowKey, and we call the +// ShallowKey's write_deep_key_at() method to write the key content into the log. +// New API case (user provides ShallowKey) +// +template<bool isShallowKey> +struct write_deep_key_at_helper +{ + template<class ShallowKey, class Key> + static inline void execute(const ShallowKey& key, Key* dst) { + key.write_deep_key_at(dst); + } +}; + +// Old API case (user provides Key) +// +template<> +struct write_deep_key_at_helper<false> +{ + template<class Key> + static inline void execute(const Key& key, Key* dst) { + new (dst) Key(key); + } +}; + /// FASTER's internal Read() context. /// An internal Read() context that has gone async and lost its type information. @@ -136,7 +166,9 @@ class PendingReadContext : public AsyncPendingReadContext<typename RC::key_t> { typedef RC read_context_t; typedef typename read_context_t::key_t key_t; typedef typename read_context_t::value_t value_t; + using key_or_shallow_key_t = std::remove_const_t<std::remove_reference_t<std::result_of_t<decltype(&RC::key)(RC)>>>; typedef Record<key_t, value_t> record_t; + constexpr static const bool kIsShallowKey = !std::is_same<key_or_shallow_key_t, key_t>::value; PendingReadContext(read_context_t& caller_context_, AsyncCallback caller_callback_) : AsyncPendingReadContext<key_t>(caller_context_, caller_callback_) { @@ -159,9 +191,21 @@ class PendingReadContext : public AsyncPendingReadContext<typename RC::key_t> { } public: /// Accessors. - inline const key_t& key() const final { + inline const key_or_shallow_key_t& get_key_or_shallow_key() const { return read_context().key(); } + inline uint32_t key_size() const final { + return read_context().key().size(); + } + inline void write_deep_key_at(key_t* dst) const final { + write_deep_key_at_helper<kIsShallowKey>::execute(read_context().key(), dst); + } + inline KeyHash get_key_hash() const final { + return read_context().key().GetHash(); + } + inline bool is_key_equal(const key_t& other) const final { + return read_context().key() == other; + } inline void Get(const void* rec) final { const record_t* record = reinterpret_cast<const record_t*>(rec); read_context().Get(record->value()); @@ -200,7 +244,9 @@ class PendingUpsertContext : public AsyncPendingUpsertContext<typename UC::key_t typedef UC upsert_context_t; typedef typename upsert_context_t::key_t key_t; typedef typename upsert_context_t::value_t value_t; + using key_or_shallow_key_t = std::remove_const_t<std::remove_reference_t<std::result_of_t<decltype(&UC::key)(UC)>>>; typedef Record<key_t, value_t> record_t; + constexpr static const bool kIsShallowKey = !std::is_same<key_or_shallow_key_t, key_t>::value; PendingUpsertContext(upsert_context_t& caller_context_, AsyncCallback caller_callback_) : AsyncPendingUpsertContext<key_t>(caller_context_, caller_callback_) { @@ -221,10 +267,23 @@ class PendingUpsertContext : public AsyncPendingUpsertContext<typename UC::key_t inline upsert_context_t& upsert_context() { return *static_cast<upsert_context_t*>(PendingContext<key_t>::caller_context); } + public: /// Accessors. 
- inline const key_t& key() const final { - return upsert_context().key(); + inline const key_or_shallow_key_t& get_key_or_shallow_key() const { + return upsert_context().key(); + } + inline uint32_t key_size() const final { + return upsert_context().key().size(); + } + inline void write_deep_key_at(key_t* dst) const final { + write_deep_key_at_helper<kIsShallowKey>::execute(upsert_context().key(), dst); + } + inline KeyHash get_key_hash() const final { + return upsert_context().key().GetHash(); + } + inline bool is_key_equal(const key_t& other) const final { + return upsert_context().key() == other; } inline void Put(void* rec) final { record_t* record = reinterpret_cast<record_t*>(rec); @@ -273,7 +332,9 @@ class PendingRmwContext : public AsyncPendingRmwContext<typename MC::key_t> { typedef MC rmw_context_t; typedef typename rmw_context_t::key_t key_t; typedef typename rmw_context_t::value_t value_t; + using key_or_shallow_key_t = std::remove_const_t<std::remove_reference_t<std::result_of_t<decltype(&MC::key)(MC)>>>; typedef Record<key_t, value_t> record_t; + constexpr static const bool kIsShallowKey = !std::is_same<key_or_shallow_key_t, key_t>::value; PendingRmwContext(rmw_context_t& caller_context_, AsyncCallback caller_callback_) : AsyncPendingRmwContext<key_t>(caller_context_, caller_callback_) { @@ -296,9 +357,21 @@ class PendingRmwContext : public AsyncPendingRmwContext<typename MC::key_t> { } public: /// Accessors. - const key_t& key() const { + inline const key_or_shallow_key_t& get_key_or_shallow_key() const { return rmw_context().key(); } + inline uint32_t key_size() const final { + return rmw_context().key().size(); + } + inline void write_deep_key_at(key_t* dst) const final { + write_deep_key_at_helper<kIsShallowKey>::execute(rmw_context().key(), dst); + } + inline KeyHash get_key_hash() const final { + return rmw_context().key().GetHash(); + } + inline bool is_key_equal(const key_t& other) const final { + return rmw_context().key() == other; + } /// Set initial value. inline void RmwInitial(void* rec) final { record_t* record = reinterpret_cast<record_t*>(rec); @@ -353,7 +426,9 @@ class PendingDeleteContext : public AsyncPendingDeleteContext<typename MC::key_t typedef MC delete_context_t; typedef typename delete_context_t::key_t key_t; typedef typename delete_context_t::value_t value_t; + using key_or_shallow_key_t = std::remove_const_t<std::remove_reference_t<std::result_of_t<decltype(&MC::key)(MC)>>>; typedef Record<key_t, value_t> record_t; + constexpr static const bool kIsShallowKey = !std::is_same<key_or_shallow_key_t, key_t>::value; PendingDeleteContext(delete_context_t& caller_context_, AsyncCallback caller_callback_) : AsyncPendingDeleteContext<key_t>(caller_context_, caller_callback_) { @@ -376,9 +451,21 @@ class PendingDeleteContext : public AsyncPendingDeleteContext<typename MC::key_t } public: /// Accessors. 
- inline const key_t& key() const final { + inline const key_or_shallow_key_t& get_key_or_shallow_key() const { return delete_context().key(); } + inline uint32_t key_size() const final { + return delete_context().key().size(); + } + inline void write_deep_key_at(key_t* dst) const final { + write_deep_key_at_helper<kIsShallowKey>::execute(delete_context().key(), dst); + } + inline KeyHash get_key_hash() const final { + return delete_context().key().GetHash(); + } + inline bool is_key_equal(const key_t& other) const final { + return delete_context().key() == other; + } /// Get value size for initial value inline uint32_t value_size() const final { return delete_context().value_size(); diff --git a/cc/src/core/record.h b/cc/src/core/record.h index 80905f47b..4f1f92a98 100644 --- a/cc/src/core/record.h +++ b/cc/src/core/record.h @@ -59,13 +59,19 @@ struct Record { static_assert(alignof(value_t) <= Constants::kCacheLineBytes, "alignof(value_t) > Constants::kCacheLineBytes)"); + Record(RecordInfo header_) + : header{ header_ } { + } + /// For placement new() operator. Can't set value, since it might be set by value = input (for /// upsert), or rmw_initial(...) (for RMW). + /* Record(RecordInfo header_, const key_t& key_) : header{ header_ } { void* buffer = const_cast<key_t*>(&key()); new(buffer)key_t{ key_ }; } + */ /// Key appears immediately after record header (subject to alignment padding). Keys are /// immutable. @@ -93,12 +99,12 @@ struct Record { /// Size of a record to be created, in memory. (Includes padding, if any, after the value, so /// that the next record stored in the log is properly aligned.) - static inline constexpr uint32_t size(const key_t& key_, uint32_t value_size) { + static inline constexpr uint32_t size(uint32_t key_size, uint32_t value_size) { return static_cast<uint32_t>( // --plus Value size, all padded to Header alignment. pad_alignment(value_size + // --plus Key size, all padded to Value alignment. - pad_alignment(key_.size() + + pad_alignment(key_size + // Header, padded to Key alignment. pad_alignment(sizeof(RecordInfo), alignof(key_t)), alignof(value_t)), @@ -106,7 +112,7 @@ struct Record { } /// Size of the existing record, in memory. (Includes padding, if any, after the value.) inline constexpr uint32_t size() const { - return size(key(), value().size()); + return size(key().size(), value().size()); } /// Minimum size of a read from disk that is guaranteed to include the record's header + whatever diff --git a/cc/src/core/utility.h b/cc/src/core/utility.h index 83211bddd..a663c9cf3 100644 --- a/cc/src/core/utility.h +++ b/cc/src/core/utility.h @@ -47,6 +47,23 @@ class Utility { return Rotr64(kMagicNum * hashState, 6); } + static inline uint64_t HashBytesUint8(const uint8_t* str, size_t len) { + // 40343 is a "magic constant" that works well, + // 38299 is another good value. + // Both are primes and have a good distribution of bits. + const uint64_t kMagicNum = 40343; + uint64_t hashState = len; + + for(size_t idx = 0; idx < len; ++idx) { + hashState = kMagicNum * hashState + str[idx]; + } + + // The final scrambling helps with short keys that vary only on the high order bits. + // Low order bits are not always well distributed so shift them to the high end, where they'll + // form part of the 14-bit tag. 
+ return Rotr64(kMagicNum * hashState, 6); + } + static constexpr inline bool IsPowerOfTwo(uint64_t x) { return (x > 0) && ((x & (x - 1)) == 0); } diff --git a/cc/test/in_memory_test.cc b/cc/test/in_memory_test.cc index 8d07927a5..073582ade 100644 --- a/cc/test/in_memory_test.cc +++ b/cc/test/in_memory_test.cc @@ -14,6 +14,8 @@ #include "test_types.h" using namespace FASTER::core; +using FASTER::test::NonMovable; +using FASTER::test::NonCopyable; using FASTER::test::FixedSizeKey; using FASTER::test::SimpleAtomicValue; @@ -1907,30 +1909,15 @@ TEST(InMemFaster, GrowHashTable) { } TEST(InMemFaster, UpsertRead_VariableLengthKey) { - class Key { + class Key : NonCopyable, NonMovable { public: - /// This constructor is called when creating a Context so we keep track of memory containing key - Key(const uint16_t* key, const uint64_t key_length) - : temp_buffer{ key } - , key_length_{ key_length } { + static uint32_t size(uint32_t key_length) { + return static_cast<uint32_t>(sizeof(Key) + key_length); } - /// This constructor is called when record is being allocated so we can freely copy into our buffer - Key(const Key& other) { - key_length_ = other.key_length_; - temp_buffer = NULL; - if (other.temp_buffer == NULL) { - memcpy(buffer(), other.buffer(), key_length_); - } else { - memcpy(buffer(), other.temp_buffer, key_length_); - } - } - - /// This destructor ensures we don't leak memory due to Key objects not allocated on HybridLog - ~Key() { - if (this->temp_buffer != NULL) { - free((void*)temp_buffer); - } + static void Create(Key* dst, uint32_t key_length, uint8_t* key_data) { + dst->key_length_ = key_length; + memcpy(dst->buffer(), key_data, key_length); } /// Methods and operators required by the (implicit) interface: @@ -1938,40 +1925,56 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { return static_cast<uint32_t>(sizeof(Key) + key_length_); } inline KeyHash GetHash() const { - if (this->temp_buffer != NULL) { - return KeyHash(Utility::HashBytes(temp_buffer, key_length_)); - } - return KeyHash(Utility::HashBytes(buffer(), key_length_)); + return KeyHash(Utility::HashBytesUint8(buffer(), key_length_)); } /// Comparison operators. 
inline bool operator==(const Key& other) const { if (this->key_length_ != other.key_length_) return false; - if (this->temp_buffer != NULL) { - return memcmp(temp_buffer, other.buffer(), key_length_) == 0; - } return memcmp(buffer(), other.buffer(), key_length_) == 0; } inline bool operator!=(const Key& other) const { - if (this->key_length_ != other.key_length_) return true; - if (this->temp_buffer != NULL) { - return memcmp(temp_buffer, other.buffer(), key_length_) != 0; - } - return memcmp(buffer(), other.buffer(), key_length_) != 0; + return !(*this == other); } - private: - uint64_t key_length_; - const uint16_t* temp_buffer; + uint32_t key_length_; - inline const uint16_t* buffer() const { - return reinterpret_cast<const uint16_t*>(this + 1); + inline const uint8_t* buffer() const { + return reinterpret_cast<const uint8_t*>(this + 1); } - inline uint16_t* buffer() { - return reinterpret_cast<uint16_t*>(this + 1); + inline uint8_t* buffer() { + return reinterpret_cast<uint8_t*>(this + 1); } }; + class ShallowKey { + public: + ShallowKey(uint8_t* key_data, uint32_t key_length) + : key_length_(key_length), key_data_(key_data) + { } + + inline uint32_t size() const { + return Key::size(key_length_); + } + inline KeyHash GetHash() const { + return KeyHash(Utility::HashBytesUint8(key_data_, key_length_)); + } + inline void write_deep_key_at(Key* dst) const { + Key::Create(dst, key_length_, key_data_); + } + /// Comparison operators. + inline bool operator==(const Key& other) const { + if (this->key_length_ != other.key_length_) return false; + return memcmp(key_data_, other.buffer(), key_length_) == 0; + } + inline bool operator!=(const Key& other) const { + return !(*this == other); + } + + uint32_t key_length_; + uint8_t* key_data_; + }; + using Value = SimpleAtomicValue<uint8_t>; class UpsertContext : public IAsyncContext { @@ -1979,17 +1982,17 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { typedef Key key_t; typedef Value value_t; - UpsertContext(uint16_t* key, uint64_t key_length) + UpsertContext(uint8_t* key, uint32_t key_length) : key_{ key, key_length } { } /// Copy (and deep-copy) constructor. UpsertContext(const UpsertContext& other) - : key_{ other.key_ } { + : key_{ other.key_ } { } /// The implicit and explicit interfaces require a key() accessor. - inline const Key& key() const { + inline const ShallowKey& key() const { return key_; } inline static constexpr uint32_t value_size() { @@ -2007,11 +2010,15 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { protected: /// The explicit interface requires a DeepCopy_Internal() implementation. Status DeepCopy_Internal(IAsyncContext*& context_copy) { + // In this particular test, the key content is always on the heap and always available, + // so we don't need to copy the key content. If the key content were on the stack, + // we would need to copy the key content to the heap as well + // return IAsyncContext::DeepCopy_Internal(*this, context_copy); } private: - Key key_; + ShallowKey key_; }; class ReadContext : public IAsyncContext { @@ -2019,7 +2026,7 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { typedef Key key_t; typedef Value value_t; - ReadContext(uint16_t* key, uint64_t key_length) + ReadContext(uint8_t* key, uint32_t key_length) : key_{ key, key_length } { } @@ -2029,7 +2036,7 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { } /// The implicit and explicit interfaces require a key() accessor. 
- inline const Key& key() const { + inline const ShallowKey& key() const { return key_; } @@ -2048,7 +2055,7 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { } private: - Key key_; + ShallowKey key_; public: uint8_t output; }; @@ -2058,14 +2065,14 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { store.StartSession(); // Insert. - for(size_t idx = 1; idx < 256; ++idx) { + for(uint32_t idx = 1; idx < 256; ++idx) { auto callback = [](IAsyncContext* ctxt, Status result) { // In-memory test. ASSERT_TRUE(false); }; // Create the key as a variable length array - uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t)); + uint8_t* key = (uint8_t*) malloc(idx); for (size_t j = 0; j < idx; ++j) { key[j] = 42; } @@ -2075,14 +2082,14 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { ASSERT_EQ(Status::Ok, result); } // Read. - for(size_t idx = 1; idx < 256; ++idx) { + for(uint32_t idx = 1; idx < 256; ++idx) { auto callback = [](IAsyncContext* ctxt, Status result) { // In-memory test. ASSERT_TRUE(false); }; // Create the key as a variable length array - uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t)); + uint8_t* key = (uint8_t*) malloc(idx); for (size_t j = 0; j < idx; ++j) { key[j] = 42; } @@ -2094,14 +2101,14 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { ASSERT_EQ(23, context.output); } // Update. - for(size_t idx = 1; idx < 256; ++idx) { + for(uint32_t idx = 1; idx < 256; ++idx) { auto callback = [](IAsyncContext* ctxt, Status result) { // In-memory test. ASSERT_TRUE(false); }; // Create the key as a variable length array - uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t)); + uint8_t* key = (uint8_t*) malloc(idx); for (size_t j = 0; j < idx; ++j) { key[j] = 42; } @@ -2111,14 +2118,14 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { ASSERT_EQ(Status::Ok, result); } // Read again. - for(size_t idx = 1; idx < 256; ++idx) { + for(uint32_t idx = 1; idx < 256; ++idx) { auto callback = [](IAsyncContext* ctxt, Status result) { // In-memory test. ASSERT_TRUE(false); }; // Create the key as a variable length array - uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t)); + uint8_t* key = (uint8_t*) malloc(idx); for (size_t j = 0; j < idx; ++j) { key[j] = 42; } @@ -2136,4 +2143,4 @@ TEST(InMemFaster, UpsertRead_VariableLengthKey) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/cc/test/test_types.h b/cc/test/test_types.h index cc56a2c0a..cac0d4903 100644 --- a/cc/test/test_types.h +++ b/cc/test/test_types.h @@ -62,5 +62,27 @@ class SimpleAtomicValue { }; }; +class NonCopyable +{ + public: + NonCopyable(const NonCopyable&) = delete; + NonCopyable& operator=(const NonCopyable&) = delete; + + protected: + NonCopyable() = default; + ~NonCopyable() = default; +}; + +class NonMovable +{ + public: + NonMovable(NonMovable&&) = delete; + NonMovable& operator=(NonMovable&&) = delete; + + protected: + NonMovable() = default; + ~NonMovable() = default; +}; + } } // namespace FASTER::test
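For reference, a minimal end-to-end sketch of what a user-defined variable-length key looks like under the new API. It mirrors the Key/ShallowKey pair and the UpsertContext from the in-memory test in this patch; the "My*" names, the value type, and the include paths are illustrative assumptions rather than part of the patch itself.

```cpp
// Sketch only (not part of the patch); assumes cc/src is on the include path
// and that core/faster.h provides IAsyncContext/Status, as in the existing tests.
#include <atomic>
#include <cstdint>
#include <cstring>

#include "core/faster.h"
#include "core/utility.h"

using namespace FASTER::core;

// The "deep" key, as laid out in the hybrid log: a fixed-size header followed
// by the raw key bytes.
struct MyKey {
  static uint32_t size(uint32_t key_length) {
    return static_cast<uint32_t>(sizeof(MyKey) + key_length);
  }
  uint32_t size() const {
    return static_cast<uint32_t>(sizeof(MyKey) + key_length_);
  }
  KeyHash GetHash() const {
    return KeyHash(Utility::HashBytesUint8(buffer(), key_length_));
  }
  bool operator==(const MyKey& other) const {
    return key_length_ == other.key_length_ &&
           std::memcmp(buffer(), other.buffer(), key_length_) == 0;
  }

  const uint8_t* buffer() const { return reinterpret_cast<const uint8_t*>(this + 1); }
  uint8_t* buffer() { return reinterpret_cast<uint8_t*>(this + 1); }

  uint32_t key_length_;
};

// The shallow key held by the caller's context: it only points at the caller's
// buffer; write_deep_key_at() copies the bytes into the log when FASTER
// allocates a record.
class MyShallowKey {
 public:
  MyShallowKey(const uint8_t* data, uint32_t length)
    : key_data_{ data }, key_length_{ length } {
  }
  uint32_t size() const {
    return MyKey::size(key_length_);
  }
  KeyHash GetHash() const {
    return KeyHash(Utility::HashBytesUint8(key_data_, key_length_));
  }
  void write_deep_key_at(MyKey* dst) const {
    dst->key_length_ = key_length_;
    std::memcpy(dst->buffer(), key_data_, key_length_);
  }
  bool operator==(const MyKey& other) const {
    return key_length_ == other.key_length_ &&
           std::memcmp(key_data_, other.buffer(), key_length_) == 0;
  }

 private:
  const uint8_t* key_data_;
  uint32_t key_length_;
};

// A trivial fixed-size value, just to make the context complete.
struct MyValue {
  static constexpr uint32_t size() { return sizeof(MyValue); }
  std::atomic<uint8_t> byte_{ 0 };
};

// An Upsert context under the new API: key_t is still the deep MyKey type,
// but key() returns the shallow key.
class MyUpsertContext : public IAsyncContext {
 public:
  typedef MyKey key_t;
  typedef MyValue value_t;

  MyUpsertContext(const uint8_t* data, uint32_t length)
    : key_{ data, length } {
  }

  const MyShallowKey& key() const { return key_; }
  static constexpr uint32_t value_size() { return sizeof(value_t); }
  void Put(value_t& value) { value.byte_.store(42); }
  bool PutAtomic(value_t& value) { value.byte_.store(42); return true; }

 protected:
  // If the caller's key buffer can be freed before a pending operation
  // completes, DeepCopy_Internal() must also deep-copy the key bytes.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  MyShallowKey key_;
};
```

Calls into the store are unchanged: construct a MyUpsertContext over the caller-owned buffer and pass it to Upsert() exactly as the in-memory test above does; FASTER invokes write_deep_key_at() only when it allocates the record in the hybrid log.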