[C++] New API for clean handling of variable length keys #282

Merged · 2 commits · Jul 3, 2020
2 changes: 1 addition & 1 deletion cc/CMakeLists.txt
@@ -17,7 +17,7 @@ if (MSVC)
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")

set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Og -g -D_DEBUG")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g -D_DEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")
endif()

102 changes: 61 additions & 41 deletions cc/src/core/faster.h
@@ -201,6 +201,9 @@ class FasterKv {
// create a new entry. The caller can use the "expected_entry" to CAS its desired address into
// the entry.
inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry);
template<class C>
inline Address TraceBackForKeyMatchCtxt(const C& ctxt, Address from_address,
Address min_offset) const;
inline Address TraceBackForKeyMatch(const key_t& key, Address from_address,
Address min_offset) const;
Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address,
@@ -766,8 +769,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const
const_cast<faster_t*>(this)->HeavyEnter();
}

const key_t& key = pending_context.key();
KeyHash hash = key.GetHash();
KeyHash hash = pending_context.get_key_hash();
HashBucketEntry entry;
const AtomicHashBucketEntry* atomic_entry = FindEntry(hash, entry);
if(!atomic_entry) {
@@ -787,8 +789,8 @@
// matches.
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
if(key != record->key()) {
address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
if(!pending_context.is_key_equal(record->key())) {
address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -841,8 +843,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {
HeavyEnter();
}

const key_t& key = pending_context.key();
KeyHash hash = key.GetHash();
KeyHash hash = pending_context.get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

@@ -857,8 +858,8 @@
// key that we might be able to update in place.
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
if(key != record->key()) {
address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
if(!pending_context.is_key_equal(record->key())) {
address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -948,14 +949,15 @@ inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {

// Create a record and attempt RCU.
create_record:
uint32_t record_size = record_t::size(key, pending_context.value_size());
uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
Address new_address = BlockAllocate(record_size);
record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
new(record) record_t{
RecordInfo{
static_cast<uint16_t>(thread_ctx().version), true, false, false,
expected_entry.address() },
key };
expected_entry.address() }
};
pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));
pending_context.Put(record);

HashBucketEntry updated_entry{ new_address, hash.tag(), false };
@@ -982,8 +984,7 @@
HeavyEnter();
}

const key_t& key = pending_context.key();
KeyHash hash = key.GetHash();
KeyHash hash = pending_context.get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

@@ -1000,8 +1001,8 @@
// key that we might be able to update in place.
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
if(key != record->key()) {
address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
if(!pending_context.is_key_equal(record->key())) {
address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -1131,8 +1132,8 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
}
}
uint32_t record_size = old_record != nullptr ?
record_t::size(key, pending_context.value_size(old_record)) :
record_t::size(key, pending_context.value_size());
record_t::size(pending_context.key_size(), pending_context.value_size(old_record)) :
record_t::size(pending_context.key_size(), pending_context.value_size());

Address new_address = BlockAllocate(record_size);
record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));
@@ -1149,8 +1150,10 @@
new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(version), true, false, false,
expected_entry.address() },
key };
expected_entry.address() }
};
pending_context.write_deep_key_at(const_cast<key_t*>(&new_record->key()));

if(old_record == nullptr || address < hlog.begin_address.load()) {
pending_context.RmwInitial(new_record);
} else if(address >= head_address) {
@@ -1201,8 +1204,7 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
HeavyEnter();
}

const key_t& key = pending_context.key();
KeyHash hash = key.GetHash();
KeyHash hash = pending_context.get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = const_cast<AtomicHashBucketEntry*>(FindEntry(hash, expected_entry));
if(!atomic_entry) {
@@ -1219,8 +1221,8 @@
if(address >= head_address) {
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address));
latest_record_version = record->header.checkpoint_version;
if(key != record->key()) {
address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
if(!pending_context.is_key_equal(record->key())) {
address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
}

@@ -1293,14 +1295,15 @@ inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
}

create_record:
uint32_t record_size = record_t::size(key, pending_context.value_size());
uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
Address new_address = BlockAllocate(record_size);
record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
new(record) record_t{
RecordInfo{
static_cast<uint16_t>(thread_ctx().version), true, true, false,
expected_entry.address() },
key };
};
pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));

HashBucketEntry updated_entry{ new_address, hash.tag(), false };

@@ -1315,8 +1318,24 @@
}

template <class K, class V, class D>
inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address,
template<class C>
inline Address FasterKv<K, V, D>::TraceBackForKeyMatchCtxt(const C& ctxt, Address from_address,
Address min_offset) const {
while(from_address >= min_offset) {
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address));
if(ctxt.is_key_equal(record->key())) {
return from_address;
} else {
from_address = record->header.previous_address();
continue;
}
}
return from_address;
}

template <class K, class V, class D>
inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address,
Address min_offset) const {
while(from_address >= min_offset) {
const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address));
if(key == record->key()) {
@@ -1373,7 +1392,7 @@ inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx,
if(thread_ctx().phase == Phase::PREPARE) {
assert(pending_context.type == OperationType::RMW);
// Can I be marking an operation again and again?
if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) {
if(!checkpoint_locks_.get_lock(pending_context.get_key_hash()).try_lock_old()) {
return PivotAndRetry(ctx, pending_context, async);
}
}
@@ -1383,16 +1402,16 @@
assert(pending_context.type == OperationType::Read ||
pending_context.type == OperationType::RMW);
// Can I be marking an operation again and again?
if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) {
if(!checkpoint_locks_.get_lock(pending_context.get_key_hash()).try_lock_old()) {
return PivotAndRetry(ctx, pending_context, async);
}
}
return IssueAsyncIoRequest(ctx, pending_context, async);
case OperationStatus::SUCCESS_UNMARK:
checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old();
checkpoint_locks_.get_lock(pending_context.get_key_hash()).unlock_old();
return Status::Ok;
case OperationStatus::NOT_FOUND_UNMARK:
checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old();
checkpoint_locks_.get_lock(pending_context.get_key_hash()).unlock_old();
return Status::NotFound;
case OperationStatus::CPR_SHIFT_DETECTED:
return PivotAndRetry(ctx, pending_context, async);
@@ -1444,7 +1463,7 @@ inline Status FasterKv<K, V, D>::IssueAsyncIoRequest(ExecutionContext& ctx,
pending_context_t& pending_context, bool& async) {
// Issue asynchronous I/O request
uint64_t io_id = thread_ctx().io_id++;
thread_ctx().pending_ios.insert({ io_id, pending_context.key().GetHash() });
thread_ctx().pending_ios.insert({ io_id, pending_context.get_key_hash() });
async = true;
AsyncIOContext io_request{ this, pending_context.address, &pending_context,
&thread_ctx().io_responses, io_id };
@@ -1519,7 +1538,7 @@ void FasterKv<K, V, D>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status res
faster->AsyncGetFromDisk(context->address, record->disk_size(),
AsyncGetFromDiskCallback, *context.get());
context.async = true;
} else if(pending_context->key() == record->key()) {
} else if(pending_context->is_key_equal(record->key())) {
//The keys are same, so I/O is complete
context->thread_io_responses->push(context.get());
} else {
@@ -1565,8 +1584,7 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext&
io_context.caller_context);

// Find a hash bucket entry to store the updated value in.
const key_t& key = pending_context->key();
KeyHash hash = key.GetHash();
KeyHash hash = pending_context->get_key_hash();
HashBucketEntry expected_entry;
AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

@@ -1577,8 +1595,8 @@
// Make sure that atomic_entry is OK to update.
if(address >= head_address) {
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
if(key != record->key()) {
address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
if(!pending_context->is_key_equal(record->key())) {
address = TraceBackForKeyMatchCtxt(*pending_context, record->header.previous_address(), head_address);
}
}

@@ -1594,29 +1612,31 @@
record_t* new_record;
if(io_context.address < hlog.begin_address.load()) {
// The on-disk trace back failed to find a key match.
uint32_t record_size = record_t::size(key, pending_context->value_size());
uint32_t record_size = record_t::size(pending_context->key_size(), pending_context->value_size());
new_address = BlockAllocate(record_size);
new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
key };
};
pending_context->write_deep_key_at(const_cast<key_t*>(&new_record->key()));
pending_context->RmwInitial(new_record);
} else {
// The record we read from disk.
const record_t* disk_record = reinterpret_cast<const record_t*>(
io_context.record.GetValidPointer());
uint32_t record_size = record_t::size(key, pending_context->value_size(disk_record));
uint32_t record_size = record_t::size(pending_context->key_size(), pending_context->value_size(disk_record));
new_address = BlockAllocate(record_size);
new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
key };
};
pending_context->write_deep_key_at(const_cast<key_t*>(&new_record->key()));
pending_context->RmwCopy(disk_record, new_record);
}

@@ -2465,7 +2485,7 @@ void FasterKv<K, V, D>::MarkAllPendingRequests() {
const pending_context_t* context = static_cast<const pending_context_t*>(ctxt);
// We will succeed, since no other thread can currently advance the entry's version, since this
// thread hasn't acked "PENDING" phase completion yet.
bool result = checkpoint_locks_.get_lock(context->key().GetHash()).try_lock_old();
bool result = checkpoint_locks_.get_lock(context->get_key_hash()).try_lock_old();
assert(result);
}
for(const auto& pending_io : thread_ctx().pending_ios) {
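To summarize how the changed internals consume those hooks, here is the upsert flow condensed from the InternalUpsert hunks above (excerpted for readability, not compilable on its own; comments note the calls it replaces):

// Condensed from the InternalUpsert changes in this diff.
KeyHash hash = pending_context.get_key_hash();        // was: pending_context.key().GetHash()
// ... hash-bucket lookup elided ...
if(!pending_context.is_key_equal(record->key())) {    // was: key != record->key()
  address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
}
// ... in-place update attempt elided ...
uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
Address new_address = BlockAllocate(record_size);
record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
// The record is now constructed without a key argument; the context then
// serializes the key directly into the record's key slot:
pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));
pending_context.Put(record);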