Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to the bug of getRecordsByPrefix #156

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ $VALGRIND ./tests/memtable_test --abort-on-failure
$VALGRIND ./tests/table_lookup_booster_test --abort-on-failure

$VALGRIND ./tests/basic_op_test --abort-on-failure
$VALGRIND ./tests/nearest_search_test --abort-on-failure
$VALGRIND ./tests/seq_itr_test --abort-on-failure
$VALGRIND ./tests/key_itr_test --abort-on-failure
$VALGRIND ./tests/snapshot_test --abort-on-failure
Expand Down
42 changes: 25 additions & 17 deletions src/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,24 +719,32 @@ Status MemTable::getRecordsByPrefix(const uint64_t chk,
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
}
if ( valid_number(flushedSeqNum) &&
rec_ret->seqNum <= flushedSeqNum ) {
// Already purged KV pair, go to table.
if (!allow_flushed_log) {
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
} // Tolerate if this is snapshot.
}

if ( !allow_tombstone && rec_ret->isDel() ) {
// Last operation is deletion.
skiplist_release_node(&node->snode);
return Status::KEY_NOT_FOUND;
}
SearchCbDecision dec = cb_func({*rec_ret});
if (dec == SearchCbDecision::STOP) {
skiplist_release_node(&node->snode);
return Status::OPERATION_STOPPED;
bool found = true;
// Dummy loop to use `break`.
do {
if ( valid_number(flushedSeqNum) &&
rec_ret->seqNum <= flushedSeqNum ) {
// Already purged KV pair, go to the next matching key.
if (!allow_flushed_log) {
found = false;
break;
} // Tolerate if this is snapshot.
}

if ( !allow_tombstone && rec_ret->isDel() ) {
// Last operation is deletion, go to the next matching key.
found = false;
break;
}
} while (false);

if (found) {
SearchCbDecision dec = cb_func({*rec_ret});
if (dec == SearchCbDecision::STOP) {
skiplist_release_node(&node->snode);
return Status::OPERATION_STOPPED;
}
}

cursor = skiplist_next(idxByKey, &node->snode);
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ add_executable(basic_op_test ${BASIC_OP_TEST})
target_link_libraries(basic_op_test ${JUNGLE_TEST_DEPS})
add_dependencies(basic_op_test static_lib)

set(NEAREST_SEARCH_TEST ${TEST_DIR}/jungle/nearest_search_test.cc)
add_executable(nearest_search_test ${NEAREST_SEARCH_TEST})
target_link_libraries(nearest_search_test ${JUNGLE_TEST_DEPS})
add_dependencies(nearest_search_test static_lib)

set(BUILDER_TEST ${TEST_DIR}/jungle/builder_test.cc)
add_executable(builder_test ${BUILDER_TEST})
target_link_libraries(builder_test ${JUNGLE_TEST_DEPS})
Expand Down
291 changes: 0 additions & 291 deletions tests/jungle/basic_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2433,292 +2433,6 @@ int discard_test(PARAM_BASE) {
return 0;
}

int get_nearest_test(int flush_options) {
// flush_options:
// 0: all in the log.
// 1: 3/4 in the log and 1/4 in the table.
// 2: 1/2 in the log and 1/2 in the table.
// 3: 1/4 in the log and 3/4 in the table.
// 4: all in the table.

std::string filename;
TEST_SUITE_PREPARE_PATH(filename);

jungle::Status s;
jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config);
jungle::DB* db;

config.maxEntriesInLogFile = 20;
CHK_Z(jungle::DB::open(&db, filename, config));

const size_t NUM = 100;

// Shuffle (0 -- 99).
std::vector<size_t> idx_arr(NUM);
std::iota(idx_arr.begin(), idx_arr.end(), 0);
/*
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(idx_arr.begin(), idx_arr.end(), g);
*/
for (size_t ii = 0; ii < NUM; ++ii) {
size_t jj = std::rand() % NUM;
std::swap(idx_arr[ii], idx_arr[jj]);
}

for (size_t kk = 0; kk < 2; ++kk) {
for (size_t ii = 0; ii < NUM; ++ii) {
size_t idx = idx_arr[ii] * 10;
std::string key_str = "key" + TestSuite::lzStr(5, idx);
std::string val_str = "val" + TestSuite::lzStr(2, kk) +
"_" + TestSuite::lzStr(5, idx);
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );

if ( (kk == 0 && ii == NUM/2 - 1 && flush_options == 1) ||
(kk == 1 && ii == NUM/2 - 1 && flush_options == 3) ) {
CHK_Z( db->sync(false) );
CHK_Z( db->flushLogs() );
}
}
CHK_Z( db->sync(false) );
if (kk == 0 && flush_options == 2) {
CHK_Z( db->flushLogs() );
}
}
CHK_Z( db->sync(false) );
if (flush_options == 4) {
CHK_Z( db->flushLogs() );
}

