Skip to content

Commit

Permalink
Log iterator should not return records already flushed (#161)
Browse files Browse the repository at this point in the history
* Log iterator should not return records whose sequence number is
smaller than the last flushed one. Otherwise, it may result in
returning stale data, especially when the old data still resides
in the log file while the flushed data is already compacted and
removed from table file.
  • Loading branch information
greensky00 authored Dec 19, 2023
1 parent 1470ed7 commit c453a1b
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 32 deletions.
3 changes: 2 additions & 1 deletion src/log_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -798,11 +798,12 @@ LogFile::Iterator::~Iterator() {}
Status LogFile::Iterator::init(LogFile* l_file,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto)
{
lFile = l_file;
lFile->touch();
return mItr.init(lFile->mTable, start_key, end_key, seq_upto);
return mItr.init(lFile->mTable, start_key, end_key, seq_from, seq_upto);
}

Status LogFile::Iterator::initSN(LogFile* l_file,
Expand Down
1 change: 1 addition & 0 deletions src/log_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class LogFile {
Status init(LogFile* l_file,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto);
Status initSN(LogFile* l_file,
const uint64_t min_seq,
Expand Down
2 changes: 1 addition & 1 deletion src/log_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void LogMgr::Iterator::addLogFileItr(LogFileInfo* l_info, bool is_log_store_snap
if (type == BY_SEQ) {
l_itr->initSN(l_info->file, minSeqSnap, maxSeqSnap, is_log_store_snapshot);
} else if (type == BY_KEY) {
l_itr->init(l_info->file, startKey, endKey, maxSeqSnap);
l_itr->init(l_info->file, startKey, endKey, minSeqSnap, maxSeqSnap);
}

ItrItem* ctx = new ItrItem();
Expand Down
21 changes: 13 additions & 8 deletions src/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ int MemTable::RecNode::cmp(skiplist_node *a, skiplist_node *b, void *aux) {
return SizedBuf::cmp(aa->key, bb->key);
}

Record* MemTable::RecNode::getLatestRecord(const uint64_t chk) {
Record* MemTable::RecNode::getLatestRecord( const uint64_t seq_from,
const uint64_t seq_upto )
{
mGuard l(recListLock);
Record* rec = nullptr;
auto entry = recList->rbegin();
while (entry != recList->rend()) {
Record* tmp = *entry;
if (!valid_number(chk) || tmp->seqNum <= chk) {
rec = tmp;
if (!valid_number(seq_upto) || tmp->seqNum <= seq_upto) {
if (!valid_number(seq_from) || tmp->seqNum >= seq_from) {
rec = tmp;
}
break;
}
entry++;
Expand Down Expand Up @@ -109,18 +113,19 @@ uint64_t MemTable::RecNode::getMinSeq() {
return rec->seqNum;
}

bool MemTable::RecNode::validKeyExist( const uint64_t chk,
bool MemTable::RecNode::validKeyExist( const uint64_t seq_from,
const uint64_t seq_upto,
bool allow_tombstone )
{
bool valid_key_exist = true;

if (getMinSeq() > chk) {
if (getMinSeq() > seq_upto) {
// No record belongs to the current snapshot (iterator),
valid_key_exist = false;
}

if (valid_key_exist) {
Record* rec = getLatestRecord(chk);
Record* rec = getLatestRecord(seq_from, seq_upto);
if (!rec) {
// All records in the list may have
// higher sequence number than given `chk`.
Expand Down Expand Up @@ -655,7 +660,7 @@ Status MemTable::getRecordByKeyInternal(const uint64_t chk,
}
}

Record* rec_ret = node->getLatestRecord(chk);
Record* rec_ret = node->getLatestRecord(NOT_INITIALIZED, chk);
if (!rec_ret) {
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
Expand Down Expand Up @@ -714,7 +719,7 @@ Status MemTable::getRecordsByPrefix(const uint64_t chk,
};

while ( is_prefix_match(node->key) ) {
Record* rec_ret = node->getLatestRecord(chk);
Record* rec_ret = node->getLatestRecord(NOT_INITIALIZED, chk);
if (!rec_ret) {
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
Expand Down
11 changes: 9 additions & 2 deletions src/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class MemTable {
Status init(const MemTable* m_table,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto);
Status initSN(const MemTable* m_table,
const uint64_t min_seq,
Expand Down Expand Up @@ -161,6 +162,11 @@ class MemTable {
uint64_t maxSeq;
SizedBuf startKey;
SizedBuf endKey;

// (Key-iterator only) min allowed sequence number (inclusive).
uint64_t seqFrom;

// (Key-iterator only) max allowed sequence number (inclusive).
uint64_t seqUpto;
};

Expand All @@ -177,10 +183,11 @@ class MemTable {
~RecNode();

static int cmp(skiplist_node *a, skiplist_node *b, void *aux);
Record* getLatestRecord(const uint64_t chk);
Record* getLatestRecord(const uint64_t seq_from, const uint64_t seq_upto);
std::list<Record*> discardRecords(uint64_t seq_begin);
uint64_t getMinSeq();
bool validKeyExist(const uint64_t chk,
bool validKeyExist(const uint64_t seq_from,
const uint64_t seq_upto,
bool allow_tombstone = false);

skiplist_node snode;
Expand Down
15 changes: 9 additions & 6 deletions src/memtable_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ MemTable::Iterator::Iterator()
, cursor(nullptr)
, minSeq(NOT_INITIALIZED)
, maxSeq(NOT_INITIALIZED)
, seqFrom(NOT_INITIALIZED)
, seqUpto(NOT_INITIALIZED)
{}

Expand All @@ -33,6 +34,7 @@ MemTable::Iterator::~Iterator() {
Status MemTable::Iterator::init(const MemTable* m_table,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto)
{
mTable = m_table;
Expand All @@ -51,6 +53,7 @@ Status MemTable::Iterator::init(const MemTable* m_table,

startKey.alloc(start_key);
endKey.alloc(end_key);
seqFrom = seq_from;
seqUpto = seq_upto;

// Skip invalid keys.
Expand All @@ -64,7 +67,7 @@ Status MemTable::Iterator::init(const MemTable* m_table,
}

// Allow tombstone.
if ( !rn->validKeyExist(seqUpto, true) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) {
// No record belongs to the current snapshot (iterator),
// move further.
cursor = skiplist_next(mTable->idxByKey, cursor);
Expand Down Expand Up @@ -132,7 +135,7 @@ Status MemTable::Iterator::get(Record& rec_out) {
if (!cursor) return Status::KEY_NOT_FOUND;
if (type == BY_KEY) {
RecNode* node = _get_entry(cursor, RecNode, snode);
Record* rec = node->getLatestRecord(seqUpto);
Record* rec = node->getLatestRecord(seqFrom, seqUpto);
assert(rec);
rec_out = *rec;

Expand Down Expand Up @@ -163,7 +166,7 @@ Status MemTable::Iterator::prev(bool allow_tombstone) {
return Status::OUT_OF_RANGE;
}

if ( !rn->validKeyExist(seqUpto, allow_tombstone) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, allow_tombstone) ) {
prev_node = skiplist_prev(mTable->idxByKey, prev_node);
skiplist_release_node(&rn->snode);
continue;
Expand Down Expand Up @@ -209,7 +212,7 @@ Status MemTable::Iterator::next(bool allow_tombstone) {
return Status::OUT_OF_RANGE;
}

if ( !rn->validKeyExist(seqUpto, allow_tombstone) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, allow_tombstone) ) {
// No record belongs to the current snapshot (iterator),
// move further.
next_node = skiplist_next(mTable->idxByKey, next_node);
Expand Down Expand Up @@ -320,7 +323,7 @@ skiplist_node* MemTable::Iterator::findFirstValidNode(skiplist_node* seek_node,
return nullptr;
}

if ( !rn->validKeyExist(seqUpto, true) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) {
// No record belongs to the current snapshot (iterator),
// move further.
seek_node = skiplist_next(mTable->idxByKey, seek_node);
Expand All @@ -340,7 +343,7 @@ skiplist_node* MemTable::Iterator::findFirstValidNode(skiplist_node* seek_node,
return nullptr;
}

if ( !rn->validKeyExist(seqUpto, true) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) {
// No record belongs to the current snapshot (iterator),
// move further.
seek_node = skiplist_prev(mTable->idxByKey, seek_node);
Expand Down
2 changes: 1 addition & 1 deletion tests/jungle/key_itr_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ int itr_key_purge() {
CHK_Z(db->flushLogs(jungle::FlushOptions()));

// Update even number KV pairs.
int seq_count = n;
int seq_count = n + 1;
std::vector<jungle::KV> kv2(n);
CHK_Z(_init_kv_pairs(n, kv2, "key", "value2_"));
for (int ii=0; ii<n; ii+=2) {
Expand Down
36 changes: 26 additions & 10 deletions tests/jungle/log_reclaim_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,11 +988,19 @@ int immediate_log_purging_test() {
CHK_Z( db->flushLogs(jungle::FlushOptions(), 1000) );

// Scan all logs which will cause burst memtable load.
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );
TestSuite::sleep_ms(1);
{
TestSuite::WorkloadGenerator wg(1000);
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );

size_t todo = wg.getNumOpsToDo();
if (!todo) {
TestSuite::sleep_ms(1);
}
wg.addNumOpsDone(1);
}
}

jungle::DBStats stats;
Expand All @@ -1007,11 +1015,19 @@ int immediate_log_purging_test() {
CHK_Z( jungle::DB::open(&db, filename, config) );

// Do the same thing.
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );
TestSuite::sleep_ms(1);
{
TestSuite::WorkloadGenerator wg(1000);
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );

size_t todo = wg.getNumOpsToDo();
if (!todo) {
TestSuite::sleep_ms(1);
}
wg.addNumOpsDone(1);
}
}

db->getStats(stats);
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/memtable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int memtable_key_itr_test() {
}

MemTable::Iterator m_itr;
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED));
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, NOT_INITIALIZED));

size_t count = 0;

Expand Down Expand Up @@ -132,7 +132,7 @@ int memtable_key_itr_chk_test() {

MemTable::Iterator m_itr;
// Iterator on snapshot upto 2.
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), 2));
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, 2));

size_t count = 0;

Expand Down Expand Up @@ -372,7 +372,7 @@ int memtable_itr_seek_beyond_seq_test() {

MemTable::Iterator m_itr;
// Iterator on snapshot upto 500.
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), 500));
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, 500));

// Goto end.
m_itr.gotoEnd();
Expand Down

0 comments on commit c453a1b

Please sign in to comment.