Skip to content

Commit

Permalink
revert src/kvstore
Browse files Browse the repository at this point in the history
  • Loading branch information
veezhang authored and xtcyclist committed Dec 27, 2022
1 parent 2eeb304 commit 11a2429
Show file tree
Hide file tree
Showing 13 changed files with 33 additions and 55 deletions.
6 changes: 2 additions & 4 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
Paths* paths = new Paths();
paths_.store(paths);
size_t index = 0;

// TODO(vee): Add initialize function to avoid using LOG(FATAL) in constructor.
for (const auto& path : dataPaths) {
auto absolute = boost::filesystem::absolute(path);
if (!boost::filesystem::exists(absolute)) {
Expand Down Expand Up @@ -95,7 +93,7 @@ void DiskManager::addPartToPath(GraphSpaceID spaceId, PartitionID partId, const
paths_.store(newPaths, std::memory_order_release);
folly::rcu_retire(oldPaths, std::default_delete<Paths>());
} catch (boost::filesystem::filesystem_error& e) {
LOG(DFATAL) << "Invalid path: " << e.what();
LOG(FATAL) << "Invalid path: " << e.what();
}
}

Expand All @@ -116,7 +114,7 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId,
paths_.store(newPaths, std::memory_order_release);
folly::rcu_retire(oldPaths, std::default_delete<Paths>());
} catch (boost::filesystem::filesystem_error& e) {
LOG(DFATAL) << "Invalid path: " << e.what();
LOG(FATAL) << "Invalid path: " << e.what();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ std::unique_ptr<KVEngine> NebulaStore::newEngine(GraphSpaceID spaceId,
return std::make_unique<RocksEngine>(
spaceId, vIdLen, dataPath, walPath, options_.mergeOp_, cfFactory);
} else {
LOG(DFATAL) << "Unknown engine type " << FLAGS_engine_type;
LOG(FATAL) << "Unknown engine type " << FLAGS_engine_type;
return nullptr;
}
}
Expand Down Expand Up @@ -627,7 +627,7 @@ std::shared_ptr<Listener> NebulaStore::newListener(GraphSpaceID spaceId,
listener = std::make_shared<ESListener>(
spaceId, partId, raftAddr_, walPath, ioPool_, bgWorkers_, workers_, options_.schemaMan_);
} else {
LOG(DFATAL) << "Should not reach here";
LOG(FATAL) << "Should not reach here";
return nullptr;
}
raftService_->addPartition(listener);
Expand Down
2 changes: 0 additions & 2 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
walPath_ = folly::stringPrintf("%s/nebula/%d", walPath.c_str(), spaceId);
}
auto path = folly::stringPrintf("%s/data", dataPath_.c_str());