auto verify_func = [&]( size_t ii,
jungle::SearchOptions s_opt,
bool exact_query ) -> int {
TestSuite::setInfo("ii %zu, s_opt %d, exact_query %d",
ii, s_opt, exact_query);

int64_t idx = exact_query ? ii * 10 : ii * 10 + (s_opt.isGreater() ? 1 : -1);
int64_t exp_idx = 0;

if (exact_query) {
if (s_opt.isExactMatchAllowed()) {
exp_idx = idx;
} else {
exp_idx = (ii + (s_opt.isGreater() ? 1 : -1)) * 10;
}
} else {
exp_idx = (ii + (s_opt.isGreater() ? 1 : -1)) * 10;
}

std::string key_str;
if (idx >= 0) {
key_str = "key" + TestSuite::lzStr(5, idx);
} else {
key_str = "000";
}
std::string val_str = "val" + TestSuite::lzStr(2, 1) +
"_" + TestSuite::lzStr(5, exp_idx);

jungle::Record rec_out;
jungle::Record::Holder h_rec_out(rec_out);
s = db->getNearestRecordByKey(jungle::SizedBuf(key_str), rec_out, s_opt);
if (s_opt == jungle::SearchOptions::EQUAL) {
// EQUAL should be denied.
CHK_NOT(s);
return 0;
}

bool exp_succ = false;
if (exact_query) {
if ( s_opt.isExactMatchAllowed() ||
(s_opt.isGreater() && ii < NUM - 1) ||
(s_opt.isSmaller() && ii > 0) ) {
exp_succ = true;
}
} else {
if ( (s_opt.isGreater() && ii < NUM - 1) ||
(s_opt.isSmaller() && ii > 0) ) {
exp_succ = true;
}
}

if (exp_succ) {
CHK_Z(s);
CHK_EQ( jungle::SizedBuf(val_str), rec_out.kv.value );
} else {
CHK_NOT(s);
}
return 0;
};

using jungle::SearchOptions;
for (size_t ii = 0; ii < NUM; ++ii) {
for (bool exact_query: {true, false}) {
for (SearchOptions s_opt: { SearchOptions::GREATER_OR_EQUAL,
SearchOptions::GREATER,
SearchOptions::SMALLER_OR_EQUAL,
SearchOptions::SMALLER,
SearchOptions::EQUAL }) {
CHK_Z( verify_func(ii, s_opt, exact_query) );
}
}
}

CHK_Z(jungle::DB::close(db));
CHK_Z(jungle::shutdown());

TEST_SUITE_CLEANUP_PATH();
return 0;
}

int get_by_prefix_test(size_t hash_len) {
std::string filename;
TEST_SUITE_PREPARE_PATH(filename);

jungle::Status s;
jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config);
jungle::DB* db;

config.maxEntriesInLogFile = 20;
config.customLenForHash = [hash_len](const jungle::HashKeyLenParams& p) -> size_t {
return hash_len;
};
config.bloomFilterBitsPerUnit = 10;
CHK_Z(jungle::DB::open(&db, filename, config));

const size_t NUM = 100;

// Shuffle (0 -- 99).
std::vector<size_t> idx_arr(NUM);
std::iota(idx_arr.begin(), idx_arr.end(), 0);
#if 0
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(idx_arr.begin(), idx_arr.end(), g);
#else
for (size_t ii = 0; ii < NUM; ++ii) {
size_t jj = std::rand() % NUM;
std::swap(idx_arr[ii], idx_arr[jj]);
}
#endif

const size_t ROUND_MAX = 10;
for (size_t round = 0; round < ROUND_MAX; ++round) {
for (size_t ii = 0; ii < NUM; ++ii) {
size_t idx = idx_arr[ii];
std::string key_str = "key" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
std::string val_str = "val" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );
}
CHK_Z( db->sync(false) );
if (round == 0) {
CHK_Z( db->flushLogs() );
} else if (round >= 1 && round % 2 == 0 && round < ROUND_MAX - 2) {
for (size_t hh = 0; hh < config.numL0Partitions; ++hh) {
CHK_Z( db->compactL0(jungle::CompactOptions(), hh) );
}
CHK_Z( db->flushLogs() );
}
}

