Skip to content

Commit

Permalink
Fix to the bug of getRecordsByPrefix (#156)
Browse files Browse the repository at this point in the history
* If there is more than one matching record in the same log file,
and some of them are already flushed to the table section, the
remaining unflushed records will be ignored even though their prefix
matches.

* Fixed the logic in memtable.
  • Loading branch information
greensky00 authored Aug 22, 2023
1 parent c74a6c9 commit a29a8d7
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 308 deletions.
1 change: 1 addition & 0 deletions scripts/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ $VALGRIND ./tests/memtable_test --abort-on-failure
$VALGRIND ./tests/table_lookup_booster_test --abort-on-failure

$VALGRIND ./tests/basic_op_test --abort-on-failure
$VALGRIND ./tests/nearest_search_test --abort-on-failure
$VALGRIND ./tests/seq_itr_test --abort-on-failure
$VALGRIND ./tests/key_itr_test --abort-on-failure
$VALGRIND ./tests/snapshot_test --abort-on-failure
Expand Down
42 changes: 25 additions & 17 deletions src/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,24 +719,32 @@ Status MemTable::getRecordsByPrefix(const uint64_t chk,
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
}
if ( valid_number(flushedSeqNum) &&
rec_ret->seqNum <= flushedSeqNum ) {
// Already purged KV pair, go to table.
if (!allow_flushed_log) {
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
} // Tolerate if this is snapshot.
}

if ( !allow_tombstone && rec_ret->isDel() ) {
// Last operation is deletion.
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
}
SearchCbDecision dec = cb_func({*rec_ret});
if (dec == SearchCbDecision::STOP) {
skiplist_release_node(&node->snode);
return Status::OPERATION_STOPPED;
bool found = true;
// Dummy loop to use `break`.
do {
if ( valid_number(flushedSeqNum) &&
rec_ret->seqNum <= flushedSeqNum ) {
// Already purged KV pair, go to the next matching key.
if (!allow_flushed_log) {
found = false;
break;
} // Tolerate if this is snapshot.
}

if ( !allow_tombstone && rec_ret->isDel() ) {
// Last operation is deletion, go to the next matching key.
found = false;
break;
}
} while (false);

if (found) {
SearchCbDecision dec = cb_func({*rec_ret});
if (dec == SearchCbDecision::STOP) {
skiplist_release_node(&node->snode);
return Status::OPERATION_STOPPED;
}
}

cursor = skiplist_next(idxByKey, &node->snode);
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ add_executable(basic_op_test ${BASIC_OP_TEST})
target_link_libraries(basic_op_test ${JUNGLE_TEST_DEPS})
add_dependencies(basic_op_test static_lib)

# nearest_search_test: built from nearest_search_test.cc and linked against
# the common test dependencies (JUNGLE_TEST_DEPS). The add_dependencies() call
# makes sure the static_lib target is built before this executable.
set(NEAREST_SEARCH_TEST ${TEST_DIR}/jungle/nearest_search_test.cc)
add_executable(nearest_search_test ${NEAREST_SEARCH_TEST})
target_link_libraries(nearest_search_test ${JUNGLE_TEST_DEPS})
add_dependencies(nearest_search_test static_lib)

set(BUILDER_TEST ${TEST_DIR}/jungle/builder_test.cc)
add_executable(builder_test ${BUILDER_TEST})
target_link_libraries(builder_test ${JUNGLE_TEST_DEPS})
Expand Down
291 changes: 0 additions & 291 deletions tests/jungle/basic_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2433,292 +2433,6 @@ int discard_test(PARAM_BASE) {
return 0;
}

int get_nearest_test(int flush_options) {
// flush_options:
// 0: all in the log.
// 1: 3/4 in the log and 1/4 in the table.
// 2: 1/2 in the log and 1/2 in the table.
// 3: 1/4 in the log and 3/4 in the table.
// 4: all in the table.

std::string filename;
TEST_SUITE_PREPARE_PATH(filename);

jungle::Status s;
jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config);
jungle::DB* db;

config.maxEntriesInLogFile = 20;
CHK_Z(jungle::DB::open(&db, filename, config));

const size_t NUM = 100;

// Shuffle (0 -- 99).
std::vector<size_t> idx_arr(NUM);
std::iota(idx_arr.begin(), idx_arr.end(), 0);
/*
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(idx_arr.begin(), idx_arr.end(), g);
*/
for (size_t ii = 0; ii < NUM; ++ii) {
size_t jj = std::rand() % NUM;
std::swap(idx_arr[ii], idx_arr[jj]);
}

for (size_t kk = 0; kk < 2; ++kk) {
for (size_t ii = 0; ii < NUM; ++ii) {
size_t idx = idx_arr[ii] * 10;
std::string key_str = "key" + TestSuite::lzStr(5, idx);
std::string val_str = "val" + TestSuite::lzStr(2, kk) +
"_" + TestSuite::lzStr(5, idx);
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );

if ( (kk == 0 && ii == NUM/2 - 1 && flush_options == 1) ||
(kk == 1 && ii == NUM/2 - 1 && flush_options == 3) ) {
CHK_Z( db->sync(false) );
CHK_Z( db->flushLogs() );
}
}
CHK_Z( db->sync(false) );
if (kk == 0 && flush_options == 2) {
CHK_Z( db->flushLogs() );
}
}
CHK_Z( db->sync(false) );
if (flush_options == 4) {
CHK_Z( db->flushLogs() );
}

auto verify_func = [&]( size_t ii,
jungle::SearchOptions s_opt,
bool exact_query ) -> int {
TestSuite::setInfo("ii %zu, s_opt %d, exact_query %d",
ii, s_opt, exact_query);

int64_t idx = exact_query ? ii * 10 : ii * 10 + (s_opt.isGreater() ? 1 : -1);
int64_t exp_idx = 0;

if (exact_query) {
if (s_opt.isExactMatchAllowed()) {
exp_idx = idx;
} else {
exp_idx = (ii + (s_opt.isGreater() ? 1 : -1)) * 10;
}
} else {
exp_idx = (ii + (s_opt.isGreater() ? 1 : -1)) * 10;
}

std::string key_str;
if (idx >= 0) {
key_str = "key" + TestSuite::lzStr(5, idx);
} else {
key_str = "000";
}
std::string val_str = "val" + TestSuite::lzStr(2, 1) +
"_" + TestSuite::lzStr(5, exp_idx);

jungle::Record rec_out;
jungle::Record::Holder h_rec_out(rec_out);
s = db->getNearestRecordByKey(jungle::SizedBuf(key_str), rec_out, s_opt);
if (s_opt == jungle::SearchOptions::EQUAL) {
// EQUAL should be denied.
CHK_NOT(s);
return 0;
}

bool exp_succ = false;
if (exact_query) {
if ( s_opt.isExactMatchAllowed() ||
(s_opt.isGreater() && ii < NUM - 1) ||
(s_opt.isSmaller() && ii > 0) ) {
exp_succ = true;
}
} else {
if ( (s_opt.isGreater() && ii < NUM - 1) ||
(s_opt.isSmaller() && ii > 0) ) {
exp_succ = true;
}
}

if (exp_succ) {
CHK_Z(s);
CHK_EQ( jungle::SizedBuf(val_str), rec_out.kv.value );
} else {
CHK_NOT(s);
}
return 0;
};

using jungle::SearchOptions;
for (size_t ii = 0; ii < NUM; ++ii) {
for (bool exact_query: {true, false}) {
for (SearchOptions s_opt: { SearchOptions::GREATER_OR_EQUAL,
SearchOptions::GREATER,
SearchOptions::SMALLER_OR_EQUAL,
SearchOptions::SMALLER,
SearchOptions::EQUAL }) {
CHK_Z( verify_func(ii, s_opt, exact_query) );
}
}
}

CHK_Z(jungle::DB::close(db));
CHK_Z(jungle::shutdown());

TEST_SUITE_CLEANUP_PATH();
return 0;
}