// TODO(vee): Add initialize function to avoid using LOG(FATAL) in constructor.
if (FileUtils::fileType(path.c_str()) == FileType::NOTEXIST) {
if (readonly) {
LOG(FATAL) << "Path " << path << " not exist";
Expand Down
3 changes: 1 addition & 2 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
auto walDir = folly::stringPrintf("%s/rocksdb_wal/%d", FLAGS_rocksdb_wal_dir.c_str(), spaceId);
if (fs::FileUtils::fileType(walDir.c_str()) == fs::FileType::NOTEXIST) {
if (!fs::FileUtils::makeDir(walDir)) {
LOG(DFATAL) << "makeDir " << walDir << " failed";
return rocksdb::Status::InvalidArgument();
LOG(FATAL) << "makeDir " << walDir << " failed";
}
}
LOG(INFO) << "set rocksdb wal of space " << spaceId << " to " << walDir;
Expand Down
5 changes: 1 addition & 4 deletions src/kvstore/listener/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ Listener::Listener(GraphSpaceID spaceId,
void Listener::start(std::vector<HostAddr>&& peers, bool) {
std::lock_guard<std::mutex> g(raftLock_);

if (!init()) {
// TODO(vee): return bool to avoid using LOG(FATAL)
LOG(FATAL) << "Listener init failed";
}
init();

lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
Expand Down
10 changes: 5 additions & 5 deletions src/kvstore/listener/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
*
* * Must implement in derived class
* // extra initialize work could do here
* bool init()
* void init()
*
* // Main interface to process logs, listener need to apply the committed log entry to their
* // state machine. Once apply succeeded, user should call persist() to make their progress
Expand Down Expand Up @@ -166,7 +166,7 @@ class Listener : public raftex::RaftPart {
/**
* @brief extra initialize work could do here
*/
virtual bool init() = 0;
virtual void init() = 0;

/**
* @brief Get last apply id from persistence storage, used in initialization
Expand Down Expand Up @@ -194,7 +194,7 @@ class Listener : public raftex::RaftPart {
*/
void onLostLeadership(TermID term) override {
UNUSED(term);
LOG(DFATAL) << "Should not reach here";
LOG(FATAL) << "Should not reach here";
}

/**
Expand All @@ -204,7 +204,7 @@ class Listener : public raftex::RaftPart {
*/
void onElected(TermID term) override {
UNUSED(term);
LOG(DFATAL) << "Should not reach here";
LOG(FATAL) << "Should not reach here";
}

/**
Expand All @@ -214,7 +214,7 @@ class Listener : public raftex::RaftPart {
*/
void onLeaderReady(TermID term) override {
UNUSED(term);
LOG(DFATAL) << "Should not reach here";
LOG(FATAL) << "Should not reach here";
}

/**
Expand Down
27 changes: 10 additions & 17 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener com

namespace nebula {
namespace kvstore {
bool ESListener::init() {
void ESListener::init() {
auto vRet = schemaMan_->getSpaceVidLen(spaceId_);
if (!vRet.ok()) {
LOG(DFATAL) << "vid length error";
return false;
LOG(FATAL) << "vid length error";
}
vIdLen_ = vRet.value();
auto vidTypeRet = schemaMan_->getSpaceVidType(spaceId_);
Expand All @@ -29,8 +28,7 @@ bool ESListener::init() {

auto cRet = schemaMan_->getServiceClients(meta::cpp2::ExternalServiceType::ELASTICSEARCH);
if (!cRet.ok() || cRet.value().empty()) {
LOG(DFATAL) << "elasticsearch clients error";
return false;
LOG(FATAL) << "elasticsearch clients error";
}
std::vector<nebula::plugin::ESClient> esClients;
for (const auto& c : cRet.value()) {
Expand All @@ -46,11 +44,9 @@ bool ESListener::init() {
esAdapter_.setClients(std::move(esClients));
auto sRet = schemaMan_->toGraphSpaceName(spaceId_);
if (!sRet.ok()) {
LOG(DFATAL) << "space name error";
return false;
LOG(FATAL) << "space name error";
}
spaceName_ = std::make_unique<std::string>(sRet.value());
return true;
}

bool ESListener::apply(const BatchHolder& batch) {
Expand All @@ -67,7 +63,7 @@ bool ESListener::apply(const BatchHolder& batch) {
} else if (type == BatchLogType::OP_BATCH_REMOVE) {
bulk.delete_(index, vid, src, dst, rank);
} else {
LOG(DFATAL) << "Unexpect";
LOG(FATAL) << "Unexpect";
}
};
for (const auto& log : batch.getBatch()) {
Expand Down Expand Up @@ -156,8 +152,7 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,

bool ESListener::persist(LogID lastId, TermID lastTerm, LogID lastApplyLogId) {
if (!writeAppliedId(lastId, lastTerm, lastApplyLogId)) {
LOG(DFATAL) << "last apply ids write failed";
return false;
LOG(FATAL) << "last apply ids write failed";
}
return true;
}
Expand All @@ -169,9 +164,8 @@ std::pair<LogID, TermID> ESListener::lastCommittedLogId() {
}
int32_t fd = open(lastApplyLogFile_->c_str(), O_RDONLY);
if (fd < 0) {
LOG(DFATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" (" << errno
<< "): " << strerror(errno);
return {0, 0};
LOG(FATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" (" << errno
<< "): " << strerror(errno);
}
// read last logId from listener wal file.
LogID logId;
Expand All @@ -193,9 +187,8 @@ LogID ESListener::lastApplyLogId() {
}
int32_t fd = open(lastApplyLogFile_->c_str(), O_RDONLY);
if (fd < 0) {
LOG(DFATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" (" << errno
<< "): " << strerror(errno);
return 0;
LOG(FATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" (" << errno
<< "): " << strerror(errno);
}
// read last applied logId from listener wal file.
LogID logId;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/listener/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ESListener : public Listener {
/**
* @brief Init work: get vid length, get es client
*/
bool init() override;
void init() override;

/**
* @brief Send data by es client
Expand Down
4 changes: 1 addition & 3 deletions src/kvstore/listener/test/NebulaListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,7 @@ class DummyListener : public Listener {
}

protected:
bool init() override {
return true;
}
void init() override {}

bool apply(const BatchHolder& batch) {
for (auto& log : batch.getBatch()) {
Expand Down
9 changes: 3 additions & 6 deletions src/kvstore/plugins/hbase/HBaseStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ ResultCode HBaseStore::rangeWithPrefix(GraphSpaceID spaceId,
ResultCode HBaseStore::sync(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
LOG(DFATAL) << "Unimplement";
return ResultCode::ERR_UNSUPPORTED;
LOG(FATAL) << "Unimplement";
}

ResultCode HBaseStore::multiRemove(GraphSpaceID spaceId, std::vector<std::string>& keys) {
Expand Down Expand Up @@ -404,13 +403,11 @@ void HBaseStore::asyncRemovePrefix(GraphSpaceID spaceId,
}

ResultCode HBaseStore::ingest(GraphSpaceID) {
LOG(DFATAL) << "Unimplement";
return ResultCode::ERR_UNSUPPORTED;
LOG(FATAL) << "Unimplement";
}

int32_t HBaseStore::allLeader(std::unordered_map<GraphSpaceID, std::vector<PartitionID>>&) {
LOG(DFATAL) << "Unimplement";
return 0;
LOG(FATAL) << "Unimplement";
}

} // namespace kvstore
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ class HBaseStore : public KVStore {
KVCallback cb);

void asyncAtomicOp(GraphSpaceID, PartitionID, raftex::AtomicOp, KVCallback) override {
LOG(DFATAL) << "Not supported yet!";
LOG(FATAL) << "Not supported yet!";
}

void asyncAtomicOp(GraphSpaceID, PartitionID, std::string&& multiValues, KVCallback) override {
LOG(DFATAL) << "Not supported yet!";
LOG(FATAL) << "Not supported yet!";
}

ResultCode ingest(GraphSpaceID spaceId) override;
Expand Down
7 changes: 3 additions & 4 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AppendLogsIterator final : public LogIterator {
}
}
if (notFulfilledPromise > 0) {
LOG(DFATAL) << "notFulfilledPromise == " << notFulfilledPromise;
LOG(FATAL) << "notFulfilledPromise == " << notFulfilledPromise;
}
}
}
Expand Down Expand Up @@ -394,7 +394,7 @@ const char* RaftPart::roleStr(Role role) const {
case Role::LEARNER:
return "Learner";
default:
LOG(DFATAL) << idStr_ << "Invalid role";
LOG(FATAL) << idStr_ << "Invalid role";
}
return nullptr;
}
Expand Down Expand Up @@ -1091,8 +1091,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
[self = shared_from_this(), term = term_] { self->onLeaderReady(term); });
}
} else {
LOG(DFATAL) << idStr_ << "Failed to commit logs";
return;
LOG(FATAL) << idStr_ << "Failed to commit logs";
}
VLOG(4) << idStr_ << "Leader succeeded in committing the logs " << committedId + 1 << " to "
<< lastLogId;
Expand Down
5 changes: 2 additions & 3 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,8 @@ bool FileBasedWal::appendLogInternal(LogID id,

ssize_t bytesWritten = write(currFd_, strBuf.data(), strBuf.size());
if (bytesWritten != (ssize_t)strBuf.size()) {
LOG(DFATAL) << idStr_ << "bytesWritten:" << bytesWritten << ", expected:" << strBuf.size()
<< ", error:" << strerror(errno);
return false;
LOG(FATAL) << idStr_ << "bytesWritten:" << bytesWritten << ", expected:" << strBuf.size()
<< ", error:" << strerror(errno);
}

if (policy_.sync && ::fsync(currFd_) == -1) {
Expand Down

0 comments on commit 11a2429

Please sign in to comment.