Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

fix disk manager thread conflict #512

Merged
merged 5 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
# One path per Rocksdb instance.
--data_path=data/storage

# Minimum reserved bytes of each data path
--minimum_reserved_bytes=268435456

# The default reserved bytes for one batch operation
--rocksdb_batch_size=4096
# The default block cache size used in BlockBasedTable.
Expand Down
3 changes: 3 additions & 0 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
# One path per Rocksdb instance.
--data_path=data/storage

# Minimum reserved bytes of each data path
--minimum_reserved_bytes=268435456

# The default reserved bytes for one batch operation
--rocksdb_batch_size=4096
# The default block cache size used in BlockBasedTable. (MB)
Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ StatusOr<std::vector<std::string>> DiskManager::path(GraphSpaceID spaceId) {
return Status::Error("Space not found");
}
std::vector<std::string> paths;
for (const auto& [path, _] : spaceIt->second) {
paths.emplace_back(path);
for (const auto& partEntry : spaceIt->second) {
paths.emplace_back(partEntry.first);
}
return paths;
}
Expand All @@ -70,6 +70,7 @@ StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId
void DiskManager::addPartToPath(GraphSpaceID spaceId,
PartitionID partId,
const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
Expand All @@ -85,6 +86,7 @@ void DiskManager::addPartToPath(GraphSpaceID spaceId,
void DiskManager::removePartFromPath(GraphSpaceID spaceId,
PartitionID partId,
const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/DiskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class DiskManager {

// the index in dataPaths_ for a given space + part
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, size_t>> partIndex_;

// lock used to protect partPath_ and partIndex_
std::mutex lock_;
};

} // namespace kvstore
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
KVEngine* engine,
bool asLearner,
const std::vector<HostAddr>& defaultPeers) {
auto walPath = folly::stringPrintf("%s/wal/%d", engine->getDataRoot(), partId);
auto walPath = folly::stringPrintf("%s/wal/%d", engine->getWalRoot(), partId);
auto part = std::make_shared<Part>(spaceId,
partId,
raftAddr_,
Expand Down
8 changes: 5 additions & 3 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
} while (false);

if (!checkAppendLogResult(res)) {
LOG(ERROR) << idStr_ << "Failed append logs";
LOG_EVERY_N(WARNING, 100) << idStr_ << "Failed to write wal";
return;
}
// Step 2: Replicate to followers
Expand Down Expand Up @@ -805,7 +805,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
} while (false);

if (!checkAppendLogResult(res)) {
LOG(ERROR) << idStr_ << "Replicate logs failed";
LOG(WARNING) << idStr_ << "replicateLogs failed because of not leader or term changed";
return;
}

Expand Down Expand Up @@ -911,7 +911,9 @@ void RaftPart::processAppendLogResponses(
} while (false);

if (!checkAppendLogResult(res)) {
LOG(ERROR) << idStr_ << "processAppendLogResponses failed!";
LOG(WARNING)
<< idStr_
<< "processAppendLogResponses failed because of not leader or term changed";
return;
}

Expand Down