New API for clean handling of VariantLengthKey (#282)
In this diff, we propose a new API for clean handling of VariantLengthKey.
Previously, handling VariantLengthKey was extremely tricky and fragile (if possible at all),
and required writing code that no project with reasonable code-quality standards would tolerate.
This has even caused bugs in our own codebase (e.g. in the Compaction contexts; even the test that exercises VariantLengthKey is itself buggy).

We propose a backward-compatible new API that handles VariantLengthKey cleanly.
We add a new concept -- ShallowKey. This class is required to provide the same APIs
as the Key class (size(), GetHash() and operator==()), but unlike the Key class,
whose raw contents (interpreted as a uint8_t string) are written directly into the log,
a ShallowKey's internal representation does not matter.
In addition to the existing APIs, a new API with the prototype
   void write_deep_key_at(Key* dst) const
is required, which writes the bytestream representation of the Key at address 'dst'.
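
For illustration only, here is a minimal sketch of what a variable-length ShallowKey could look like. All type and member names below are hypothetical, and the FNV-1a hash is a stand-in that merely has to agree with whatever GetHash() the corresponding deep Key uses:

#include <cstdint>
#include <cstring>
#include "core/faster.h"  // assumed include; brings in FASTER::core::KeyHash

using namespace FASTER::core;

// Hypothetical deep (on-log) key layout: a length header followed by the key
// bytes. The raw contents of this struct are written into the log verbatim.
struct VariableSizeKey {
  uint32_t length_;

  inline uint32_t size() const {
    return static_cast<uint32_t>(sizeof(VariableSizeKey)) + length_;
  }
  inline const uint8_t* data() const {
    return reinterpret_cast<const uint8_t*>(this + 1);
  }
};

// The matching ShallowKey: it only points at caller-owned bytes, so its own
// representation never reaches the log; write_deep_key_at() alone decides
// what gets written there.
struct ShallowVariableKey {
  const uint8_t* data_;
  uint32_t length_;

  // Must report the *deep* size, since FASTER uses it to size the record.
  inline uint32_t size() const {
    return static_cast<uint32_t>(sizeof(VariableSizeKey)) + length_;
  }
  // FNV-1a as a stand-in hash over the key bytes.
  inline KeyHash GetHash() const {
    uint64_t h = 14695981039346656037ULL;
    for(uint32_t i = 0; i < length_; ++i) {
      h = (h ^ data_[i]) * 1099511628211ULL;
    }
    return KeyHash{ h };
  }
  // Compared against deep keys already stored in the log.
  inline bool operator==(const VariableSizeKey& other) const {
    return length_ == other.length_ &&
           std::memcmp(data_, other.data(), length_) == 0;
  }
  // Writes the bytestream representation of the key at address 'dst'.
  inline void write_deep_key_at(VariableSizeKey* dst) const {
    dst->length_ = length_;
    std::memcpy(reinterpret_cast<uint8_t*>(dst + 1), data_, length_);
  }
};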

To use the new API, all you need to do is change the key() API in your Context to return
a ShallowKey type instead of the key_t type. Example:

struct UpsertContext
{
	using key_t = Key;

	// uncomment below to use new API
	// ShallowKey key();

	// uncomment below to use old API
	// key_t key();
};
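
Continuing the sketch above (again, all names are hypothetical), a complete Upsert context adopting the new API might look like the following. Internally, FASTER reaches the shallow key through pending-context helpers, which surface in the diff below as get_key_hash(), is_key_equal(), key_size() and write_deep_key_at():

#include <atomic>

// Hypothetical 8-byte value with the accessors the Upsert path expects.
struct SimpleValue {
  inline static constexpr uint32_t size() {
    return static_cast<uint32_t>(sizeof(SimpleValue));
  }
  union {
    uint64_t value_;
    std::atomic<uint64_t> atomic_value_;
  };
};

class ShallowUpsertContext : public IAsyncContext {
 public:
  using key_t = VariableSizeKey;  // the deep key type is still declared
  using value_t = SimpleValue;

  ShallowUpsertContext(const uint8_t* key_data, uint32_t key_length,
                       uint64_t value)
    : key_{ key_data, key_length }
    , value_{ value } {
  }

  // New API: returning a ShallowKey-style object instead of key_t opts this
  // context into shallow handling; the deep key is materialized only when
  // FASTER calls write_deep_key_at() on a freshly allocated record.
  inline const ShallowVariableKey& key() const {
    return key_;
  }
  inline static constexpr uint32_t value_size() {
    return SimpleValue::size();
  }
  inline void Put(value_t& value) {
    value.value_ = value_;
  }
  inline bool PutAtomic(value_t& value) {
    value.atomic_value_.store(value_);
    return true;
  }

 protected:
  // Standard FASTER boilerplate for continuing an operation asynchronously.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  ShallowVariableKey key_;
  uint64_t value_;
};

One caveat of this sketch: if the operation goes pending, DeepCopy_Internal() copies only the context object itself, so the bytes that the shallow key points at must outlive the operation (or the copied context must own them).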
sillycross authored Jul 3, 2020
1 parent 46de893 commit 88d7282
Showing 7 changed files with 266 additions and 107 deletions.
cc/CMakeLists.txt: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ if (MSVC)
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")

- set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Og -g -D_DEBUG")
+ set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g -D_DEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")
endif()

cc/src/core/faster.h: 61 additions & 41 deletions
@@ -201,6 +201,9 @@ class FasterKv {
// create a new entry. The caller can use the "expected_entry" to CAS its desired address into
// the entry.
inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry);
+ template<class C>
+ inline Address TraceBackForKeyMatchCtxt(const C& ctxt, Address from_address,
+ Address min_offset) const;
inline Address TraceBackForKeyMatch(const key_t& key, Address from_address,
Address min_offset) const;
Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address,
@@ -766,8 +769,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const
const_cast<faster_t*>(this)->HeavyEnter();
}

- const key_t& key = pending_context.key();
- KeyHash hash = key.GetHash();
+ KeyHash hash = pending_context.get_key_hash();
HashBucketEntry entry;
const AtomicHashBucketEntry* atomic_entry = FindEntry(hash, entry);
if(!atomic_entry) {
@@ -787,8 +789,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const
// matches.
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
- if(key != record->key()) {
- address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
+ if(!pending_context.is_key_equal(record->key())) {
+ address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -841,8 +843,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {
HeavyEnter();
}

- const key_t& key = pending_context.key();
- KeyHash hash = key.GetHash();
+ KeyHash hash = pending_context.get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

@@ -857,8 +858,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {
// key that we might be able to update in place.
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
- if(key != record->key()) {
- address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
+ if(!pending_context.is_key_equal(record->key())) {
+ address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -948,14 +949,15 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {

// Create a record and attempt RCU.
create_record:
- uint32_t record_size = record_t::size(key, pending_context.value_size());
+ uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
Address new_address = BlockAllocate(record_size);
record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
new(record) record_t{
RecordInfo{
static_cast<uint16_t>(thread_ctx().version), true, false, false,
- expected_entry.address() },
- key };
+ expected_entry.address() }
+ };
+ pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));
pending_context.Put(record);

HashBucketEntry updated_entry{ new_address, hash.tag(), false };
Expand All @@ -982,8 +984,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
HeavyEnter();
}

- const key_t& key = pending_context.key();
- KeyHash hash = key.GetHash();
+ KeyHash hash = pending_context.get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

@@ -1000,8 +1001,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
// key that we might be able to update in place.
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
- if(key != record->key()) {
- address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
+ if(!pending_context.is_key_equal(record->key())) {
+ address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -1131,8 +1132,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
}
}
uint32_t record_size = old_record != nullptr ?
- record_t::size(key, pending_context.value_size(old_record)) :
- record_t::size(key, pending_context.value_size());
+ record_t::size(pending_context.key_size(), pending_context.value_size(old_record)) :
+ record_t::size(pending_context.key_size(), pending_context.value_size());

Address new_address = BlockAllocate(record_size);
record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));
@@ -1149,8 +1150,10 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(version), true, false, false,
- expected_entry.address() },
- key };
+ expected_entry.address() }
+ };
+ pending_context.write_deep_key_at(const_cast<key_t*>(&new_record->key()));
+
if(old_record == nullptr || address < hlog.begin_address.load()) {
pending_context.RmwInitial(new_record);
} else if(address >= head_address) {
@@ -1201,8 +1204,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
HeavyEnter();
}

- const key_t& key = pending_context.key();
- KeyHash hash = key.GetHash();
+ KeyHash hash = pending_context.get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = const_cast<AtomicHashBucketEntry*>(FindEntry(hash, expected_entry));
if(!atomic_entry) {
@@ -1219,8 +1221,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
if(address >= head_address) {
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
- if(key != record->key()) {
- address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
+ if(!pending_context.is_key_equal(record->key())) {
+ address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -1293,14 +1295,15 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
}

create_record:
- uint32_t record_size = record_t::size(key, pending_context.value_size());
+ uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
Address new_address = BlockAllocate(record_size);
record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
new(record) record_t{
RecordInfo{
static_cast<uint16_t>(thread_ctx().version), true, true, false,
expected_entry.address() },
- key };
+ };
+ pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));
+
HashBucketEntry updated_entry{ new_address, hash.tag(), false };

@@ -1315,8 +1318,24 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
}

template <class K, class V, class D>
- inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address,
+ template<class C>
+ inline Address FasterKv<K, V, D>::TraceBackForKeyMatchCtxt(const C& ctxt, Address from_address,
+ Address min_offset) const {
+ while(from_address >= min_offset) {
+ const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address));
+ if(ctxt.is_key_equal(record->key())) {
+ return from_address;
+ } else {
+ from_address = record->header.previous_address();
+ continue;
+ }
+ }
+ return from_address;
+ }
+
+ template <class K, class V, class D>
+ inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address,
Address min_offset) const {
while(from_address >= min_offset) {
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address));
if(key == record->key()) {
@@ -1373,7 +1392,7 @@ inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx,
if(thread_ctx().phase == Phase::PREPARE) {
assert(pending_context.type == OperationType::RMW);
// Can I be marking an operation again and again?
- if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) {
+ if(!checkpoint_locks_.get_lock(pending_context.get_key_hash()).try_lock_old()) {
return PivotAndRetry(ctx, pending_context, async);
}
}
@@ -1383,16 +1402,16 @@ inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx,
assert(pending_context.type == OperationType::Read ||
pending_context.type == OperationType::RMW);
// Can I be marking an operation again and again?
- if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) {
+ if(!checkpoint_locks_.get_lock(pending_context.get_key_hash()).try_lock_old()) {
return PivotAndRetry(ctx, pending_context, async);
}
}
return IssueAsyncIoRequest(ctx, pending_context, async);
case OperationStatus::SUCCESS_UNMARK:
- checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old();
+ checkpoint_locks_.get_lock(pending_context.get_key_hash()).unlock_old();
return Status::Ok;
case OperationStatus::NOT_FOUND_UNMARK:
- checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old();
+ checkpoint_locks_.get_lock(pending_context.get_key_hash()).unlock_old();
return Status::NotFound;
case OperationStatus::CPR_SHIFT_DETECTED:
return PivotAndRetry(ctx, pending_context, async);
@@ -1444,7 +1463,7 @@ inline Status FasterKv<K, V, D>::IssueAsyncIoRequest(ExecutionContext& ctx,
pending_context_t& pending_context, bool& async) {
// Issue asynchronous I/O request
uint64_t io_id = thread_ctx().io_id++;
- thread_ctx().pending_ios.insert({ io_id, pending_context.key().GetHash() });
+ thread_ctx().pending_ios.insert({ io_id, pending_context.get_key_hash() });
async = true;
AsyncIOContext io_request{ this, pending_context.address, &pending_context,
&thread_ctx().io_responses, io_id };
@@ -1519,7 +1538,7 @@ void FasterKv<K, V, D>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status res
faster->AsyncGetFromDisk(context->address, record->disk_size(),
AsyncGetFromDiskCallback, *context.get());
context.async = true;
- } else if(pending_context->key() == record->key()) {
+ } else if(pending_context->is_key_equal(record->key())) {
//The keys are same, so I/O is complete
context->thread_io_responses->push(context.get());
} else {
@@ -1565,8 +1584,7 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext&
io_context.caller_context);

// Find a hash bucket entry to store the updated value in.
- const key_t& key = pending_context->key();
- KeyHash hash = key.GetHash();
+ KeyHash hash = pending_context->get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

@@ -1577,8 +1595,8 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext&
// Make sure that atomic_entry is OK to update.
if(address >= head_address) {
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
- if(key != record->key()) {
- address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
+ if(!pending_context->is_key_equal(record->key())) {
+ address = TraceBackForKeyMatchCtxt(*pending_context, record->header.previous_address(), head_address);
}
}

@@ -1594,29 +1612,31 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext&
record_t* new_record;
if(io_context.address < hlog.begin_address.load()) {
// The on-disk trace back failed to find a key match.
- uint32_t record_size = record_t::size(key, pending_context->value_size());
+ uint32_t record_size = record_t::size(pending_context->key_size(), pending_context->value_size());
new_address = BlockAllocate(record_size);
new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
- key };
+ };
+ pending_context->write_deep_key_at(const_cast<key_t*>(&new_record->key()));
pending_context->RmwInitial(new_record);
} else {
// The record we read from disk.
const record_t* disk_record = reinterpret_cast<const record_t*>(
io_context.record.GetValidPointer());
- uint32_t record_size = record_t::size(key, pending_context->value_size(disk_record));
+ uint32_t record_size = record_t::size(pending_context->key_size(), pending_context->value_size(disk_record));
new_address = BlockAllocate(record_size);
new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
- key };
+ };
+ pending_context->write_deep_key_at(const_cast<key_t*>(&new_record->key()));
pending_context->RmwCopy(disk_record, new_record);
}

@@ -2465,7 +2485,7 @@ void FasterKv<K, V, D>::MarkAllPendingRequests() {
const pending_context_t* context = static_cast<const pending_context_t*>(ctxt);
// We will succeed, since no other thread can currently advance the entry's version, since this
// thread hasn't acked "PENDING" phase completion yet.
- bool result = checkpoint_locks_.get_lock(context->key().GetHash()).try_lock_old();
+ bool result = checkpoint_locks_.get_lock(context->get_key_hash()).try_lock_old();
assert(result);
}
for(const auto& pending_io : thread_ctx().pending_ios) {