int get_by_prefix_test(size_t hash_len) {
std::string filename;
TEST_SUITE_PREPARE_PATH(filename);

jungle::Status s;
jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config);
jungle::DB* db;

config.maxEntriesInLogFile = 20;
config.customLenForHash = [hash_len](const jungle::HashKeyLenParams& p) -> size_t {
return hash_len;
};
config.bloomFilterBitsPerUnit = 10;
CHK_Z(jungle::DB::open(&db, filename, config));

const size_t NUM = 100;

// Shuffle (0 -- 99).
std::vector<size_t> idx_arr(NUM);
std::iota(idx_arr.begin(), idx_arr.end(), 0);
#if 0
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(idx_arr.begin(), idx_arr.end(), g);
#else
for (size_t ii = 0; ii < NUM; ++ii) {
size_t jj = std::rand() % NUM;
std::swap(idx_arr[ii], idx_arr[jj]);
}
#endif

const size_t ROUND_MAX = 10;
for (size_t round = 0; round < ROUND_MAX; ++round) {
for (size_t ii = 0; ii < NUM; ++ii) {
size_t idx = idx_arr[ii];
std::string key_str = "key" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
std::string val_str = "val" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );
}
CHK_Z( db->sync(false) );
if (round == 0) {
CHK_Z( db->flushLogs() );
} else if (round >= 1 && round % 2 == 0 && round < ROUND_MAX - 2) {
for (size_t hh = 0; hh < config.numL0Partitions; ++hh) {
CHK_Z( db->compactL0(jungle::CompactOptions(), hh) );
}
CHK_Z( db->flushLogs() );
}
}

