Skip to content

Commit

Permalink
Fix some RocksDB APIs that fail with PlainTable (#4859)
Browse files Browse the repository at this point in the history
* Fix some RocksDB APIs that fail with PlainTable

* address @wenhaocs's comments

* fix
  • Loading branch information
critical27 authored Nov 29, 2022
1 parent 50a2966 commit 5425036
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 37 deletions.
8 changes: 4 additions & 4 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,13 +568,13 @@ void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId, bool need
void NebulaStore::addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) {
UNUSED(type);
folly::RWSpinLock::WriteHolder wh(&lock_);
// Listeners don't need an engine for now
if (this->spaceListeners_.find(spaceId) != this->spaceListeners_.end()) {
LOG(INFO) << "Listener space " << spaceId << " has existed!";
return;
} else {
LOG(INFO) << "Create listener space " << spaceId;
this->spaceListeners_[spaceId] = std::make_unique<SpaceListenerInfo>();
}
LOG(INFO) << "Create listener space " << spaceId;
this->spaceListeners_[spaceId] = std::make_unique<SpaceListenerInfo>();
// Perform extra initialization of given type of listener here
}

void NebulaStore::removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) {
Expand Down
25 changes: 21 additions & 4 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,17 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
CHECK(status.ok()) << status.ToString();
}
db_.reset(db);
extractorLen_ = sizeof(PartitionID) + vIdLen;
std::string factoryName = options.table_factory->Name();
if (factoryName == rocksdb::TableFactory::kBlockBasedTableName()) {
extractorLen_ = sizeof(PartitionID) + vIdLen;
} else if (factoryName == rocksdb::TableFactory::kPlainTableName()) {
// PlainTable only supports prefix-based seek, which means that if a prefix has not been inserted
// into rocksdb, the matching keys can no longer be read through the "prefix" api. For simplicity,
// we set the length of the prefix extractor to the minimum length we use in the "prefix" api,
// which is 4 when we seek by tagPrefix(partId) or edgePrefix(partId).
isPlainTable_ = true;
extractorLen_ = sizeof(PartitionID);
}
partsNum_ = allParts().size();
LOG(INFO) << "open rocksdb on " << path;

Expand Down Expand Up @@ -175,7 +185,11 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
if (!isPlainTable_) {
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
} else {
options.prefix_same_as_start = true;
}
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down Expand Up @@ -232,8 +246,11 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
if (!isPlainTable_) {
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
} else {
options.prefix_same_as_start = true;
}
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ class RocksEngine : public KVEngine {
std::unique_ptr<rocksdb::BackupEngine> backupDb_{nullptr};
int32_t partsNum_ = -1;
size_t extractorLen_;
bool isPlainTable_{false};
};

} // namespace kvstore
Expand Down
7 changes: 6 additions & 1 deletion src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.rate_limiter = rate_limiter;
}

