Skip to content

Commit

Permalink
Merge branch 'master' into async-support-test
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 13, 2019
2 parents 669881d + f3c3eb0 commit 29c64e1
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 27 deletions.
33 changes: 22 additions & 11 deletions cc/src/core/faster.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ class FasterKv {
typedef AsyncPendingRmwContext<key_t> async_pending_rmw_context_t;

FasterKv(uint64_t table_size, uint64_t log_size, const std::string& filename,
double log_mutable_fraction = 0.9)
double log_mutable_fraction = 0.9, bool pre_allocate_log = false)
: min_table_size_{ table_size }
, disk{ filename, epoch_ }
, hlog{ log_size, epoch_, disk, disk.log(), log_mutable_fraction }
, hlog{ log_size, epoch_, disk, disk.log(), log_mutable_fraction, pre_allocate_log }
, system_state_{ Action::None, Phase::REST, 1 }
, num_pending_ios{ 0 } {
if(!Utility::IsPowerOfTwo(table_size)) {
Expand Down Expand Up @@ -1405,22 +1405,33 @@ OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext&
assert(address < hlog.begin_address.load() || address == pending_context->entry.address());

// We have to do copy-on-write/RCU and write the updated value to the tail of the log.
uint32_t record_size = record_t::size(key, pending_context->value_size());
Address new_address = BlockAllocate(record_size);
record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
key };
Address new_address;
record_t* new_record;
if(io_context.address < hlog.begin_address.load()) {
// The on-disk trace back failed to find a key match.
uint32_t record_size = record_t::size(key, pending_context->value_size());
new_address = BlockAllocate(record_size);
new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
key };
pending_context->RmwInitial(new_record);
} else {
// The record we read from disk.
const record_t* disk_record = reinterpret_cast<const record_t*>(
io_context.record.GetValidPointer());
uint32_t record_size = record_t::size(key, pending_context->value_size(disk_record));
new_address = BlockAllocate(record_size);
new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

new(new_record) record_t{
RecordInfo{
static_cast<uint16_t>(context.version), true, false, false,
expected_entry.address() },
key };
pending_context->RmwCopy(disk_record, new_record);
}

Expand Down
37 changes: 24 additions & 13 deletions cc/src/core/persistent_memory_malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class PersistentMemoryMalloc {
static constexpr uint32_t kNumHeadPages = 4;

PersistentMemoryMalloc(uint64_t log_size, LightEpoch& epoch, disk_t& disk_, log_file_t& file_,
Address start_address, double log_mutable_fraction)
Address start_address, double log_mutable_fraction, bool pre_allocate_log)
: sector_size{ static_cast<uint32_t>(file_.alignment()) }
, epoch_{ &epoch }
, disk{ &disk_ }
Expand All @@ -260,7 +260,8 @@ class PersistentMemoryMalloc {
, tail_page_offset_{ start_address }
, buffer_size_{ 0 }
, pages_{ nullptr }
, page_status_{ nullptr } {
, page_status_{ nullptr }
, pre_allocate_log_{ pre_allocate_log } {
assert(start_address.page() <= Address::kMaxPage);

if(log_size % kPageSize != 0) {
Expand All @@ -282,21 +283,28 @@ class PersistentMemoryMalloc {
throw std::invalid_argument{ "Must have at least 2 mutable pages" };
}

page_status_ = new FullPageStatus[buffer_size_];

pages_ = new uint8_t* [buffer_size_];
for(uint32_t idx = 0; idx < buffer_size_; ++idx) {
pages_[idx] = nullptr;
if (pre_allocate_log_) {
pages_[idx] = reinterpret_cast<uint8_t*>(aligned_alloc(sector_size, kPageSize));
std::memset(pages_[idx], 0, kPageSize);
// Mark the page as accessible.
page_status_[idx].status.store(FlushStatus::Flushed, CloseStatus::Open);
} else {
pages_[idx] = nullptr;
}
}

page_status_ = new FullPageStatus[buffer_size_];

PageOffset tail_page_offset = tail_page_offset_.load();
AllocatePage(tail_page_offset.page());
AllocatePage(tail_page_offset.page() + 1);
}

PersistentMemoryMalloc(uint64_t log_size, LightEpoch& epoch, disk_t& disk_, log_file_t& file_,
double log_mutable_fraction)
: PersistentMemoryMalloc(log_size, epoch, disk_, file_, Address{ 0 }, log_mutable_fraction) {
double log_mutable_fraction, bool pre_allocate_log)
: PersistentMemoryMalloc(log_size, epoch, disk_, file_, Address{ 0 }, log_mutable_fraction, pre_allocate_log) {
/// Allocate the invalid page. Supports allocations aligned up to kCacheLineBytes.
uint32_t discard;
Allocate(Constants::kCacheLineBytes, discard);
Expand Down Expand Up @@ -555,6 +563,7 @@ class PersistentMemoryMalloc {

private:
uint32_t buffer_size_;
bool pre_allocate_log_;

/// -- the latest N pages should be mutable.
uint32_t num_mutable_pages_;
Expand All @@ -567,18 +576,20 @@ class PersistentMemoryMalloc {

// Global address of the current tail (next element to be allocated from the circular buffer)
AtomicPageOffset tail_page_offset_;

};

/// Implementations.
template <class D>
inline void PersistentMemoryMalloc<D>::AllocatePage(uint32_t index) {
index = index % buffer_size_;
assert(pages_[index] == nullptr);
pages_[index] = reinterpret_cast<uint8_t*>(aligned_alloc(sector_size, kPageSize));;
std::memset(pages_[index], 0, kPageSize);

// Mark the page as accessible.
page_status_[index].status.store(FlushStatus::Flushed, CloseStatus::Open);
if (!pre_allocate_log_) {
assert(pages_[index] == nullptr);
pages_[index] = reinterpret_cast<uint8_t*>(aligned_alloc(sector_size, kPageSize));
std::memset(pages_[index], 0, kPageSize);
// Mark the page as accessible.
page_status_[index].status.store(FlushStatus::Flushed, CloseStatus::Open);
}
}

template <class D>
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Utilities/Utility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ internal static int Murmur3(int h)
/// <summary>
/// Updates the variable to newValue only if the current value is smaller than the new value.
/// </summary>
/// <param name="variable"> The variable to possibly replace</param>
/// <param name="variable">The variable to possibly replace</param>
/// <param name="newValue">The value that replaces the variable if successful</param>
/// <param name="oldValue">The orignal value in the variable</param>
/// <returns> if oldValue less than newValue </returns>
Expand All @@ -228,15 +228,15 @@ public static bool MonotonicUpdate(ref long variable, long newValue, out long ol
do
{
oldValue = variable;
if (oldValue > newValue) return false;
if (oldValue >= newValue) return false;
} while (Interlocked.CompareExchange(ref variable, newValue, oldValue) != oldValue);
return true;
}

/// <summary>
/// Updates the variable to newValue only if the current value is smaller than the new value.
/// </summary>
/// <param name="variable"> The variable to possibly replace</param>
/// <param name="variable">The variable to possibly replace</param>
/// <param name="newValue">The value that replaces the variable if successful</param>
/// <param name="oldValue">The orignal value in the variable</param>
/// <returns>if oldValue less than or equal to newValue</returns>
Expand Down

0 comments on commit 29c64e1

Please sign in to comment.