Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[c++] Add a new value_size() method to RmwContext for RCU operations #145

Merged
merged 5 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cc/benchmark-dir/benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ class RmwContext : public IAsyncContext {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const value_t& old_value) {
return sizeof(value_t);
}

/// Initial, non-atomic, and atomic RMW methods.
inline void RmwInitial(value_t& value) {
Expand Down
3 changes: 3 additions & 0 deletions cc/playground/sum_store-dir/sum_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class RmwContext : public IAsyncContext {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const NumClicks& old_value) {
return sizeof(value_t);
}

protected:
/// The explicit interface requires a DeepCopy_Internal() implementation.
Expand Down
10 changes: 8 additions & 2 deletions cc/src/core/faster.h
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,14 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r

// Create a record and attempt RCU.
create_record:
uint32_t record_size = record_t::size(key, pending_context.value_size());
uint32_t record_size;
const record_t* old_record;
if (address >= head_address) {
old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
record_size = record_t::size(key, pending_context.value_size(old_record));
} else {
record_size = record_t::size(key, pending_context.value_size());
}
Address new_address = BlockAllocate(record_size);
record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

Expand All @@ -1095,7 +1102,6 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
if(address < hlog.begin_address.load()) {
pending_context.RmwInitial(new_record);
} else if(address >= head_address) {
const record_t* old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
pending_context.RmwCopy(old_record, new_record);
} else {
// The block we allocated for the new record caused the head address to advance beyond
Expand Down
9 changes: 9 additions & 0 deletions cc/src/core/internal_contexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ class AsyncPendingRmwContext : public PendingContext<K> {
virtual void RmwCopy(const void* old_rec, void* rec) = 0;
/// in-place update.
virtual bool RmwAtomic(void* rec) = 0;
/// Get value size for initial value or in-place update
virtual uint32_t value_size() const = 0;
/// Get value size for RCU
virtual uint32_t value_size(const void* old_rec) const = 0;
};

/// A synchronous Rmw() context preserves its type information.
Expand Down Expand Up @@ -312,9 +315,15 @@ class PendingRmwContext : public AsyncPendingRmwContext<typename MC::key_t> {
record_t* record = reinterpret_cast<record_t*>(rec);
return rmw_context().RmwAtomic(record->value());
}
/// Get value size for initial value or in-place update
inline constexpr uint32_t value_size() const final {
return rmw_context().value_size();
}
/// Get value size for RCU
inline constexpr uint32_t value_size(const void* old_rec) const final {
const record_t* old_record = reinterpret_cast<const record_t*>(old_rec);
return rmw_context().value_size(old_record->value());
}
};

class AsyncIOContext;
Expand Down
242 changes: 242 additions & 0 deletions cc/test/in_memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,9 @@ TEST(InMemFaster, Rmw) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}
inline void RmwInitial(Value& value) {
value.value_ = incr_;
}
Expand Down Expand Up @@ -1178,6 +1181,9 @@ TEST(InMemFaster, Rmw_Concurrent) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}

inline void RmwInitial(Value& value) {
value.value_ = incr_;
Expand Down Expand Up @@ -1472,6 +1478,9 @@ TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
inline uint32_t value_size() const {
return sizeof(value_t) + length_;
}
inline uint32_t value_size(const Value& old_value) const {
return sizeof(value_t) + length_;
}

inline void RmwInitial(Value& value) {
value.gen_lock_.store(GenLock{});
Expand Down Expand Up @@ -1653,6 +1662,236 @@ TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
store.StopSession();
}

TEST(InMemFaster, Rmw_GrowString_Concurrent) {
class Key {
public:
Key(uint64_t key)
: key_{ key } {
}

inline static constexpr uint32_t size() {
return static_cast<uint32_t>(sizeof(Key));
}
inline KeyHash GetHash() const {
std::hash<uint64_t> hash_fn;
return KeyHash{ hash_fn(key_) };
}

/// Comparison operators.
inline bool operator==(const Key& other) const {
return key_ == other.key_;
}
inline bool operator!=(const Key& other) const {
return key_ != other.key_;
}

private:
uint64_t key_;
};

class RmwContext;
class ReadContext;

class Value {
public:
Value()
: length_{ 0 } {
}

inline uint32_t size() const {
return length_;
}

friend class RmwContext;
friend class ReadContext;

private:
uint32_t length_;

const char* buffer() const {
return reinterpret_cast<const char*>(this + 1);
}
char* buffer() {
return reinterpret_cast<char*>(this + 1);
}
};

class RmwContext : public IAsyncContext {
public:
typedef Key key_t;
typedef Value value_t;

RmwContext(uint64_t key, char letter)
: key_{ key }
, letter_{ letter } {
}

/// Copy (and deep-copy) constructor.
RmwContext(const RmwContext& other)
: key_{ other.key_ }
, letter_{ other.letter_ } {
}

/// The implicit and explicit interfaces require a key() accessor.
inline const Key& key() const {
return key_;
}
inline uint32_t value_size() const {
return sizeof(value_t) + sizeof(char);
}
inline uint32_t value_size(const Value& old_value) const {
return sizeof(value_t) + old_value.length_ + sizeof(char);
}

inline void RmwInitial(Value& value) {
value.length_ = sizeof(char);
value.buffer()[0] = letter_;
}
inline void RmwCopy(const Value& old_value, Value& value) {
value.length_ = old_value.length_ + sizeof(char);
std::memcpy(value.buffer(), old_value.buffer(), old_value.length_);
value.buffer()[old_value.length_] = letter_;
}
inline bool RmwAtomic(Value& value) {
// All RMW operations use Read-Copy-Update
return false;
}

protected:
/// The explicit interface requires a DeepCopy_Internal() implementation.
Status DeepCopy_Internal(IAsyncContext*& context_copy) {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}

private:
char letter_;
Key key_;
};

class ReadContext : public IAsyncContext {
public:
typedef Key key_t;
typedef Value value_t;

ReadContext(uint64_t key)
: key_{ key }
, output_length{ 0 } {
}

/// Copy (and deep-copy) constructor.
ReadContext(const ReadContext& other)
: key_{ other.key_ }
, output_length{ 0 } {
}

/// The implicit and explicit interfaces require a key() accessor.
inline const Key& key() const {
return key_;
}

inline void Get(const Value& value) {
// All reads should be atomic (from the mutable tail).
ASSERT_TRUE(false);
}
inline void GetAtomic(const Value& value) {
// There are no concurrent updates
output_length = value.length_;
output_letters[0] = value.buffer()[0];
output_letters[1] = value.buffer()[value.length_ - 1];
}

protected:
/// The explicit interface requires a DeepCopy_Internal() implementation.
Status DeepCopy_Internal(IAsyncContext*& context_copy) {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}

private:
Key key_;
public:
uint8_t output_length;
// Extract two letters of output.
char output_letters[2];
};

static constexpr int8_t kNumThreads = 2;
static constexpr size_t kNumRmws = 2048;
static constexpr size_t kRange = 512;

auto rmw_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_, char start_letter){
store_->StartSession();

for(size_t idx = 0; idx < kNumRmws; ++idx) {
auto callback = [](IAsyncContext* ctxt, Status result) {
// In-memory test.
ASSERT_TRUE(false);
};
char letter = static_cast<char>(start_letter + idx / kRange);
RmwContext context{ idx % kRange, letter };
Status result = store_->Rmw(context, callback, 1);
ASSERT_EQ(Status::Ok, result);
}

store_->StopSession();
};

FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };

