diff --git a/src/log_file.cc b/src/log_file.cc index 0a97deb..23ef414 100644 --- a/src/log_file.cc +++ b/src/log_file.cc @@ -798,11 +798,12 @@ LogFile::Iterator::~Iterator() {} Status LogFile::Iterator::init(LogFile* l_file, const SizedBuf& start_key, const SizedBuf& end_key, + const uint64_t seq_from, const uint64_t seq_upto) { lFile = l_file; lFile->touch(); - return mItr.init(lFile->mTable, start_key, end_key, seq_upto); + return mItr.init(lFile->mTable, start_key, end_key, seq_from, seq_upto); } Status LogFile::Iterator::initSN(LogFile* l_file, diff --git a/src/log_file.h b/src/log_file.h index eb597fd..0e775d6 100644 --- a/src/log_file.h +++ b/src/log_file.h @@ -166,6 +166,7 @@ class LogFile { Status init(LogFile* l_file, const SizedBuf& start_key, const SizedBuf& end_key, + const uint64_t seq_from, const uint64_t seq_upto); Status initSN(LogFile* l_file, const uint64_t min_seq, diff --git a/src/log_iterator.cc b/src/log_iterator.cc index a070763..0a65b40 100644 --- a/src/log_iterator.cc +++ b/src/log_iterator.cc @@ -39,7 +39,7 @@ void LogMgr::Iterator::addLogFileItr(LogFileInfo* l_info, bool is_log_store_snap if (type == BY_SEQ) { l_itr->initSN(l_info->file, minSeqSnap, maxSeqSnap, is_log_store_snapshot); } else if (type == BY_KEY) { - l_itr->init(l_info->file, startKey, endKey, maxSeqSnap); + l_itr->init(l_info->file, startKey, endKey, minSeqSnap, maxSeqSnap); } ItrItem* ctx = new ItrItem(); diff --git a/src/memtable.cc b/src/memtable.cc index bc16533..394bfd3 100644 --- a/src/memtable.cc +++ b/src/memtable.cc @@ -66,14 +66,18 @@ int MemTable::RecNode::cmp(skiplist_node *a, skiplist_node *b, void *aux) { return SizedBuf::cmp(aa->key, bb->key); } -Record* MemTable::RecNode::getLatestRecord(const uint64_t chk) { +Record* MemTable::RecNode::getLatestRecord( const uint64_t seq_from, + const uint64_t seq_upto ) +{ mGuard l(recListLock); Record* rec = nullptr; auto entry = recList->rbegin(); while (entry != recList->rend()) { Record* tmp = *entry; - if (!valid_number(chk) || tmp->seqNum <= chk) { - rec = tmp; + if (!valid_number(seq_upto) || tmp->seqNum <= seq_upto) { + if (!valid_number(seq_from) || tmp->seqNum >= seq_from) { + rec = tmp; + } break; } entry++; @@ -109,18 +113,19 @@ uint64_t MemTable::RecNode::getMinSeq() { return rec->seqNum; } -bool MemTable::RecNode::validKeyExist( const uint64_t chk, +bool MemTable::RecNode::validKeyExist( const uint64_t seq_from, + const uint64_t seq_upto, bool allow_tombstone ) { bool valid_key_exist = true; - if (getMinSeq() > chk) { + if (getMinSeq() > seq_upto) { // No record belongs to the current snapshot (iterator), valid_key_exist = false; } if (valid_key_exist) { - Record* rec = getLatestRecord(chk); + Record* rec = getLatestRecord(seq_from, seq_upto); if (!rec) { // All records in the list may have // higher sequence number than given `chk`. @@ -655,7 +660,7 @@ Status MemTable::getRecordByKeyInternal(const uint64_t chk, } } - Record* rec_ret = node->getLatestRecord(chk); + Record* rec_ret = node->getLatestRecord(NOT_INITIALIZED, chk); if (!rec_ret) { skiplist_release_node(&node->snode); return Status::KEY_NOT_FOUND; @@ -714,7 +719,7 @@ Status MemTable::getRecordsByPrefix(const uint64_t chk, }; while ( is_prefix_match(node->key) ) { - Record* rec_ret = node->getLatestRecord(chk); + Record* rec_ret = node->getLatestRecord(NOT_INITIALIZED, chk); if (!rec_ret) { skiplist_release_node(&node->snode); return Status::KEY_NOT_FOUND; diff --git a/src/memtable.h b/src/memtable.h index 5e9f9b8..203ae32 100644 --- a/src/memtable.h +++ b/src/memtable.h @@ -132,6 +132,7 @@ class MemTable { Status init(const MemTable* m_table, const SizedBuf& start_key, const SizedBuf& end_key, + const uint64_t seq_from, const uint64_t seq_upto); Status initSN(const MemTable* m_table, const uint64_t min_seq, @@ -161,6 +162,11 @@ class MemTable { uint64_t maxSeq; SizedBuf startKey; SizedBuf endKey; + + // (Key-iterator only) min allowed sequence number (inclusive). + uint64_t seqFrom; + + // (Key-iterator only) max allowed sequence number (inclusive). uint64_t seqUpto; }; @@ -177,10 +183,11 @@ class MemTable { ~RecNode(); static int cmp(skiplist_node *a, skiplist_node *b, void *aux); - Record* getLatestRecord(const uint64_t chk); + Record* getLatestRecord(const uint64_t seq_from, const uint64_t seq_upto); std::list discardRecords(uint64_t seq_begin); uint64_t getMinSeq(); - bool validKeyExist(const uint64_t chk, + bool validKeyExist(const uint64_t seq_from, + const uint64_t seq_upto, bool allow_tombstone = false); skiplist_node snode; diff --git a/src/memtable_iterator.cc b/src/memtable_iterator.cc index 987dc14..e2aee3d 100644 --- a/src/memtable_iterator.cc +++ b/src/memtable_iterator.cc @@ -23,6 +23,7 @@ MemTable::Iterator::Iterator() , cursor(nullptr) , minSeq(NOT_INITIALIZED) , maxSeq(NOT_INITIALIZED) + , seqFrom(NOT_INITIALIZED) , seqUpto(NOT_INITIALIZED) {} @@ -33,6 +34,7 @@ MemTable::Iterator::~Iterator() { Status MemTable::Iterator::init(const MemTable* m_table, const SizedBuf& start_key, const SizedBuf& end_key, + const uint64_t seq_from, const uint64_t seq_upto) { mTable = m_table; @@ -51,6 +53,7 @@ Status MemTable::Iterator::init(const MemTable* m_table, startKey.alloc(start_key); endKey.alloc(end_key); + seqFrom = seq_from; seqUpto = seq_upto; // Skip invalid keys. @@ -64,7 +67,7 @@ Status MemTable::Iterator::init(const MemTable* m_table, } // Allow tombstone. - if ( !rn->validKeyExist(seqUpto, true) ) { + if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) { // No record belongs to the current snapshot (iterator), // move further. cursor = skiplist_next(mTable->idxByKey, cursor); @@ -132,7 +135,7 @@ Status MemTable::Iterator::get(Record& rec_out) { if (!cursor) return Status::KEY_NOT_FOUND; if (type == BY_KEY) { RecNode* node = _get_entry(cursor, RecNode, snode); - Record* rec = node->getLatestRecord(seqUpto); + Record* rec = node->getLatestRecord(seqFrom, seqUpto); assert(rec); rec_out = *rec; @@ -163,7 +166,7 @@ Status MemTable::Iterator::prev(bool allow_tombstone) { return Status::OUT_OF_RANGE; } - if ( !rn->validKeyExist(seqUpto, allow_tombstone) ) { + if ( !rn->validKeyExist(seqFrom, seqUpto, allow_tombstone) ) { prev_node = skiplist_prev(mTable->idxByKey, prev_node); skiplist_release_node(&rn->snode); continue; @@ -209,7 +212,7 @@ Status MemTable::Iterator::next(bool allow_tombstone) { return Status::OUT_OF_RANGE; } - if ( !rn->validKeyExist(seqUpto, allow_tombstone) ) { + if ( !rn->validKeyExist(seqFrom, seqUpto, allow_tombstone) ) { // No record belongs to the current snapshot (iterator), // move further. next_node = skiplist_next(mTable->idxByKey, next_node); @@ -320,7 +323,7 @@ skiplist_node* MemTable::Iterator::findFirstValidNode(skiplist_node* seek_node, return nullptr; } - if ( !rn->validKeyExist(seqUpto, true) ) { + if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) { // No record belongs to the current snapshot (iterator), // move further. seek_node = skiplist_next(mTable->idxByKey, seek_node); @@ -340,7 +343,7 @@ skiplist_node* MemTable::Iterator::findFirstValidNode(skiplist_node* seek_node, return nullptr; } - if ( !rn->validKeyExist(seqUpto, true) ) { + if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) { // No record belongs to the current snapshot (iterator), // move further. seek_node = skiplist_prev(mTable->idxByKey, seek_node); diff --git a/tests/jungle/key_itr_test.cc b/tests/jungle/key_itr_test.cc index 5a13d87..8a13105 100644 --- a/tests/jungle/key_itr_test.cc +++ b/tests/jungle/key_itr_test.cc @@ -175,7 +175,7 @@ int itr_key_purge() { CHK_Z(db->flushLogs(jungle::FlushOptions())); // Update even number KV pairs. - int seq_count = n; + int seq_count = n + 1; std::vector kv2(n); CHK_Z(_init_kv_pairs(n, kv2, "key", "value2_")); for (int ii=0; iiflushLogs(jungle::FlushOptions(), 1000) ); // Scan all logs which will cause burst memtable load. - for (size_t ii=1001; iigetSN(ii+1, kv_out) ); - TestSuite::sleep_ms(1); + { + TestSuite::WorkloadGenerator wg(1000); + for (size_t ii=1001; iigetSN(ii+1, kv_out) ); + + size_t todo = wg.getNumOpsToDo(); + if (!todo) { + TestSuite::sleep_ms(1); + } + wg.addNumOpsDone(1); + } } jungle::DBStats stats; @@ -1007,11 +1015,19 @@ int immediate_log_purging_test() { CHK_Z( jungle::DB::open(&db, filename, config) ); // Do the same thing. - for (size_t ii=1001; iigetSN(ii+1, kv_out) ); - TestSuite::sleep_ms(1); + { + TestSuite::WorkloadGenerator wg(1000); + for (size_t ii=1001; iigetSN(ii+1, kv_out) ); + + size_t todo = wg.getNumOpsToDo(); + if (!todo) { + TestSuite::sleep_ms(1); + } + wg.addNumOpsDone(1); + } } db->getStats(stats); diff --git a/tests/unit/memtable_test.cc b/tests/unit/memtable_test.cc index 1b8d464..b58c4c2 100644 --- a/tests/unit/memtable_test.cc +++ b/tests/unit/memtable_test.cc @@ -66,7 +66,7 @@ int memtable_key_itr_test() { } MemTable::Iterator m_itr; - CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED)); + CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, NOT_INITIALIZED)); size_t count = 0; @@ -132,7 +132,7 @@ int memtable_key_itr_chk_test() { MemTable::Iterator m_itr; // Iterator on snapshot upto 2. - CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), 2)); + CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, 2)); size_t count = 0; @@ -372,7 +372,7 @@ int memtable_itr_seek_beyond_seq_test() { MemTable::Iterator m_itr; // Iterator on snapshot upto 500. - CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), 500)); + CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, 500)); // Goto end. m_itr.gotoEnd();