From c98de54b6f7c8f5fea0f88c50edf347eb23a5fe9 Mon Sep 17 00:00:00 2001 From: Chinmay Kulkarni Date: Tue, 24 Mar 2020 18:00:36 -0600 Subject: [PATCH] Compaction for C++ FASTER (#250) * Scan Iterator over Hybrid Log This iterator takes in a begin and end address. Calling GetNext() returns a pointer to the next record in the range. If the end has been reached, then nullptr is returned. If we need to scan over disk, then the iterator can optionally scan extra pages and buffer them internally. * Upsert and Delete contexts for Log Compaction The Log Compaction algorithm requires that we a) collect records into a temporary FASTER instance, b) delete dead records from this instance, and c) upsert live records at the tail of the log. To perform these three steps we need an Upsert and a Delete context. * Compaction for HybridLog Implements a 3-phase approach similar to the C# version. Phase 1: Collects records from the region to be compacted into a mini-FASTER instance. Phase 2: Scans records in FASTER's log up to the safe read-only offset, deleting them from the mini-FASTER instance. Phase 3: Inserts records from the mini-FASTER into the hybrid log as long as they don't exist in its mutable region. * Unit test for in-memory scan iterator * Read correct page from disk when scanning * Simple unit test for compaction * Fix Windows compile error. * Fix copyright on compact.h Co-authored-by: Badrish Chandramouli --- cc/src/core/compact.h | 134 ++++++++++++++++++++++++ cc/src/core/faster.h | 183 ++++++++++++++++++++++++++++++++- cc/src/core/log_scan.h | 222 ++++++++++++++++++++++++++++++++++++++++ cc/test/CMakeLists.txt | 2 + cc/test/compact_test.cc | 214 ++++++++++++++++++++++++++++++++++++++ cc/test/scan_test.cc | 100 ++++++++++++++++++ 6 files changed, 854 insertions(+), 1 deletion(-) create mode 100644 cc/src/core/compact.h create mode 100644 cc/src/core/log_scan.h create mode 100644 cc/test/compact_test.cc create mode 100644 cc/test/scan_test.cc diff --git a/cc/src/core/compact.h b/cc/src/core/compact.h new file mode 100644 index 000000000..6489fccc6 --- /dev/null +++ b/cc/src/core/compact.h @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#pragma once + +#include "core/async.h" + +#include "record.h" + +namespace FASTER { +namespace core { + +/// Upsert context used by FASTER's compaction algorithm. +/// +/// The following are template arguments. +/// K: The type on the key of each record. +/// V: The type on the value stored inside FASTER. +template <class K, class V> +class CompactionUpsert : public IAsyncContext { + public: + // Typedefs on the key and value required internally by FASTER. + typedef K key_t; + typedef V value_t; + + // Type signature on the record. Required by the constructor. + typedef Record<K, V> record_t; + + /// Constructs and returns a context given a pointer to a record. + CompactionUpsert(record_t* record) + : key_(record->key()) + , value_(record->value()) + {} + + /// Copy constructor. Required for when an Upsert operation goes async + /// inside FASTER. + CompactionUpsert(const CompactionUpsert& from) + : key_(from.key_) + , value_(from.value_) + {} + + /// Accessor for the key. Invoked from within FASTER. + inline const K& key() const { + return key_; + } + + /// Returns the size of the value. Invoked from within FASTER when creating + /// a new key-value pair (because the key did not map to a value to begin + /// with).
+ inline static constexpr uint32_t value_size() { + return V::size(); + } + + /// Stores this context's value into a passed in reference. This is + /// typically invoked from within FASTER when a new record corresponding + /// to the key-value pair is created at the tail of the hybrid log. + inline void Put(V& val) { + new(&val) V(value_); + } + + /// Atomically stores this context's value into a passed in reference. This + /// is typically invoked from within FASTER when performing an Upsert on a + /// key-value pair in the HybridLog's mutable region. + inline bool PutAtomic(V& val) { + new(&val) V(value_); + return true; + } + + protected: + /// Copies this context into a passed-in pointer if the operation goes + /// asynchronous inside FASTER. + Status DeepCopy_Internal(IAsyncContext*& context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } + + private: + /// The key the upsert must be performed against. + K key_; + + /// The value that the key should map to after the Upsert operation. + V value_; +}; + +/// Delete context used by FASTER's compaction algorithm. +/// +/// The following are template arguments. +/// K: The type on the key of each record. +/// V: The type on the value stored inside FASTER. +template +class CompactionDelete : public IAsyncContext { + public: + // Typedefs on the key and value required internally by FASTER. + typedef K key_t; + typedef V value_t; + + // Type signature on the record. Required by the constructor. + typedef Record record_t; + + /// Constructs and returns a context given a pointer to a record. + CompactionDelete(record_t* record) + : key_(record->key()) + {} + + /// Copy constructor. Required for when the operation goes async + /// inside FASTER. + CompactionDelete(const CompactionDelete& from) + : key_(from.key_) + {} + + /// Accessor for the key. Invoked from within FASTER. + inline const K& key() const { + return key_; + } + + /// Returns the size of the value. Invoked from within FASTER when creating + /// a new key-value pair (because the key did not map to a value to begin + /// with). + inline static constexpr uint32_t value_size() { + return V::size(); + } + + protected: + /// Copies this context into a passed-in pointer if the operation goes + /// asynchronous inside FASTER. + Status DeepCopy_Internal(IAsyncContext*& context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } + + private: + /// The key the delete must be performed against. + K key_; +}; + +} // namespace core +} // namespace FASTER diff --git a/cc/src/core/faster.h b/cc/src/core/faster.h index 27869e6d6..652b9b3e1 100644 --- a/cc/src/core/faster.h +++ b/cc/src/core/faster.h @@ -10,6 +10,7 @@ #include #include #include +#include #include "device/file_system_disk.h" @@ -30,6 +31,8 @@ #include "state_transitions.h" #include "status.h" #include "utility.h" +#include "log_scan.h" +#include "compact.h" using namespace std::chrono_literals; @@ -146,6 +149,9 @@ class FasterKv { Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version, std::vector& session_ids); + /// Log compaction entry method. + bool Compact(uint64_t untilAddress); + /// Truncating the head of the log. 
bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback, GcState::complete_callback_t complete_callback); @@ -258,6 +264,9 @@ class FasterKv { void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version, HashBucketEntry entry); + Address LogScanForValidity(Address from, faster_t* temp); + bool ContainsKeyInMemory(key_t key, Address offset); + /// Access the current and previous (thread-local) execution contexts. const ExecutionContext& thread_ctx() const { return thread_contexts_[Thread::id()].cur(); @@ -2881,5 +2890,177 @@ inline std::ostream& operator << (std::ostream& out, const FixedPageAddress addr return out << address.control(); } +/// When invoked, compacts the hybrid-log between the begin address and a +/// passed in offset (`untilAddress`). +template +bool FasterKv::Compact(uint64_t untilAddress) +{ + // First, initialize a mini FASTER that will store all live records in + // the range [beginAddress, untilAddress). + Address begin = hlog.begin_address.load(); + auto size = 2 * (untilAddress - begin.control()); + if (size < 0) return false; + + auto pSize = PersistentMemoryMalloc::kPageSize; + size = std::max(8 * pSize, size); + + if (size % pSize != 0) size += pSize - (size % pSize); + + faster_t tempKv(min_table_size_, size, ""); + tempKv.StartSession(); + + // In the first phase of compaction, scan the hybrid-log between addresses + // [beginAddress, untilAddress), adding all live records to the mini FASTER. + // On encountering a tombstone, we try to delete the record from the mini + // instance of FASTER. + int numOps = 0; + ScanIterator iter(&hlog, Buffering::DOUBLE_PAGE, begin, + Address(untilAddress), &disk); + while (true) { + auto r = iter.GetNext(); + if (r == nullptr) break; + + if (!r->header.tombstone) { + CompactionUpsert ctxt(r); + auto cb = [](IAsyncContext* ctxt, Status result) { + CallbackContext> context(ctxt); + assert(result == Status::Ok); + }; + tempKv.Upsert(ctxt, cb, 0); + } else { + CompactionDelete ctxt(r); + auto cb = [](IAsyncContext* ctxt, Status result) { + CallbackContext> context(ctxt); + assert(result == Status::Ok); + }; + tempKv.Delete(ctxt, cb, 0); + } + + if (++numOps % 1000 == 0) { + tempKv.Refresh(); + Refresh(); + } + } + + // Scan the remainder of the hybrid log, deleting all encountered records + // from the temporary/mini FASTER instance. + auto upto = LogScanForValidity(Address(untilAddress), &tempKv); + + // Finally, scan through all records within the temporary FASTER instance, + // inserting those that don't already exist within FASTER's mutable region. + numOps = 0; + ScanIterator iter2(&tempKv.hlog, Buffering::DOUBLE_PAGE, + tempKv.hlog.begin_address.load(), + tempKv.hlog.GetTailAddress(), &tempKv.disk); + while (true) { + auto r = iter2.GetNext(); + if (r == nullptr) break; + + if (!r->header.tombstone && !ContainsKeyInMemory(r->key(), upto)) { + CompactionUpsert ctxt(r); + auto cb = [](IAsyncContext* ctxt, Status result) { + CallbackContext> context(ctxt); + assert(result == Status::Ok); + }; + + Upsert(ctxt, cb, 0); + } + + if (++numOps % 1000 == 0) { + tempKv.Refresh(); + Refresh(); + } + + // The safe-read-only region might have moved forward since the previous + // log scan. If it has, perform another validity scan over the delta. 
+ if (upto < hlog.safe_read_only_address.load()) { + upto = LogScanForValidity(upto, &tempKv); + } + } + + tempKv.StopSession(); + return true; +} + +/// Scans the hybrid log starting at `from` until the safe-read-only address, +/// deleting all encountered records from within a passed in temporary FASTER +/// instance. +/// +/// Useful for log compaction where the temporary instance contains potentially +/// live records that were found before `from` on the log. This method will then +/// delete all records from within that instance that are dead because they exist +/// in the safe-read-only region of the main FASTER instance. +/// +/// Returns the address upto which the scan was performed. +template +Address FasterKv::LogScanForValidity(Address from, faster_t* temp) +{ + // Scan upto the safe read only region of the log, deleting all encountered + // records from the temporary instance of FASTER. Since the safe-read-only + // offset can advance while we're scanning, we repeat this operation until + // we converge. + Address sRO = hlog.safe_read_only_address.load(); + while (from < sRO) { + int numOps = 0; + ScanIterator iter(&hlog, Buffering::DOUBLE_PAGE, from, + sRO, &disk); + while (true) { + auto r = iter.GetNext(); + if (r == nullptr) break; + + CompactionDelete ctxt(r); + auto cb = [](IAsyncContext* ctxt, Status result) { + CallbackContext> context(ctxt); + assert(result == Status::Ok); + }; + temp->Delete(ctxt, cb, 0); + + if (++numOps % 1000 == 0) { + temp->Refresh(); + Refresh(); + } + } + + // Refresh Faster, updating our start and end addresses for the convergence + // check in the while loop above. + Refresh(); + from = sRO; + sRO = hlog.safe_read_only_address.load(); + } + + return sRO; +} + +/// Checks if a key exists between a passed in address (`offset`) and the +/// current tail of the hybrid log. +template +bool FasterKv::ContainsKeyInMemory(key_t key, Address offset) +{ + // First, retrieve the hash table entry corresponding to this key. + KeyHash hash = key.GetHash(); + HashBucketEntry _entry; + const AtomicHashBucketEntry* atomic_entry = FindEntry(hash, _entry); + if (!atomic_entry) return false; + + HashBucketEntry entry = atomic_entry->load(); + Address address = entry.address(); + + if (address >= offset) { + // Look through the in-memory portion of the log, to find the first record + // (if any) whose key matches. + const record_t* record = + reinterpret_cast(hlog.Get(address)); + if(key != record->key()) { + address = TraceBackForKeyMatch(key, record->header.previous_address(), + offset); + } + } + + // If we found a record after the passed in address then we succeeded. + // Otherwise, we failed and so return false. + if (address >= offset) return true; + return false; +} + } -} // namespace FASTER::core \ No newline at end of file +} // namespace FASTER::core diff --git a/cc/src/core/log_scan.h b/cc/src/core/log_scan.h new file mode 100644 index 000000000..a6ca3647e --- /dev/null +++ b/cc/src/core/log_scan.h @@ -0,0 +1,222 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#pragma once + +#include + +#include "alloc.h" +#include "record.h" +#include "persistent_memory_malloc.h" + +namespace FASTER { +namespace core { + +/// Represents the buffering mode used when scanning pages on the log. +enum class Buffering : uint8_t { + /// Pages on persistent storage will not be buffered by ScanIterator. 
+ /// When scanning such a page, the iterator will synchronously block + /// until it has been fetched from the persistent storage layer. + UN_BUFFERED = 0, + + /// Every time the iterator begins scanning a page, it will issue an + /// asynchronous read for the next one. If the read hasn't completed + /// by the time it reaches the next page the iterator will block. + SINGLE_PAGE = 1, + + /// Every time the iterator begins scanning a page, it will issue an + /// asynchronous read for the next two. If these reads have not completed + /// by the time it reaches the end of what's already been read the iterator + /// will block. + DOUBLE_PAGE = 2, +}; + +/// Helps scan through records on a Log. +/// `F` is the type signature on the FASTER instance using this iterator. +template +class ScanIterator { + public: + /// For convenience. Typedef type signatures on records and the hybrid log. + typedef Record record_t; + typedef typename F::hlog_t hlog_t; + typedef typename F::disk_t disk_t; + + /// Constructs and returns an Iterator given a pointer to a log. + /// + /// \param log + /// Pointer to the log that this iterator must scan over. + /// \param mode + /// Buffering mode to be used when scanning the log. Refer to the + /// `Buffering` enum class for more details. + /// \param begin + /// Logical log address at which to start scanning. + /// \param end + /// Logical log address at which to stop scanning. + /// \param disk + /// Pointer to the disk the log was allocated under. Required for + /// when we need to issue IO requests and make sure they complete. + ScanIterator(hlog_t* log, Buffering mode, Address begin, Address end, + disk_t* disk) + : hLog(log) + , numFrames(0) + , frames(nullptr) + , start(begin) + , until(end) + , current(start) + , currentFrame(0) + , completedIOs(0) + , disk(disk) + { + // Allocate one extra frame than what was requested so that we can + // hold the page that we're currently scanning. + switch (mode) { + case Buffering::SINGLE_PAGE: + numFrames = 2; + break; + + case Buffering::DOUBLE_PAGE: + numFrames = 3; + break; + + case Buffering::UN_BUFFERED: + default: + numFrames = 1; + break; + } + + frames = new uint8_t* [numFrames]; + for (auto i = 0; i < numFrames; i++) { + frames[i] = reinterpret_cast(aligned_alloc(hLog->sector_size, + hlog_t::kPageSize)); + } + } + + /// Destroys the iterator freeing up memory allocated for the buffer. + ~ScanIterator() { + for (auto i = 0; i < numFrames; i++) { + aligned_free(reinterpret_cast(frames[i])); + } + + delete frames; + } + + /// Disallow copy and copy-assign constructors. + ScanIterator(const ScanIterator& from) = delete; + ScanIterator& operator=(const ScanIterator& from) = delete; + + /// Returns a pointer to the next record. + record_t* GetNext() { + // We've exceeded the range over which we had to perform our scan. + // No work to do over here other than returning a nullptr. + if (current >= until) return nullptr; + + // If we're within the in-memory region, then just lookup the address, + // increment it and return a pointer to the record. + if (current >= hLog->head_address.load()) { + auto record = reinterpret_cast(hLog->Get(current)); + current += record->size(); + return record; + } + + // If we're over here then we need to issue reads to persistent storage. + return blockAndLoad(); + } + + private: + /// Loads pages from persistent storage if needed, and returns a pointer + /// to the next record in the log. + record_t* blockAndLoad() { + // We are at the first address of the first frame in the buffer. 
Issue + // an IO to the persistent layer and then wait for it to complete. + if (currentFrame == 0 && current.offset() == 0) { + auto cb = [](IAsyncContext* ctxt, Status result, size_t bytes) { + assert(result == Status::Ok); + assert(bytes == hlog_t::kPageSize); + + CallbackContext context(ctxt); + context->counter->fetch_add(1); + }; + + // Issue reads to fill up the buffer and wait for them to complete. + for (auto i = 0; i < numFrames; i++) { + auto ctxt = Context(&completedIOs); + auto addr = current.control() + (i * hlog_t::kPageSize); + hLog->file->ReadAsync(addr, + reinterpret_cast(frames[i]), + hlog_t::kPageSize, cb, ctxt); + } + + while (completedIOs.load() < numFrames) disk->TryComplete(); + completedIOs.store(0); + } + + // We have the corresponding page in our buffer. Look it up, increment + // the current address and current frame (if necessary), return a + // pointer to the record. + auto record = reinterpret_cast(frames[currentFrame] + + current.offset()); + current += record->size(); + if (current.offset() == 0) currentFrame = (currentFrame + 1) % numFrames; + return record; + } + + /// Passed in to the persistent layer when reading pages. Contains a pointer + /// to an atomic counter. We read `numFrames` pages at a time in + /// blockAndLoad(). When an IO completes, the callback increments the counter, + /// allowing ScanIterator to keep track of the number of completed IOs. + class Context : public IAsyncContext { + public: + /// Constructs a context given a pointer to an atomic counter keeping + /// track of the number of IOs that have completed so far. + Context(std::atomic* ctr) + : counter(ctr) + {} + + /// Destroys a Context. + ~Context() {} + + protected: + /// Copies this context into a passed in pointer. + Status DeepCopy_Internal(IAsyncContext*& copy) { + return IAsyncContext::DeepCopy_Internal(*this, copy); + } + + public: + /// Pointer to an atomic counter. Counts the number of IOs that have + /// completed so far. + std::atomic* counter; + }; + + /// The underlying hybrid log to scan over. + hlog_t* hLog; + + /// The number of page frames within the buffer. + int numFrames; + + /// Buffer for pages that need to be scanned but are currently on disk. + uint8_t** frames; + + /// Logical address within the log at which to start scanning. + Address start; + + /// Logical address within the log at which to stop scanning. + Address until; + + /// Logical address within the log at which we are currently scanning. + Address current; + + /// Current frame within the buffer we're scanning through for records + /// on persistent storage. + uint64_t currentFrame; + + /// The number of read requests to the persistent storage layer that + /// have completed so far. Refreshed every numFrames. + std::atomic completedIOs; + + /// Pointer to the disk hLog was allocated under. Required so that we + /// can make sure that reads issued to the persistent layer complete. + disk_t* disk; +}; + +} // namespace core +} // namespace FASTER diff --git a/cc/test/CMakeLists.txt b/cc/test/CMakeLists.txt index acb8c8f8f..67df2443d 100644 --- a/cc/test/CMakeLists.txt +++ b/cc/test/CMakeLists.txt @@ -9,3 +9,5 @@ if(MSVC) ADD_FASTER_TEST(recovery_threadpool_test "recovery_test.h") endif() ADD_FASTER_TEST(utility_test "") +ADD_FASTER_TEST(scan_test "") +ADD_FASTER_TEST(compact_test "") diff --git a/cc/test/compact_test.cc b/cc/test/compact_test.cc new file mode 100644 index 000000000..497bc2dc0 --- /dev/null +++ b/cc/test/compact_test.cc @@ -0,0 +1,214 @@ +// Copyright (c) Microsoft Corporation. 
All rights reserved. +// Licensed under the MIT license. + +#include "gtest/gtest.h" + +#include "core/faster.h" + +#include "device/null_disk.h" + +#include "test_types.h" + +using namespace FASTER::core; +using FASTER::test::FixedSizeKey; +using FASTER::test::SimpleAtomicValue; + +using Key = FixedSizeKey; +using Value = SimpleAtomicValue; + +/// Upsert context required to insert data for unit testing. +class UpsertContext : public IAsyncContext { + public: + typedef Key key_t; + typedef Value value_t; + + UpsertContext(uint64_t key) + : key_{ key } + {} + + /// Copy (and deep-copy) constructor. + UpsertContext(const UpsertContext& other) + : key_{ other.key_ } + {} + + /// The implicit and explicit interfaces require a key() accessor. + inline const Key& key() const { + return key_; + } + inline static constexpr uint32_t value_size() { + return sizeof(value_t); + } + /// Non-atomic and atomic Put() methods. + inline void Put(Value& value) { + value.value = key_.key; + } + inline bool PutAtomic(Value& value) { + value.atomic_value.store(key_.key); + return true; + } + + protected: + /// The explicit interface requires a DeepCopy_Internal() implementation. + Status DeepCopy_Internal(IAsyncContext*& context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } + + private: + Key key_; +}; + +/// Context to read a key when unit testing. +class ReadContext : public IAsyncContext { + public: + typedef Key key_t; + typedef Value value_t; + + ReadContext(uint64_t key) + : key_{ key } + {} + + /// Copy (and deep-copy) constructor. + ReadContext(const ReadContext& other) + : key_{ other.key_ } + {} + + /// The implicit and explicit interfaces require a key() accessor. + inline const Key& key() const { + return key_; + } + + inline void Get(const Value& value) { + output = value.value; + } + inline void GetAtomic(const Value& value) { + output = value.atomic_value.load(); + } + + protected: + /// The explicit interface requires a DeepCopy_Internal() implementation. + Status DeepCopy_Internal(IAsyncContext*& context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } + + private: + Key key_; + public: + uint64_t output; +}; + +/// Context to delete a key when unit testing. +class DeleteContext : public IAsyncContext { + private: + Key key_; + + public: + typedef Key key_t; + typedef Value value_t; + + explicit DeleteContext(const Key& key) + : key_{ key } + {} + + inline const Key& key() const { + return key_; + } + + inline static constexpr uint32_t value_size() { + return Value::size(); + } + + protected: + /// The explicit interface requires a DeepCopy_Internal() implementation. + Status DeepCopy_Internal(IAsyncContext*& context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } +}; + +/// Inserts a bunch of records into a FASTER instance and invokes the +/// compaction algorithm. Since all records are still live, checks if +/// they remain so after the algorithm completes/returns. 
+TEST(Compact, AllLive) { + typedef FasterKv faster_t; + + faster_t store { 128, 1073741824, "" }; + + store.StartSession(); + + int numRecords = 256; + for (size_t idx = 0; idx < numRecords; ++idx) { + auto callback = [](IAsyncContext* ctxt, Status result) { + ASSERT_TRUE(false); + }; + UpsertContext context{ static_cast(idx) }; + Status result = store.Upsert(context, callback, 1); + ASSERT_EQ(Status::Ok, result); + } + + store.Compact(store.hlog.GetTailAddress().control()); + + for (size_t idx = 0; idx < numRecords; ++idx) { + auto callback = [](IAsyncContext* ctxt, Status result) { + ASSERT_TRUE(false); + }; + ReadContext context{ idx }; + Status result = store.Read(context, callback, 1); + ASSERT_EQ(Status::Ok, result); + ASSERT_EQ(idx, context.output); + } + + store.StopSession(); +} + +/// Inserts a bunch of records into a FASTER instance, deletes half of them +/// and invokes the compaction algorithm. Checks that the ones that should +/// be alive are alive and the ones that should be dead stay dead. +TEST(Compact, HalfLive) { + typedef FasterKv faster_t; + + faster_t store { 128, 1073741824, "" }; + + store.StartSession(); + + int numRecords = 256; + for (size_t idx = 0; idx < numRecords; ++idx) { + auto callback = [](IAsyncContext* ctxt, Status result) { + ASSERT_TRUE(false); + }; + UpsertContext context{ static_cast(idx) }; + Status result = store.Upsert(context, callback, 1); + ASSERT_EQ(Status::Ok, result); + } + + // Delete every alternate key here. + for (size_t idx = 0; idx < numRecords; ++idx) { + if (idx % 2 == 0) continue; + auto callback = [](IAsyncContext* ctxt, Status result) { + ASSERT_TRUE(false); + }; + Key key{ idx }; + DeleteContext context{ key }; + Status result = store.Delete(context, callback, 1); + ASSERT_EQ(Status::Ok, result); + } + + store.Compact(store.hlog.GetTailAddress().control()); + + // After compaction, deleted keys stay deleted. + for (size_t idx = 0; idx < numRecords; ++idx) { + auto callback = [](IAsyncContext* ctxt, Status result) { + ASSERT_TRUE(false); + }; + ReadContext context{ idx }; + Status result = store.Read(context, callback, 1); + Status expect = idx % 2 == 0 ? Status::Ok : Status::NotFound; + ASSERT_EQ(expect, result); + if (idx % 2 == 0) ASSERT_EQ(idx, context.output); + } + + store.StopSession(); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/cc/test/scan_test.cc b/cc/test/scan_test.cc new file mode 100644 index 000000000..0e4610143 --- /dev/null +++ b/cc/test/scan_test.cc @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#include "gtest/gtest.h" + +#include "core/faster.h" +#include "core/log_scan.h" + +#include "device/null_disk.h" + +#include "test_types.h" + +using namespace FASTER::core; +using FASTER::test::FixedSizeKey; +using FASTER::test::SimpleAtomicValue; + +using Key = FixedSizeKey; +using Value = SimpleAtomicValue; + +/// Upsert context required to insert data for unit testing. +class UpsertContext : public IAsyncContext { + public: + typedef Key key_t; + typedef Value value_t; + + UpsertContext(uint64_t key) + : key_{ key } + {} + + /// Copy (and deep-copy) constructor. + UpsertContext(const UpsertContext& other) + : key_{ other.key_ } + {} + + /// The implicit and explicit interfaces require a key() accessor. 
+ inline const Key& key() const { + return key_; + } + inline static constexpr uint32_t value_size() { + return sizeof(value_t); + } + /// Non-atomic and atomic Put() methods. + inline void Put(Value& value) { + value.value = 23; + } + inline bool PutAtomic(Value& value) { + value.atomic_value.store(42); + return true; + } + + protected: + /// The explicit interface requires a DeepCopy_Internal() implementation. + Status DeepCopy_Internal(IAsyncContext*& context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } + + private: + Key key_; +}; + +/// Inserts 256 8 Byte keys into FASTER and tests that the scan iterator +/// scans over and returns all of them correctly. +TEST(ScanIter, InMem) { + typedef FasterKv faster_t; + + faster_t store { 128, 1073741824, "" }; + + store.StartSession(); + + int numRecords = 256; + for (size_t idx = 0; idx < numRecords; ++idx) { + auto callback = [](IAsyncContext* ctxt, Status result) { + ASSERT_TRUE(false); + }; + UpsertContext context{ static_cast(idx) }; + Status result = store.Upsert(context, callback, 1); + ASSERT_EQ(Status::Ok, result); + } + + ScanIterator iter(&(store.hlog), Buffering::UN_BUFFERED, + store.hlog.begin_address.load(), + store.hlog.GetTailAddress(), &(store.disk)); + + int num = 0; + while (true) { + auto r = iter.GetNext(); + if (r == nullptr) break; + ASSERT_EQ(num, r->key().key); + num++; + } + + ASSERT_EQ(numRecords, num); + + store.StopSession(); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
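Usage note (editorial addition, not part of the patch): the sketch below shows how client code might combine the new ScanIterator and Compact() APIs introduced above. It borrows the key/value types and NullDisk device from the unit tests purely for brevity; the helper function name and the commented-out truncation step are illustrative assumptions, not part of the API surface added by this PR.

// Minimal sketch: scan the log to count live records, then compact it.
#include "core/faster.h"
#include "core/log_scan.h"
#include "device/null_disk.h"
#include "test_types.h"

using namespace FASTER::core;
using Key = FASTER::test::FixedSizeKey<uint64_t>;
using Value = FASTER::test::SimpleAtomicValue<uint64_t>;
typedef FasterKv<Key, Value, FASTER::device::NullDisk> faster_t;

// Returns the number of live (non-tombstone) records observed before compaction.
uint64_t ScanAndCompact(faster_t& store) {
  store.StartSession();

  // Walk every record between the begin address and the tail. With an
  // in-memory (NullDisk) store the buffering mode is irrelevant; DOUBLE_PAGE
  // only matters once part of the log has been flushed to persistent storage.
  uint64_t live = 0;
  ScanIterator<faster_t> iter(&store.hlog, Buffering::DOUBLE_PAGE,
                              store.hlog.begin_address.load(),
                              store.hlog.GetTailAddress(), &store.disk);
  while (auto* record = iter.GetNext()) {
    if (!record->header.tombstone) ++live;
  }

  // Compact everything up to the current tail, as the unit tests above do.
  // Live records are re-upserted at the (new) tail; tombstoned ones are
  // dropped along with any older versions they shadow.
  uint64_t until = store.hlog.GetTailAddress().control();
  if (store.Compact(until)) {
    // Compact() does not reclaim space by itself. A caller that wants the
    // smaller footprint would follow up by truncating the head of the log,
    // e.g. with ShiftBeginAddress() (truncate/complete callbacks elided here):
    // store.ShiftBeginAddress(Address(until), nullptr, nullptr);
  }

  store.StopSession();
  return live;
}

The design choice worth noting is that compaction copies surviving records forward rather than rewriting the log in place, so the log's footprint only shrinks once the head is subsequently truncated.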