// Get all records with the given prefix.
for (size_t ii = 0; ii < NUM; ++ii) {
TestSuite::setInfo("ii = %zu", ii);
size_t idx = ii;
std::string prefix_str = "key" + TestSuite::lzStr(5, idx);
std::list<jungle::Record> recs_out;
auto cb_func = [&](const jungle::SearchCbParams& params) ->
jungle::SearchCbDecision {
jungle::Record dst;
params.rec.copyTo(dst);
recs_out.push_back(dst);
return jungle::SearchCbDecision::NEXT;
};
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(prefix_str), cb_func) );

CHK_EQ(ROUND_MAX, recs_out.size());
for (jungle::Record& rec: recs_out) rec.free();
}

// Same but stop in the middle.
const size_t STOP_AT = 5;
for (size_t ii = 0; ii < NUM; ++ii) {
TestSuite::setInfo("ii = %zu", ii);
size_t idx = ii;
std::string prefix_str = "key" + TestSuite::lzStr(5, idx);
std::list<jungle::Record> recs_out;
auto cb_func = [&](const jungle::SearchCbParams& params) ->
jungle::SearchCbDecision {
jungle::Record dst;
params.rec.copyTo(dst);
recs_out.push_back(dst);
if (recs_out.size() >= STOP_AT) return jungle::SearchCbDecision::STOP;
return jungle::SearchCbDecision::NEXT;
};
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(prefix_str), cb_func) );

CHK_EQ(STOP_AT, recs_out.size());
for (jungle::Record& rec: recs_out) rec.free();
}

// Exact match should work.
for (size_t round = 0; round < ROUND_MAX; ++round) {
for (size_t ii = 0; ii < NUM; ++ii) {
TestSuite::setInfo("round = %zu, ii = %zu", round, ii);
size_t idx = idx_arr[ii];
std::string key_str = "key" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
std::string val_str = "val" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
jungle::SizedBuf value_out;
jungle::SizedBuf::Holder h(value_out);
CHK_Z( db->get( jungle::SizedBuf(key_str), value_out ) );
CHK_EQ( val_str, value_out.toString() );
}
}

{ // Insert a record with new prefix, and keep it in log section.
std::string key_str = "key_newprefix";
std::string val_str = "val_newprefix";
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );
CHK_Z( db->sync(false) );

// Search keys with the new prefix, the API should return OK.
bool cb_invoked = false;
auto cb_func = [&](const jungle::SearchCbParams& params) ->
jungle::SearchCbDecision {
cb_invoked = true;
return jungle::SearchCbDecision::NEXT;
};
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(key_str), cb_func) );
CHK_TRUE( cb_invoked );

// Flush it to table, now log section is empty.
CHK_Z( db->flushLogs() );

cb_invoked = false;
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(key_str), cb_func) );
CHK_TRUE( cb_invoked );

// Find non-existing key.
std::string ne_key_str = "non_existing_key";
cb_invoked = false;
s = db->getRecordsByPrefix(jungle::SizedBuf(ne_key_str), cb_func);
CHK_EQ( jungle::Status::KEY_NOT_FOUND, s.getValue() );
CHK_FALSE( cb_invoked );

}

CHK_Z(jungle::DB::close(db));
CHK_Z(jungle::shutdown());

TEST_SUITE_CLEANUP_PATH();
return 0;
}

int kmv_get_memory_test() {
std::string filename;
TEST_SUITE_PREPARE_PATH(filename);
Expand Down Expand Up @@ -3346,11 +3060,6 @@ int main(int argc, char** argv) {
SET_PARAMS(discard_test_args);
ts.doTest("discard dirty test", discard_test, discard_test_args);

ts.doTest("get nearest test",
get_nearest_test,
TestRange<int>(0, 4, 1, StepType::LINEAR));
ts.doTest("get by prefix test",
get_by_prefix_test, TestRange<size_t>( {0, 8, 16} ));
ts.doTest("kmv get memory test", kmv_get_memory_test);
ts.doTest("immediate purging test", immediate_purging_test);
ts.doTest("compaction by fast scan test", compaction_by_fast_scan_test);
Expand Down
Loading

0 comments on commit a29a8d7

Please sign in to comment.