// Get all records with the given prefix.
for (size_t ii = 0; ii < NUM; ++ii) {
TestSuite::setInfo("ii = %zu", ii);
size_t idx = ii;
std::string prefix_str = "key" + TestSuite::lzStr(5, idx);
std::list<jungle::Record> recs_out;
auto cb_func = [&](const jungle::SearchCbParams& params) ->
jungle::SearchCbDecision {
jungle::Record dst;
params.rec.copyTo(dst);
recs_out.push_back(dst);
return jungle::SearchCbDecision::NEXT;
};
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(prefix_str), cb_func) );

CHK_EQ(ROUND_MAX, recs_out.size());
for (jungle::Record& rec: recs_out) rec.free();
}

// Same but stop in the middle.
const size_t STOP_AT = 5;
for (size_t ii = 0; ii < NUM; ++ii) {
TestSuite::setInfo("ii = %zu", ii);
size_t idx = ii;
std::string prefix_str = "key" + TestSuite::lzStr(5, idx);
std::list<jungle::Record> recs_out;
auto cb_func = [&](const jungle::SearchCbParams& params) ->
jungle::SearchCbDecision {
jungle::Record dst;
params.rec.copyTo(dst);
recs_out.push_back(dst);
if (recs_out.size() >= STOP_AT) return jungle::SearchCbDecision::STOP;
return jungle::SearchCbDecision::NEXT;
};
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(prefix_str), cb_func) );

CHK_EQ(STOP_AT, recs_out.size());
for (jungle::Record& rec: recs_out) rec.free();
}

// Exact match should work.
for (size_t round = 0; round < ROUND_MAX; ++round) {
for (size_t ii = 0; ii < NUM; ++ii) {
TestSuite::setInfo("round = %zu, ii = %zu", round, ii);
size_t idx = idx_arr[ii];
std::string key_str = "key" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
std::string val_str = "val" + TestSuite::lzStr(5, idx) + "_" +
TestSuite::lzStr(2, ROUND_MAX - round);
jungle::SizedBuf value_out;
jungle::SizedBuf::Holder h(value_out);
CHK_Z( db->get( jungle::SizedBuf(key_str), value_out ) );
CHK_EQ( val_str, value_out.toString() );
}
}

{ // Insert a record with new prefix, and keep it in log section.
std::string key_str = "key_newprefix";
std::string val_str = "val_newprefix";
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );
CHK_Z( db->sync(false) );

// Search keys with the new prefix, the API should return OK.
bool cb_invoked = false;
auto cb_func = [&](const jungle::SearchCbParams& params) ->
jungle::SearchCbDecision {
cb_invoked = true;
return jungle::SearchCbDecision::NEXT;
};
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(key_str), cb_func) );
CHK_TRUE( cb_invoked );

// Flush it to table, now log section is empty.
CHK_Z( db->flushLogs() );

cb_invoked = false;
CHK_Z( db->getRecordsByPrefix(jungle::SizedBuf(key_str), cb_func) );
CHK_TRUE( cb_invoked );

// Find non-existing key.
std::string ne_key_str = "non_existing_key";
cb_invoked = false;
s = db->getRecordsByPrefix(jungle::SizedBuf(ne_key_str), cb_func);
CHK_EQ( jungle::Status::KEY_NOT_FOUND, s.getValue() );
CHK_FALSE( cb_invoked );

}

CHK_Z(jungle::DB::close(db));
CHK_Z(jungle::shutdown());

TEST_SUITE_CLEANUP_PATH();
return 0;
}

int kmv_get_memory_test() {
std::string filename;
TEST_SUITE_PREPARE_PATH(filename);
Expand Down Expand Up @@ -3346,11 +3060,6 @@ int main(int argc, char** argv) {
SET_PARAMS(discard_test_args);
ts.doTest("discard dirty test", discard_test, discard_test_args);

ts.doTest("get nearest test",
get_nearest_test,
TestRange<int>(0, 4, 1, StepType::LINEAR));
ts.doTest("get by prefix test",
get_by_prefix_test, TestRange<size_t>( {0, 8, 16} ));
ts.doTest("kmv get memory test", kmv_get_memory_test);
ts.doTest("immediate purging test", immediate_purging_test);
ts.doTest("compaction by fast scan test", compaction_by_fast_scan_test);
Expand Down
Loading