size_t prefixLength = sizeof(PartitionID) + vidLen;
if (FLAGS_rocksdb_table_format == "BlockBasedTable") {
// BlockBasedTableOptions
std::unordered_map<std::string, std::string> bbtOptsMap;
Expand Down Expand Up @@ -330,6 +329,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
if (FLAGS_enable_rocksdb_prefix_filtering) {
size_t prefixLength = sizeof(PartitionID) + vidLen;
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
}
bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering;
Expand All @@ -346,6 +346,11 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
if (!FLAGS_enable_rocksdb_prefix_filtering) {
return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter");
}
// PlainTable only supports prefix-based seek, which means that if a prefix has not been inserted
// into rocksdb, the matching keys can no longer be read through the "prefix" api. For simplicity,
// we set the length of the prefix extractor to the minimum length we use in the "prefix" api,
// which is 4 when we seek by tagPrefix(partId) or edgePrefix(partId).
size_t prefixLength = sizeof(PartitionID);
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory());
baseOpts.create_if_missing = true;
Expand Down
102 changes: 74 additions & 28 deletions src/kvstore/test/RocksEngineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,30 @@ namespace kvstore {

const int32_t kDefaultVIdLen = 8;

class RocksEngineTest : public ::testing::TestWithParam<std::tuple<bool, bool, std::string>> {
class RocksEngineTest : public ::testing::TestWithParam<std::tuple<bool, bool, std::string, bool>> {
public:
void SetUp() override {
auto param = GetParam();
FLAGS_enable_rocksdb_prefix_filtering = std::get<0>(param);
FLAGS_enable_rocksdb_whole_key_filtering = std::get<1>(param);
FLAGS_rocksdb_table_format = std::get<2>(param);
flush_ = std::get<3>(param);
}

void TearDown() override {}

protected:
bool flush_;
};

TEST_P(RocksEngineTest, SimpleTest) {
  // Round-trip a single key through an engine opened on a scratch directory.
  fs::TempDir scratchDir("/tmp/rocksdb_engine_SimpleTest.XXXXXX");
  auto store = std::make_unique<RocksEngine>(0, kDefaultVIdLen, scratchDir.path());
  EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, store->put("key", "val"));

  // When the test parameter requests it, flush the engine before reading back.
  if (flush_) {
    EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, store->flush());
  }

  // The value read back must be exactly what was written.
  std::string fetched;
  EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, store->get("key", &fetched));
  EXPECT_EQ("val", fetched);
}
Expand All @@ -45,22 +52,26 @@ TEST_P(RocksEngineTest, RangeTest) {
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
std::vector<KV> data;
for (int32_t i = 10; i < 20; i++) {
data.emplace_back(std::string(reinterpret_cast<const char*>(&i), sizeof(int32_t)),
data.emplace_back("key_" + std::string(reinterpret_cast<const char*>(&i), sizeof(int32_t)),
folly::stringPrintf("val_%d", i));
}
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

auto checkRange = [&](int32_t start, int32_t end, int32_t expectedFrom, int32_t expectedTotal) {
VLOG(1) << "start " << start << ", end " << end << ", expectedFrom " << expectedFrom
<< ", expectedTotal " << expectedTotal;
std::string s(reinterpret_cast<const char*>(&start), sizeof(int32_t));
std::string e(reinterpret_cast<const char*>(&end), sizeof(int32_t));
std::string s = "key_" + std::string(reinterpret_cast<const char*>(&start), sizeof(int32_t));
std::string e = "key_" + std::string(reinterpret_cast<const char*>(&end), sizeof(int32_t));
std::unique_ptr<KVIterator> iter;
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->range(s, e, &iter));
int num = 0;
while (iter->valid()) {
num++;
auto key = *reinterpret_cast<const int32_t*>(iter->key().data());
// remove the prefix "key_"
auto key = *reinterpret_cast<const int32_t*>(iter->key().subpiece(4).data());
auto val = iter->val();
EXPECT_EQ(expectedFrom, key);
EXPECT_EQ(folly::stringPrintf("val_%d", expectedFrom), val);
Expand All @@ -83,15 +94,18 @@ TEST_P(RocksEngineTest, PrefixTest) {
LOG(INFO) << "Write data in batch and scan them...";
std::vector<KV> data;
for (int32_t i = 0; i < 10; i++) {
data.emplace_back(folly::stringPrintf("a_%d", i), folly::stringPrintf("val_%d", i));
data.emplace_back(folly::stringPrintf("key_a_%d", i), folly::stringPrintf("val_%d", i));
}
for (int32_t i = 10; i < 15; i++) {
data.emplace_back(folly::stringPrintf("b_%d", i), folly::stringPrintf("val_%d", i));
data.emplace_back(folly::stringPrintf("key_b_%d", i), folly::stringPrintf("val_%d", i));
}
for (int32_t i = 20; i < 40; i++) {
data.emplace_back(folly::stringPrintf("c_%d", i), folly::stringPrintf("val_%d", i));
data.emplace_back(folly::stringPrintf("key_c_%d", i), folly::stringPrintf("val_%d", i));
}
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

auto checkPrefix = [&](const std::string& prefix, int32_t expectedFrom, int32_t expectedTotal) {
VLOG(1) << "prefix " << prefix << ", expectedFrom " << expectedFrom << ", expectedTotal "
Expand All @@ -111,9 +125,9 @@ TEST_P(RocksEngineTest, PrefixTest) {
}
EXPECT_EQ(expectedTotal, num);
};
checkPrefix("a", 0, 10);
checkPrefix("b", 10, 5);
checkPrefix("c", 20, 20);
checkPrefix("key_a", 0, 10);
checkPrefix("key_b", 10, 5);
checkPrefix("key_c", 20, 20);
}

TEST_P(RocksEngineTest, RemoveTest) {
Expand Down Expand Up @@ -147,6 +161,9 @@ TEST_P(RocksEngineTest, RemoveRangeTest) {
engine->removeRange(std::string(reinterpret_cast<const char*>(&s), sizeof(int32_t)),
std::string(reinterpret_cast<const char*>(&e), sizeof(int32_t))));
}
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}
{
int32_t s = 0, e = 100;
std::unique_ptr<KVIterator> iter;
Expand Down Expand Up @@ -218,6 +235,9 @@ TEST_P(RocksEngineTest, IngestTest) {
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
std::vector<std::string> files = {file};
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->ingest(files));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

std::string result;
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->get("key", &result));
Expand Down Expand Up @@ -264,6 +284,9 @@ TEST_P(RocksEngineTest, BackupRestoreTable) {
fs::TempDir restoreRootPath("/tmp/rocksdb_engine_restoretable.XXXXXX");
auto restore_engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, restoreRootPath.path());
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, restore_engine->ingest(sst_files));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

std::unique_ptr<KVIterator> iter;
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, restore_engine->prefix(partPrefix, &iter));
Expand Down Expand Up @@ -465,6 +488,9 @@ TEST_P(RocksEngineTest, PrefixBloomTest) {
data.emplace_back(NebulaKeyUtils::systemCommitKey(1), "123");
data.emplace_back(NebulaKeyUtils::systemCommitKey(2), "123");
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->multiPut(std::move(data)));
if (flush_) {
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine->flush());
}