// Rmw.
std::deque<std::thread> threads{};
for(int64_t idx = 0; idx < kNumThreads; ++idx) {
threads.emplace_back(rmw_worker, &store, 'A');
}
for(auto& thread : threads) {
thread.join();
}

// Read.
store.StartSession();

for(size_t idx = 0; idx < kRange; ++idx) {
auto callback = [](IAsyncContext* ctxt, Status result) {
// In-memory test.
ASSERT_TRUE(false);
};
ReadContext context{ idx };
Status result = store.Read(context, callback, 1);
ASSERT_EQ(Status::Ok, result) << idx;
ASSERT_EQ(kNumThreads * kNumRmws / kRange, context.output_length);
ASSERT_EQ('A', context.output_letters[0]);
ASSERT_EQ('D', context.output_letters[1]);
}

store.StopSession();

// Rmw.
threads.clear();
for(int64_t idx = 0; idx < kNumThreads; ++idx) {
threads.emplace_back(rmw_worker, &store, 'E');
}
for(auto& thread : threads) {
thread.join();
}

// Read again.
store.StartSession();

for(size_t idx = 0; idx < kRange; ++idx) {
auto callback = [](IAsyncContext* ctxt, Status result) {
// In-memory test.
ASSERT_TRUE(false);
};
ReadContext context{ static_cast<uint8_t>(idx) };
Status result = store.Read(context, callback, 1);
ASSERT_EQ(Status::Ok, result);
ASSERT_EQ(2 * kNumThreads * kNumRmws / kRange, context.output_length);
ASSERT_EQ('A', context.output_letters[0]);
ASSERT_EQ('H', context.output_letters[1]);
}

store.StopSession();
}

TEST(InMemFaster, GrowHashTable) {
class Key {
public:
Expand Down Expand Up @@ -1729,6 +1968,9 @@ TEST(InMemFaster, GrowHashTable) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}

inline void RmwInitial(Value& value) {
value.value_ = incr_;
Expand Down
12 changes: 12 additions & 0 deletions cc/test/paging_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ TEST(CLASS, UpsertRead_Serial) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}
/// Non-atomic and atomic Put() methods.
inline void Put(Value& value) {
value.gen_ = 0;
Expand Down Expand Up @@ -369,6 +372,9 @@ TEST(CLASS, UpsertRead_Concurrent) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}
/// Non-atomic and atomic Put() methods.
inline void Put(Value& value) {
value.gen_ = 0;
Expand Down Expand Up @@ -665,6 +671,9 @@ TEST(CLASS, Rmw) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}
inline void RmwInitial(Value& value) {
value.counter_ = incr_;
val_ = value.counter_;
Expand Down Expand Up @@ -836,6 +845,9 @@ TEST(CLASS, Rmw_Concurrent) {
inline static constexpr uint32_t value_size() {
return sizeof(value_t);
}
inline static constexpr uint32_t value_size(const Value& old_value) {
return sizeof(value_t);
}
inline void RmwInitial(Value& value) {
value.counter_ = incr_;
}
Expand Down
Loading