Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log iterator should not return records already flushed #161

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/log_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -798,11 +798,12 @@ LogFile::Iterator::~Iterator() {}
Status LogFile::Iterator::init(LogFile* l_file,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto)
{
lFile = l_file;
lFile->touch();
return mItr.init(lFile->mTable, start_key, end_key, seq_upto);
return mItr.init(lFile->mTable, start_key, end_key, seq_from, seq_upto);
}

Status LogFile::Iterator::initSN(LogFile* l_file,
Expand Down
1 change: 1 addition & 0 deletions src/log_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class LogFile {
Status init(LogFile* l_file,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto);
Status initSN(LogFile* l_file,
const uint64_t min_seq,
Expand Down
2 changes: 1 addition & 1 deletion src/log_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void LogMgr::Iterator::addLogFileItr(LogFileInfo* l_info, bool is_log_store_snap
if (type == BY_SEQ) {
l_itr->initSN(l_info->file, minSeqSnap, maxSeqSnap, is_log_store_snapshot);
} else if (type == BY_KEY) {
l_itr->init(l_info->file, startKey, endKey, maxSeqSnap);
l_itr->init(l_info->file, startKey, endKey, minSeqSnap, maxSeqSnap);
}

ItrItem* ctx = new ItrItem();
Expand Down
21 changes: 13 additions & 8 deletions src/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ int MemTable::RecNode::cmp(skiplist_node *a, skiplist_node *b, void *aux) {
return SizedBuf::cmp(aa->key, bb->key);
}

Record* MemTable::RecNode::getLatestRecord(const uint64_t chk) {
Record* MemTable::RecNode::getLatestRecord( const uint64_t seq_from,
const uint64_t seq_upto )
{
mGuard l(recListLock);
Record* rec = nullptr;
auto entry = recList->rbegin();
while (entry != recList->rend()) {
Record* tmp = *entry;
if (!valid_number(chk) || tmp->seqNum <= chk) {
rec = tmp;
if (!valid_number(seq_upto) || tmp->seqNum <= seq_upto) {
if (!valid_number(seq_from) || tmp->seqNum >= seq_from) {
rec = tmp;
}
break;
}
entry++;
Expand Down Expand Up @@ -109,18 +113,19 @@ uint64_t MemTable::RecNode::getMinSeq() {
return rec->seqNum;
}

bool MemTable::RecNode::validKeyExist( const uint64_t chk,
bool MemTable::RecNode::validKeyExist( const uint64_t seq_from,
const uint64_t seq_upto,
bool allow_tombstone )
{
bool valid_key_exist = true;

if (getMinSeq() > chk) {
if (getMinSeq() > seq_upto) {
// No record belongs to the current snapshot (iterator),
valid_key_exist = false;
}

if (valid_key_exist) {
Record* rec = getLatestRecord(chk);
Record* rec = getLatestRecord(seq_from, seq_upto);
if (!rec) {
// All records in the list may have
// higher sequence number than given `chk`.
Expand Down Expand Up @@ -655,7 +660,7 @@ Status MemTable::getRecordByKeyInternal(const uint64_t chk,
}
}

Record* rec_ret = node->getLatestRecord(chk);
Record* rec_ret = node->getLatestRecord(NOT_INITIALIZED, chk);
if (!rec_ret) {
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
Expand Down Expand Up @@ -714,7 +719,7 @@ Status MemTable::getRecordsByPrefix(const uint64_t chk,
};

while ( is_prefix_match(node->key) ) {
Record* rec_ret = node->getLatestRecord(chk);
Record* rec_ret = node->getLatestRecord(NOT_INITIALIZED, chk);
if (!rec_ret) {
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
Expand Down
11 changes: 9 additions & 2 deletions src/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class MemTable {
Status init(const MemTable* m_table,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto);
Status initSN(const MemTable* m_table,
const uint64_t min_seq,
Expand Down Expand Up @@ -161,6 +162,11 @@ class MemTable {
uint64_t maxSeq;
SizedBuf startKey;
SizedBuf endKey;

// (Key-iterator only) min allowed sequence number (inclusive).
uint64_t seqFrom;

// (Key-iterator only) max allowed sequence number (inclusive).
uint64_t seqUpto;
};

Expand All @@ -177,10 +183,11 @@ class MemTable {
~RecNode();

static int cmp(skiplist_node *a, skiplist_node *b, void *aux);
Record* getLatestRecord(const uint64_t chk);
Record* getLatestRecord(const uint64_t seq_from, const uint64_t seq_upto);
std::list<Record*> discardRecords(uint64_t seq_begin);
uint64_t getMinSeq();
bool validKeyExist(const uint64_t chk,
bool validKeyExist(const uint64_t seq_from,
const uint64_t seq_upto,
bool allow_tombstone = false);

skiplist_node snode;
Expand Down
15 changes: 9 additions & 6 deletions src/memtable_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ MemTable::Iterator::Iterator()
, cursor(nullptr)
, minSeq(NOT_INITIALIZED)
, maxSeq(NOT_INITIALIZED)
, seqFrom(NOT_INITIALIZED)
, seqUpto(NOT_INITIALIZED)
{}

Expand All @@ -33,6 +34,7 @@ MemTable::Iterator::~Iterator() {
Status MemTable::Iterator::init(const MemTable* m_table,
const SizedBuf& start_key,
const SizedBuf& end_key,
const uint64_t seq_from,
const uint64_t seq_upto)
{
mTable = m_table;
Expand All @@ -51,6 +53,7 @@ Status MemTable::Iterator::init(const MemTable* m_table,

startKey.alloc(start_key);
endKey.alloc(end_key);
seqFrom = seq_from;
seqUpto = seq_upto;

// Skip invalid keys.
Expand All @@ -64,7 +67,7 @@ Status MemTable::Iterator::init(const MemTable* m_table,
}

// Allow tombstone.
if ( !rn->validKeyExist(seqUpto, true) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) {
// No record belongs to the current snapshot (iterator),
// move further.
cursor = skiplist_next(mTable->idxByKey, cursor);
Expand Down Expand Up @@ -132,7 +135,7 @@ Status MemTable::Iterator::get(Record& rec_out) {
if (!cursor) return Status::KEY_NOT_FOUND;
if (type == BY_KEY) {
RecNode* node = _get_entry(cursor, RecNode, snode);
Record* rec = node->getLatestRecord(seqUpto);
Record* rec = node->getLatestRecord(seqFrom, seqUpto);
assert(rec);
rec_out = *rec;

Expand Down Expand Up @@ -163,7 +166,7 @@ Status MemTable::Iterator::prev(bool allow_tombstone) {
return Status::OUT_OF_RANGE;
}

if ( !rn->validKeyExist(seqUpto, allow_tombstone) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, allow_tombstone) ) {
prev_node = skiplist_prev(mTable->idxByKey, prev_node);
skiplist_release_node(&rn->snode);
continue;
Expand Down Expand Up @@ -209,7 +212,7 @@ Status MemTable::Iterator::next(bool allow_tombstone) {
return Status::OUT_OF_RANGE;
}

if ( !rn->validKeyExist(seqUpto, allow_tombstone) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, allow_tombstone) ) {
// No record belongs to the current snapshot (iterator),
// move further.
next_node = skiplist_next(mTable->idxByKey, next_node);
Expand Down Expand Up @@ -320,7 +323,7 @@ skiplist_node* MemTable::Iterator::findFirstValidNode(skiplist_node* seek_node,
return nullptr;
}

if ( !rn->validKeyExist(seqUpto, true) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) {
// No record belongs to the current snapshot (iterator),
// move further.
seek_node = skiplist_next(mTable->idxByKey, seek_node);
Expand All @@ -340,7 +343,7 @@ skiplist_node* MemTable::Iterator::findFirstValidNode(skiplist_node* seek_node,
return nullptr;
}

if ( !rn->validKeyExist(seqUpto, true) ) {
if ( !rn->validKeyExist(seqFrom, seqUpto, true) ) {
// No record belongs to the current snapshot (iterator),
// move further.
seek_node = skiplist_prev(mTable->idxByKey, seek_node);
Expand Down
2 changes: 1 addition & 1 deletion tests/jungle/key_itr_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ int itr_key_purge() {
CHK_Z(db->flushLogs(jungle::FlushOptions()));

// Update even number KV pairs.
int seq_count = n;
int seq_count = n + 1;
std::vector<jungle::KV> kv2(n);
CHK_Z(_init_kv_pairs(n, kv2, "key", "value2_"));
for (int ii=0; ii<n; ii+=2) {
Expand Down
36 changes: 26 additions & 10 deletions tests/jungle/log_reclaim_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,11 +988,19 @@ int immediate_log_purging_test() {
CHK_Z( db->flushLogs(jungle::FlushOptions(), 1000) );

// Scan all logs which will cause burst memtable load.
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );
TestSuite::sleep_ms(1);
{
TestSuite::WorkloadGenerator wg(1000);
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );

size_t todo = wg.getNumOpsToDo();
if (!todo) {
TestSuite::sleep_ms(1);
}
wg.addNumOpsDone(1);
}
}

jungle::DBStats stats;
Expand All @@ -1007,11 +1015,19 @@ int immediate_log_purging_test() {
CHK_Z( jungle::DB::open(&db, filename, config) );

// Do the same thing.
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );
TestSuite::sleep_ms(1);
{
TestSuite::WorkloadGenerator wg(1000);
for (size_t ii=1001; ii<NUM; ++ii) {
jungle::KV kv_out;
jungle::KV::Holder h(kv_out);
CHK_Z( db->getSN(ii+1, kv_out) );

size_t todo = wg.getNumOpsToDo();
if (!todo) {
TestSuite::sleep_ms(1);
}
wg.addNumOpsDone(1);
}
}

db->getStats(stats);
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/memtable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int memtable_key_itr_test() {
}

MemTable::Iterator m_itr;
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED));
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, NOT_INITIALIZED));

size_t count = 0;

Expand Down Expand Up @@ -132,7 +132,7 @@ int memtable_key_itr_chk_test() {

MemTable::Iterator m_itr;
// Iterator on snapshot upto 2.
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), 2));
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, 2));

size_t count = 0;

Expand Down Expand Up @@ -372,7 +372,7 @@ int memtable_itr_seek_beyond_seq_test() {

MemTable::Iterator m_itr;
// Iterator on snapshot upto 500.
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), 500));
CHK_Z(m_itr.init(&mt, SizedBuf(), SizedBuf(), NOT_INITIALIZED, 500));

// Goto end.
m_itr.gotoEnd();
Expand Down