{
// vertexPrefix(partId) will not be included
Expand Down Expand Up @@ -531,15 +557,21 @@ TEST_P(RocksEngineTest, PrefixBloomTest) {
}
}

INSTANTIATE_TEST_SUITE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat,
INSTANTIATE_TEST_SUITE_P(EnablePrefixExtractor_EnableWholeKeyFilter_TableFormat_FlushOrNot,
RocksEngineTest,
::testing::Values(std::make_tuple(false, false, "BlockBasedTable"),
std::make_tuple(false, true, "BlockBasedTable"),
std::make_tuple(true, false, "BlockBasedTable"),
std::make_tuple(true, true, "BlockBasedTable"),
::testing::Values(std::make_tuple(false, false, "BlockBasedTable", true),
std::make_tuple(false, false, "BlockBasedTable", false),
std::make_tuple(false, true, "BlockBasedTable", true),
std::make_tuple(false, true, "BlockBasedTable", false),
std::make_tuple(true, false, "BlockBasedTable", true),
std::make_tuple(true, false, "BlockBasedTable", false),
std::make_tuple(true, true, "BlockBasedTable", true),
std::make_tuple(true, true, "BlockBasedTable", false),
// PlainTable will always enable prefix extractor
std::make_tuple(true, false, "PlainTable"),
std::make_tuple(true, true, "PlainTable")));
std::make_tuple(true, false, "PlainTable", true),
std::make_tuple(true, false, "PlainTable", false),
std::make_tuple(true, true, "PlainTable", true),
std::make_tuple(true, true, "PlainTable", false)));

TEST(PlainTableTest, BackupRestoreWithoutData) {
fs::TempDir dataPath("/tmp/rocks_engine_test_data_path.XXXXXX");
Expand Down Expand Up @@ -580,19 +612,33 @@ TEST(PlainTableTest, BackupRestoreWithData) {
PartitionID partId = 1;

auto checkData = [&] {
std::string prefix = NebulaKeyUtils::tagPrefix(kDefaultVIdLen, partId, "vertex");
std::unique_ptr<KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
int32_t num = 0;
while (iter->valid()) {
num++;
iter->next();
{
std::string prefix = NebulaKeyUtils::tagPrefix(kDefaultVIdLen, partId, "vertex");
std::unique_ptr<KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
int32_t num = 0;
while (iter->valid()) {
num++;
iter->next();
}
EXPECT_EQ(num, 10);
}
{
std::string prefix = NebulaKeyUtils::tagPrefix(partId);
std::unique_ptr<KVIterator> iter;
auto code = engine->prefix(prefix, &iter);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
int32_t num = 0;
while (iter->valid()) {
num++;
iter->next();
}
EXPECT_EQ(num, 10);
}
EXPECT_EQ(num, 10);

std::string value;
code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value);
auto code = engine->get(NebulaKeyUtils::systemCommitKey(partId), &value);
EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
EXPECT_EQ("123", value);
};
Expand Down

0 comments on commit 5425036

Please sign in to comment.