diff --git a/cc/src/core/compact.h b/cc/src/core/compact.h
new file mode 100644
index 000000000..6489fccc6
--- /dev/null
+++ b/cc/src/core/compact.h
@@ -0,0 +1,134 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#pragma once
+
+#include "core/async.h"
+
+#include "record.h"
+
+namespace FASTER {
+namespace core {
+
+/// Upsert context used by FASTER's compaction algorithm.
+///
+/// The following are template arguments.
+///    K: The type on the key of each record.
+///    V: The type on the value stored inside FASTER.
+template <class K, class V>
+class CompactionUpsert : public IAsyncContext {
+ public:
+  // Typedefs on the key and value required internally by FASTER.
+  typedef K key_t;
+  typedef V value_t;
+
+  // Type signature on the record. Required by the constructor.
+  typedef Record<K, V> record_t;
+
+  /// Constructs and returns a context given a pointer to a record.
+  CompactionUpsert(record_t* record)
+    : key_(record->key())
+    , value_(record->value())
+  {}
+
+  /// Copy constructor. Required for when an Upsert operation goes async
+  /// inside FASTER.
+  CompactionUpsert(const CompactionUpsert& from)
+    : key_(from.key_)
+    , value_(from.value_)
+  {}
+
+  /// Accessor for the key. Invoked from within FASTER.
+  inline const K& key() const {
+    return key_;
+  }
+
+  /// Returns the size of the value. Invoked from within FASTER when creating
+  /// a new key-value pair (because the key did not map to a value to begin
+  /// with).
+  inline static constexpr uint32_t value_size() {
+    return V::size();
+  }
+
+  /// Stores this context's value into a passed in reference. This is
+  /// typically invoked from within FASTER when a new record corresponding
+  /// to the key-value pair is created at the tail of the hybrid log.
+  inline void Put(V& val) {
+    new(&val) V(value_);
+  }
+
+  /// Atomically stores this context's value into a passed in reference. This
+  /// is typically invoked from within FASTER when performing an Upsert on a
+  /// key-value pair in the HybridLog's mutable region.
+  inline bool PutAtomic(V& val) {
+    new(&val) V(value_);
+    return true;
+  }
+
+ protected:
+  /// Copies this context into a passed-in pointer if the operation goes
+  /// asynchronous inside FASTER.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+ private:
+  /// The key the upsert must be performed against.
+  K key_;
+
+  /// The value that the key should map to after the Upsert operation.
+  V value_;
+};
+
+/// Delete context used by FASTER's compaction algorithm.
+///
+/// The following are template arguments.
+///    K: The type on the key of each record.
+///    V: The type on the value stored inside FASTER.
+template <class K, class V>
+class CompactionDelete : public IAsyncContext {
+ public:
+  // Typedefs on the key and value required internally by FASTER.
+  typedef K key_t;
+  typedef V value_t;
+
+  // Type signature on the record. Required by the constructor.
+  typedef Record<K, V> record_t;
+
+  /// Constructs and returns a context given a pointer to a record.
+  CompactionDelete(record_t* record)
+    : key_(record->key())
+  {}
+
+  /// Copy constructor. Required for when the operation goes async
+  /// inside FASTER.
+  CompactionDelete(const CompactionDelete& from)
+    : key_(from.key_)
+  {}
+
+  /// Accessor for the key. Invoked from within FASTER.
+  inline const K& key() const {
+    return key_;
+  }
+
+  /// Returns the size of the value. Invoked from within FASTER when creating
+  /// a new key-value pair (because the key did not map to a value to begin
+  /// with).
+  inline static constexpr uint32_t value_size() {
+    return V::size();
+  }
+
+ protected:
+  /// Copies this context into a passed-in pointer if the operation goes
+  /// asynchronous inside FASTER.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+ private:
+  /// The key the delete must be performed against.
+  K key_;
+};
+
+} // namespace core
+} // namespace FASTER
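The two contexts above only assume a small contract from the application's key and value types: keys must be copy-constructible, hashable and equality-comparable, and values must be copy-constructible and expose a static `size()`. For reference, a minimal sketch of a pair of types satisfying that contract is shown below; `MyKey`/`MyValue` are illustrative names and not part of this change, and the sketch assumes the `KeyHash`/`Utility::GetHashCode` helpers that FASTER's own test types use.

```cpp
// Illustrative sketch only: the smallest key/value shapes that CompactionUpsert
// and CompactionDelete can work with. MyKey/MyValue are hypothetical names.
class MyKey {
 public:
  MyKey(uint64_t k) : key{ k } {}
  inline static constexpr uint32_t size() {
    return static_cast<uint32_t>(sizeof(MyKey));
  }
  inline KeyHash GetHash() const {
    return KeyHash{ Utility::GetHashCode(key) };
  }
  inline bool operator==(const MyKey& other) const { return key == other.key; }
  inline bool operator!=(const MyKey& other) const { return key != other.key; }

  uint64_t key;
};

class MyValue {
 public:
  MyValue() : value{ 0 } {}
  MyValue(const MyValue& from) : value{ from.value } {}
  inline static constexpr uint32_t size() {
    return static_cast<uint32_t>(sizeof(MyValue));
  }

  uint64_t value;
};
```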
diff --git a/cc/src/core/faster.h b/cc/src/core/faster.h
index 27869e6d6..652b9b3e1 100644
--- a/cc/src/core/faster.h
+++ b/cc/src/core/faster.h
@@ -10,6 +10,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #include "device/file_system_disk.h"
@@ -30,6 +31,8 @@
 #include "state_transitions.h"
 #include "status.h"
 #include "utility.h"
+#include "log_scan.h"
+#include "compact.h"
 
 using namespace std::chrono_literals;
@@ -146,6 +149,9 @@
   Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version,
                  std::vector<Guid>& session_ids);
 
+  /// Log compaction entry method.
+  bool Compact(uint64_t untilAddress);
+
   /// Truncating the head of the log.
   bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback,
                          GcState::complete_callback_t complete_callback);
@@ -258,6 +264,9 @@
   void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version,
                     HashBucketEntry entry);
 
+  Address LogScanForValidity(Address from, faster_t* temp);
+  bool ContainsKeyInMemory(key_t key, Address offset);
+
   /// Access the current and previous (thread-local) execution contexts.
   const ExecutionContext& thread_ctx() const {
     return thread_contexts_[Thread::id()].cur();
@@ -2881,5 +2890,177 @@ inline std::ostream& operator << (std::ostream& out, const FixedPageAddress addr
   return out << address.control();
 }
 
+/// When invoked, compacts the hybrid-log between the begin address and a
+/// passed in offset (`untilAddress`).
+template <class K, class V, class D>
+bool FasterKv<K, V, D>::Compact(uint64_t untilAddress)
+{
+  // First, initialize a mini FASTER that will store all live records in
+  // the range [beginAddress, untilAddress).
+  Address begin = hlog.begin_address.load();
+  if (untilAddress <= begin.control()) return false;
+  auto size = 2 * (untilAddress - begin.control());
+
+  uint64_t pSize = PersistentMemoryMalloc<disk_t>::kPageSize;
+  size = std::max(8 * pSize, size);
+
+  if (size % pSize != 0) size += pSize - (size % pSize);
+
+  faster_t tempKv(min_table_size_, size, "");
+  tempKv.StartSession();
+
+  // In the first phase of compaction, scan the hybrid-log between addresses
+  // [beginAddress, untilAddress), adding all live records to the mini FASTER.
+  // On encountering a tombstone, we try to delete the record from the mini
+  // instance of FASTER.
+  int numOps = 0;
+  ScanIterator<faster_t> iter(&hlog, Buffering::DOUBLE_PAGE, begin,
+                              Address(untilAddress), &disk);
+  while (true) {
+    auto r = iter.GetNext();
+    if (r == nullptr) break;
+
+    if (!r->header.tombstone) {
+      CompactionUpsert<K, V> ctxt(r);
+      auto cb = [](IAsyncContext* ctxt, Status result) {
+        CallbackContext<CompactionUpsert<K, V>> context(ctxt);
+        assert(result == Status::Ok);
+      };
+      tempKv.Upsert(ctxt, cb, 0);
+    } else {
+      CompactionDelete<K, V> ctxt(r);
+      auto cb = [](IAsyncContext* ctxt, Status result) {
+        CallbackContext<CompactionDelete<K, V>> context(ctxt);
+        assert(result == Status::Ok);
+      };
+      tempKv.Delete(ctxt, cb, 0);
+    }
+
+    if (++numOps % 1000 == 0) {
+      tempKv.Refresh();
+      Refresh();
+    }
+  }
+
+  // Scan the remainder of the hybrid log, deleting all encountered records
+  // from the temporary/mini FASTER instance.
+  auto upto = LogScanForValidity(Address(untilAddress), &tempKv);
+
+  // Finally, scan through all records within the temporary FASTER instance,
+  // inserting those that don't already exist within FASTER's mutable region.
+  numOps = 0;
+  ScanIterator<faster_t> iter2(&tempKv.hlog, Buffering::DOUBLE_PAGE,
+                               tempKv.hlog.begin_address.load(),
+                               tempKv.hlog.GetTailAddress(), &tempKv.disk);
+  while (true) {
+    auto r = iter2.GetNext();
+    if (r == nullptr) break;
+
+    if (!r->header.tombstone && !ContainsKeyInMemory(r->key(), upto)) {
+      CompactionUpsert<K, V> ctxt(r);
+      auto cb = [](IAsyncContext* ctxt, Status result) {
+        CallbackContext<CompactionUpsert<K, V>> context(ctxt);
+        assert(result == Status::Ok);
+      };
+
+      Upsert(ctxt, cb, 0);
+    }
+
+    if (++numOps % 1000 == 0) {
+      tempKv.Refresh();
+      Refresh();
+    }
+
+    // The safe-read-only region might have moved forward since the previous
+    // log scan. If it has, perform another validity scan over the delta.
+    if (upto < hlog.safe_read_only_address.load()) {
+      upto = LogScanForValidity(upto, &tempKv);
+    }
+  }
+
+  tempKv.StopSession();
+  return true;
+}
+
+/// Scans the hybrid log starting at `from` until the safe-read-only address,
+/// deleting all encountered records from within a passed in temporary FASTER
+/// instance.
+///
+/// Useful for log compaction where the temporary instance contains potentially
+/// live records that were found before `from` on the log. This method will then
+/// delete all records from within that instance that are dead because they exist
+/// in the safe-read-only region of the main FASTER instance.
+///
+/// Returns the address up to which the scan was performed.
+template <class K, class V, class D>
+Address FasterKv<K, V, D>::LogScanForValidity(Address from, faster_t* temp)
+{
+  // Scan up to the safe read only region of the log, deleting all encountered
+  // records from the temporary instance of FASTER. Since the safe-read-only
+  // offset can advance while we're scanning, we repeat this operation until
+  // we converge.
+  Address sRO = hlog.safe_read_only_address.load();
+  while (from < sRO) {
+    int numOps = 0;
+    ScanIterator<faster_t> iter(&hlog, Buffering::DOUBLE_PAGE, from,
+                                sRO, &disk);
+    while (true) {
+      auto r = iter.GetNext();
+      if (r == nullptr) break;
+
+      CompactionDelete<K, V> ctxt(r);
+      auto cb = [](IAsyncContext* ctxt, Status result) {
+        CallbackContext<CompactionDelete<K, V>> context(ctxt);
+        assert(result == Status::Ok);
+      };
+      temp->Delete(ctxt, cb, 0);
+
+      if (++numOps % 1000 == 0) {
+        temp->Refresh();
+        Refresh();
+      }
+    }
+
+    // Refresh FASTER, updating our start and end addresses for the convergence
+    // check in the while loop above.
+    Refresh();
+    from = sRO;
+    sRO = hlog.safe_read_only_address.load();
+  }
+
+  return sRO;
+}
+
+/// Checks if a key exists between a passed in address (`offset`) and the
+/// current tail of the hybrid log.
+template <class K, class V, class D>
+bool FasterKv<K, V, D>::ContainsKeyInMemory(key_t key, Address offset)
+{
+  // First, retrieve the hash table entry corresponding to this key.
+  KeyHash hash = key.GetHash();
+  HashBucketEntry _entry;
+  const AtomicHashBucketEntry* atomic_entry = FindEntry(hash, _entry);
+  if (!atomic_entry) return false;
+
+  HashBucketEntry entry = atomic_entry->load();
+  Address address = entry.address();
+
+  if (address >= offset) {
+    // Look through the in-memory portion of the log, to find the first record
+    // (if any) whose key matches.
+    const record_t* record =
+      reinterpret_cast<const record_t*>(hlog.Get(address));
+    if(key != record->key()) {
+      address = TraceBackForKeyMatch(key, record->header.previous_address(),
+                                     offset);
+    }
+  }
+
+  // If we found a record at or after the passed in address then we succeeded.
+  // Otherwise, we failed and so return false.
+  if (address >= offset) return true;
+  return false;
+}
+
 }
-} // namespace FASTER::core
\ No newline at end of file
+} // namespace FASTER::core
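To show where the new entry point sits in the public API, here is a hedged usage sketch (not part of the diff): a caller compacts the log up to some address, here the in-memory head address as an example, and can then truncate the compacted prefix with the existing `ShiftBeginAddress()`. Passing null callbacks is assumed to be acceptable for this sketch; real code would supply whatever `GcState` callbacks it needs.

```cpp
// Illustrative sketch only: compact the prefix of the hybrid log and then
// reclaim the space it occupied. Assumes `store` is a FasterKv instance and
// that the calling thread owns a session.
store.StartSession();

uint64_t until = store.hlog.head_address.load().control();  // example cut-off
if (store.Compact(until)) {
  // All live records below `until` have been re-upserted at the tail, so the
  // old prefix can be dropped. Callbacks are omitted in this sketch.
  store.ShiftBeginAddress(Address(until), nullptr, nullptr);
}

store.StopSession();
```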
diff --git a/cc/src/core/log_scan.h b/cc/src/core/log_scan.h
new file mode 100644
index 000000000..a6ca3647e
--- /dev/null
+++ b/cc/src/core/log_scan.h
@@ -0,0 +1,222 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#pragma once
+
+#include <atomic>
+
+#include "alloc.h"
+#include "record.h"
+#include "persistent_memory_malloc.h"
+
+namespace FASTER {
+namespace core {
+
+/// Represents the buffering mode used when scanning pages on the log.
+enum class Buffering : uint8_t {
+  /// Pages on persistent storage will not be buffered by ScanIterator.
+  /// When scanning such a page, the iterator will synchronously block
+  /// until it has been fetched from the persistent storage layer.
+  UN_BUFFERED = 0,
+
+  /// Every time the iterator begins scanning a page, it will issue an
+  /// asynchronous read for the next one. If the read hasn't completed
+  /// by the time it reaches the next page the iterator will block.
+  SINGLE_PAGE = 1,
+
+  /// Every time the iterator begins scanning a page, it will issue an
+  /// asynchronous read for the next two. If these reads have not completed
+  /// by the time it reaches the end of what's already been read the iterator
+  /// will block.
+  DOUBLE_PAGE = 2,
+};
+
+/// Helps scan through records on a Log.
+/// `F` is the type signature on the FASTER instance using this iterator.
+template <class F>
+class ScanIterator {
+ public:
+  /// For convenience. Typedef type signatures on records and the hybrid log.
+  typedef Record<typename F::key_t, typename F::value_t> record_t;
+  typedef typename F::hlog_t hlog_t;
+  typedef typename F::disk_t disk_t;
+
+  /// Constructs and returns an Iterator given a pointer to a log.
+  ///
+  /// \param log
+  ///    Pointer to the log that this iterator must scan over.
+  /// \param mode
+  ///    Buffering mode to be used when scanning the log. Refer to the
+  ///    `Buffering` enum class for more details.
+  /// \param begin
+  ///    Logical log address at which to start scanning.
+  /// \param end
+  ///    Logical log address at which to stop scanning.
+  /// \param disk
+  ///    Pointer to the disk the log was allocated under. Required for
+  ///    when we need to issue IO requests and make sure they complete.
+  ScanIterator(hlog_t* log, Buffering mode, Address begin, Address end,
+               disk_t* disk)
+    : hLog(log)
+    , numFrames(0)
+    , frames(nullptr)
+    , start(begin)
+    , until(end)
+    , current(start)
+    , currentFrame(0)
+    , completedIOs(0)
+    , disk(disk)
+  {
+    // Allocate one extra frame than what was requested so that we can
+    // hold the page that we're currently scanning.
+    switch (mode) {
+    case Buffering::SINGLE_PAGE:
+      numFrames = 2;
+      break;
+
+    case Buffering::DOUBLE_PAGE:
+      numFrames = 3;
+      break;
+
+    case Buffering::UN_BUFFERED:
+    default:
+      numFrames = 1;
+      break;
+    }
+
+    frames = new uint8_t* [numFrames];
+    for (auto i = 0; i < numFrames; i++) {
+      frames[i] = reinterpret_cast<uint8_t*>(aligned_alloc(hLog->sector_size,
+                                                           hlog_t::kPageSize));
+    }
+  }
+
+  /// Destroys the iterator freeing up memory allocated for the buffer.
+  ~ScanIterator() {
+    for (auto i = 0; i < numFrames; i++) {
+      aligned_free(reinterpret_cast<void*>(frames[i]));
+    }
+
+    delete[] frames;
+  }
+
+  /// Disallow copy and copy-assign constructors.
+  ScanIterator(const ScanIterator& from) = delete;
+  ScanIterator& operator=(const ScanIterator& from) = delete;
+
+  /// Returns a pointer to the next record.
+  record_t* GetNext() {
+    // We've exceeded the range over which we had to perform our scan.
+    // No work to do over here other than returning a nullptr.
+    if (current >= until) return nullptr;
+
+    // If we're within the in-memory region, then just lookup the address,
+    // increment it and return a pointer to the record.
+    if (current >= hLog->head_address.load()) {
+      auto record = reinterpret_cast<record_t*>(hLog->Get(current));
+      current += record->size();
+      return record;
+    }
+
+    // If we're over here then we need to issue reads to persistent storage.
+    return blockAndLoad();
+  }
+
+ private:
+  /// Loads pages from persistent storage if needed, and returns a pointer
+  /// to the next record in the log.
+  record_t* blockAndLoad() {
+    // We are at the first address of the first frame in the buffer. Issue
+    // an IO to the persistent layer and then wait for it to complete.
+    if (currentFrame == 0 && current.offset() == 0) {
+      auto cb = [](IAsyncContext* ctxt, Status result, size_t bytes) {
+        assert(result == Status::Ok);
+        assert(bytes == hlog_t::kPageSize);
+
+        CallbackContext<Context> context(ctxt);
+        context->counter->fetch_add(1);
+      };
+
+      // Issue reads to fill up the buffer and wait for them to complete.
+      for (auto i = 0; i < numFrames; i++) {
+        auto ctxt = Context(&completedIOs);
+        auto addr = current.control() + (i * hlog_t::kPageSize);
+        hLog->file->ReadAsync(addr,
+                              reinterpret_cast<void*>(frames[i]),
+                              hlog_t::kPageSize, cb, ctxt);
+      }
+
+      while (completedIOs.load() < numFrames) disk->TryComplete();
+      completedIOs.store(0);
+    }
+
+    // We have the corresponding page in our buffer. Look it up, increment
+    // the current address and current frame (if necessary), return a
+    // pointer to the record.
+    auto record = reinterpret_cast<record_t*>(frames[currentFrame] +
+                                              current.offset());
+    current += record->size();
+    if (current.offset() == 0) currentFrame = (currentFrame + 1) % numFrames;
+    return record;
+  }
+
+  /// Passed in to the persistent layer when reading pages. Contains a pointer
+  /// to an atomic counter. We read `numFrames` pages at a time in
+  /// blockAndLoad(). When an IO completes, the callback increments the counter,
+  /// allowing ScanIterator to keep track of the number of completed IOs.
+  class Context : public IAsyncContext {
+   public:
+    /// Constructs a context given a pointer to an atomic counter keeping
+    /// track of the number of IOs that have completed so far.
+    Context(std::atomic<uint64_t>* ctr)
+      : counter(ctr)
+    {}
+
+    /// Destroys a Context.
+    ~Context() {}
+
+   protected:
+    /// Copies this context into a passed in pointer.
+    Status DeepCopy_Internal(IAsyncContext*& copy) {
+      return IAsyncContext::DeepCopy_Internal(*this, copy);
+    }
+
+   public:
+    /// Pointer to an atomic counter. Counts the number of IOs that have
+    /// completed so far.
+    std::atomic<uint64_t>* counter;
+  };
+
+  /// The underlying hybrid log to scan over.
+  hlog_t* hLog;
+
+  /// The number of page frames within the buffer.
+  int numFrames;
+
+  /// Buffer for pages that need to be scanned but are currently on disk.
+  uint8_t** frames;
+
+  /// Logical address within the log at which to start scanning.
+  Address start;
+
+  /// Logical address within the log at which to stop scanning.
+  Address until;
+
+  /// Logical address within the log at which we are currently scanning.
+  Address current;
+
+  /// Current frame within the buffer we're scanning through for records
+  /// on persistent storage.
+  uint64_t currentFrame;
+
+  /// The number of read requests to the persistent storage layer that
+  /// have completed so far. Refreshed every numFrames.
+  std::atomic<uint64_t> completedIOs;
+
+  /// Pointer to the disk hLog was allocated under. Required so that we
+  /// can make sure that reads issued to the persistent layer complete.
+  disk_t* disk;
+};
+
+} // namespace core
+} // namespace FASTER
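For completeness, this is how the iterator is driven from user code; it is essentially the pattern `Compact()` uses above. The snippet is illustrative only and assumes a `faster_t` typedef for the `FasterKv` instantiation and an open session on `store`.

```cpp
// Illustrative sketch only: walk every record currently on the log, using two
// read-ahead frames for pages that reside on persistent storage.
ScanIterator<faster_t> iter(&store.hlog, Buffering::DOUBLE_PAGE,
                            store.hlog.begin_address.load(),
                            store.hlog.GetTailAddress(), &store.disk);
while (auto* record = iter.GetNext()) {
  if (record->header.tombstone) continue;  // skip deleted records
  // Inspect record->key() and record->value() here.
}
```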
diff --git a/cc/test/CMakeLists.txt b/cc/test/CMakeLists.txt
index acb8c8f8f..67df2443d 100644
--- a/cc/test/CMakeLists.txt
+++ b/cc/test/CMakeLists.txt
@@ -9,3 +9,5 @@ if(MSVC)
   ADD_FASTER_TEST(recovery_threadpool_test "recovery_test.h")
 endif()
 ADD_FASTER_TEST(utility_test "")
+ADD_FASTER_TEST(scan_test "")
+ADD_FASTER_TEST(compact_test "")
diff --git a/cc/test/compact_test.cc b/cc/test/compact_test.cc
new file mode 100644
index 000000000..497bc2dc0
--- /dev/null
+++ b/cc/test/compact_test.cc
@@ -0,0 +1,214 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#include "gtest/gtest.h"
+
+#include "core/faster.h"
+
+#include "device/null_disk.h"
+
+#include "test_types.h"
+
+using namespace FASTER::core;
+using FASTER::test::FixedSizeKey;
+using FASTER::test::SimpleAtomicValue;
+
+using Key = FixedSizeKey<uint64_t>;
+using Value = SimpleAtomicValue<uint64_t>;
+
+/// Upsert context required to insert data for unit testing.
+class UpsertContext : public IAsyncContext {
+ public:
+  typedef Key key_t;
+  typedef Value value_t;
+
+  UpsertContext(uint64_t key)
+    : key_{ key }
+  {}
+
+  /// Copy (and deep-copy) constructor.
+  UpsertContext(const UpsertContext& other)
+    : key_{ other.key_ }
+  {}
+
+  /// The implicit and explicit interfaces require a key() accessor.
+  inline const Key& key() const {
+    return key_;
+  }
+  inline static constexpr uint32_t value_size() {
+    return sizeof(value_t);
+  }
+  /// Non-atomic and atomic Put() methods.
+  inline void Put(Value& value) {
+    value.value = key_.key;
+  }
+  inline bool PutAtomic(Value& value) {
+    value.atomic_value.store(key_.key);
+    return true;
+  }
+
+ protected:
+  /// The explicit interface requires a DeepCopy_Internal() implementation.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+ private:
+  Key key_;
+};
+
+/// Context to read a key when unit testing.
+class ReadContext : public IAsyncContext {
+ public:
+  typedef Key key_t;
+  typedef Value value_t;
+
+  ReadContext(uint64_t key)
+    : key_{ key }
+  {}
+
+  /// Copy (and deep-copy) constructor.
+  ReadContext(const ReadContext& other)
+    : key_{ other.key_ }
+  {}
+
+  /// The implicit and explicit interfaces require a key() accessor.
+  inline const Key& key() const {
+    return key_;
+  }
+
+  inline void Get(const Value& value) {
+    output = value.value;
+  }
+  inline void GetAtomic(const Value& value) {
+    output = value.atomic_value.load();
+  }
+
+ protected:
+  /// The explicit interface requires a DeepCopy_Internal() implementation.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+ private:
+  Key key_;
+ public:
+  uint64_t output;
+};
+
+/// Context to delete a key when unit testing.
+class DeleteContext : public IAsyncContext {
+ private:
+  Key key_;
+
+ public:
+  typedef Key key_t;
+  typedef Value value_t;
+
+  explicit DeleteContext(const Key& key)
+    : key_{ key }
+  {}
+
+  inline const Key& key() const {
+    return key_;
+  }
+
+  inline static constexpr uint32_t value_size() {
+    return Value::size();
+  }
+
+ protected:
+  /// The explicit interface requires a DeepCopy_Internal() implementation.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+};
+
+/// Inserts a bunch of records into a FASTER instance and invokes the
+/// compaction algorithm. Since all records are still live, checks if
+/// they remain so after the algorithm completes/returns.
+TEST(Compact, AllLive) {
+  typedef FasterKv<Key, Value, FASTER::device::NullDisk> faster_t;
+
+  faster_t store { 128, 1073741824, "" };
+
+  store.StartSession();
+
+  size_t numRecords = 256;
+  for (size_t idx = 0; idx < numRecords; ++idx) {
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      ASSERT_TRUE(false);
+    };
+    UpsertContext context{ static_cast<uint64_t>(idx) };
+    Status result = store.Upsert(context, callback, 1);
+    ASSERT_EQ(Status::Ok, result);
+  }
+
+  store.Compact(store.hlog.GetTailAddress().control());
+
+  for (size_t idx = 0; idx < numRecords; ++idx) {
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      ASSERT_TRUE(false);
+    };
+    ReadContext context{ idx };
+    Status result = store.Read(context, callback, 1);
+    ASSERT_EQ(Status::Ok, result);
+    ASSERT_EQ(idx, context.output);
+  }
+
+  store.StopSession();
+}
+
+/// Inserts a bunch of records into a FASTER instance, deletes half of them
+/// and invokes the compaction algorithm. Checks that the ones that should
+/// be alive are alive and the ones that should be dead stay dead.
+TEST(Compact, HalfLive) {
+  typedef FasterKv<Key, Value, FASTER::device::NullDisk> faster_t;
+
+  faster_t store { 128, 1073741824, "" };
+
+  store.StartSession();
+
+  size_t numRecords = 256;
+  for (size_t idx = 0; idx < numRecords; ++idx) {
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      ASSERT_TRUE(false);
+    };
+    UpsertContext context{ static_cast<uint64_t>(idx) };
+    Status result = store.Upsert(context, callback, 1);
+    ASSERT_EQ(Status::Ok, result);
+  }
+
+  // Delete every alternate key here.
+  for (size_t idx = 0; idx < numRecords; ++idx) {
+    if (idx % 2 == 0) continue;
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      ASSERT_TRUE(false);
+    };
+    Key key{ idx };
+    DeleteContext context{ key };
+    Status result = store.Delete(context, callback, 1);
+    ASSERT_EQ(Status::Ok, result);
+  }
+
+  store.Compact(store.hlog.GetTailAddress().control());
+
+  // After compaction, deleted keys stay deleted.
+  for (size_t idx = 0; idx < numRecords; ++idx) {
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      ASSERT_TRUE(false);
+    };
+    ReadContext context{ idx };
+    Status result = store.Read(context, callback, 1);
+    Status expect = idx % 2 == 0 ? Status::Ok : Status::NotFound;
+    ASSERT_EQ(expect, result);
+    if (idx % 2 == 0) ASSERT_EQ(idx, context.output);
+  }
+
+  store.StopSession();
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/cc/test/scan_test.cc b/cc/test/scan_test.cc
new file mode 100644
index 000000000..0e4610143
--- /dev/null
+++ b/cc/test/scan_test.cc
@@ -0,0 +1,100 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#include "gtest/gtest.h"
+
+#include "core/faster.h"
+#include "core/log_scan.h"
+
+#include "device/null_disk.h"
+
+#include "test_types.h"
+
+using namespace FASTER::core;
+using FASTER::test::FixedSizeKey;
+using FASTER::test::SimpleAtomicValue;
+
+using Key = FixedSizeKey<uint64_t>;
+using Value = SimpleAtomicValue<uint64_t>;
+
+/// Upsert context required to insert data for unit testing.
+class UpsertContext : public IAsyncContext {
+ public:
+  typedef Key key_t;
+  typedef Value value_t;
+
+  UpsertContext(uint64_t key)
+    : key_{ key }
+  {}
+
+  /// Copy (and deep-copy) constructor.
+  UpsertContext(const UpsertContext& other)
+    : key_{ other.key_ }
+  {}
+
+  /// The implicit and explicit interfaces require a key() accessor.
+  inline const Key& key() const {
+    return key_;
+  }
+  inline static constexpr uint32_t value_size() {
+    return sizeof(value_t);
+  }
+  /// Non-atomic and atomic Put() methods.
+  inline void Put(Value& value) {
+    value.value = 23;
+  }
+  inline bool PutAtomic(Value& value) {
+    value.atomic_value.store(42);
+    return true;
+  }
+
+ protected:
+  /// The explicit interface requires a DeepCopy_Internal() implementation.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+ private:
+  Key key_;
+};
+
+/// Inserts 256 8-byte keys into FASTER and tests that the scan iterator
+/// scans over and returns all of them correctly.
+TEST(ScanIter, InMem) {
+  typedef FasterKv<Key, Value, FASTER::device::NullDisk> faster_t;
+
+  faster_t store { 128, 1073741824, "" };
+
+  store.StartSession();
+
+  size_t numRecords = 256;
+  for (size_t idx = 0; idx < numRecords; ++idx) {
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      ASSERT_TRUE(false);
+    };
+    UpsertContext context{ static_cast<uint64_t>(idx) };
+    Status result = store.Upsert(context, callback, 1);
+    ASSERT_EQ(Status::Ok, result);
+  }
+
+  ScanIterator<faster_t> iter(&(store.hlog), Buffering::UN_BUFFERED,
+                              store.hlog.begin_address.load(),
+                              store.hlog.GetTailAddress(), &(store.disk));
+
+  size_t num = 0;
+  while (true) {
+    auto r = iter.GetNext();
+    if (r == nullptr) break;
+    ASSERT_EQ(num, r->key().key);
+    num++;
+  }
+
+  ASSERT_EQ(numRecords, num);
+
+  store.StopSession();
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}