Skip to content

Commit

Permalink
Enhance/fix FileBasedWAL (vesoft-inc#27)
Browse files Browse the repository at this point in the history
* Enhance/fix FileBasedWAL

1) Support log rollback
2) Separate buffer flusher logic, so that the number of flushers is independent from the number of WALs/Shards
3) Improve the multiple threads support, but still not support multiple threads calling appendLogs()/rollbackToLog()
4) Add UT for rollback
5) Fix the flaky UTs

* Rebased and addressed dutor's comments
  • Loading branch information
sherman-the-tank authored and dutor committed Oct 8, 2018
1 parent 4c78a44 commit 965f3c3
Show file tree
Hide file tree
Showing 17 changed files with 1,364 additions and 577 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ set(CMAKE_EXE_LINKER_FLAGS "-static-libstdc++")

# Possible values are Debug, Release, RelWithDebInfo, MinSizeRel
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Debug" FORCE)
set(CMAKE_BUILD_TYPE "Debug")
endif(NOT CMAKE_BUILD_TYPE)
message("== Build type is " ${CMAKE_BUILD_TYPE} " ==")

set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "_build")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "_build")
Expand Down
2 changes: 2 additions & 0 deletions common/base/Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <list>
#include <unordered_map>
#include <unordered_set>
#include <queue>
#include <deque>

#include <cstdio>
Expand All @@ -59,6 +60,7 @@
#include <folly/Varint.h>
#include <folly/dynamic.h>
#include <folly/json.h>
#include <folly/RWSpinLock.h>

#include "thread/NamedThread.h"
//#include "base/StringUnorderedMap.h"
Expand Down
3 changes: 2 additions & 1 deletion common/fs/TempDir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ TempDir::TempDir(const char* pathTemplate, bool deleteOnDestroy)
strncpy(name.get(), pathTemplate, len);
name.get()[len] = '\0';

VLOG(2) << "Trying to create the temp directory \""
VLOG(2) << "Trying to create the temp directory with pattern \""
<< name.get() << "\"";

if (!mkdtemp(name.get())) {
Expand All @@ -27,6 +27,7 @@ TempDir::TempDir(const char* pathTemplate, bool deleteOnDestroy)
<< strerror(errno);
} else {
dirPath_ = std::move(name);
VLOG(2) << "Created temporary directory " << dirPath_.get();
}
}

Expand Down
2 changes: 1 addition & 1 deletion dataman/RowSetWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RowSetWriter::RowSetWriter(const SchemaProviderIf* schema,
void RowSetWriter::writeRowLength(int64_t len) {
uint8_t buf[10];
size_t lenBytes = folly::encodeVarint(len, buf);
DCHECK_GT(lenBytes, 0);
DCHECK_GT(lenBytes, 0UL);
data_.append(reinterpret_cast<char*>(buf), lenBytes);
}

Expand Down
86 changes: 86 additions & 0 deletions raftex/BufferFlusher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#include "base/Base.h"
#include "raftex/BufferFlusher.h"
#include "raftex/FileBasedWal.h"

namespace vesoft {
namespace vgraph {
namespace raftex {

BufferFlusher::BufferFlusher()
: flushThread_("Buffer flusher",
std::bind(&BufferFlusher::flushLoop, this)) {
}


BufferFlusher::~BufferFlusher() {
{
std::lock_guard<std::mutex> g(buffersLock_);
stopped_ = true;
}

bufferReadyCV_.notify_one();

flushThread_.join();
CHECK(buffers_.empty());
}


bool BufferFlusher::flushBuffer(std::shared_ptr<FileBasedWal> wal,
BufferPtr buffer) {
{
std::lock_guard<std::mutex> g(buffersLock_);

if (stopped_) {
LOG(ERROR) << "Buffer flusher has stopped";
return false;
}

buffers_.emplace(std::move(wal), std::move(buffer));
}

// Notify the loop thread
bufferReadyCV_.notify_one();

return true;
}


void BufferFlusher::flushLoop() {
LOG(INFO) << "Buffer flusher loop started";

while(true) {
decltype(buffers_)::value_type bufferPair;
{
std::unique_lock<std::mutex> g(buffersLock_);
if (buffers_.empty()) {
if (stopped_) {
VLOG(1) << "The buffer flusher has stopped,"
" so exiting the flush loop";
break;
}
// Otherwise need to wait
bufferReadyCV_.wait(g, [this] {
return !buffers_.empty() || stopped_;
});
} else {
bufferPair = std::move(buffers_.front());
buffers_.pop();

bufferPair.first->flushBuffer(bufferPair.second);
}
}
}

LOG(INFO) << "Buffer flusher loop finished";
}

} // namespace raftex
} // namespace vgraph
} // namespace vesoft

47 changes: 47 additions & 0 deletions raftex/BufferFlusher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#ifndef RAFTEX_BUFFERFLUSHER_H_
#define RAFTEX_BUFFERFLUSHER_H_

#include "base/Base.h"
#include "thread/NamedThread.h"
#include "raftex/InMemoryLogBuffer.h"

namespace vesoft {
namespace vgraph {
namespace raftex {

class FileBasedWal;

class BufferFlusher final {
public:
BufferFlusher();
~BufferFlusher();

bool flushBuffer(std::shared_ptr<FileBasedWal> wal, BufferPtr buffer);

private:
void flushLoop();

private:
bool stopped_{false};

std::queue<
std::pair<std::shared_ptr<FileBasedWal>, BufferPtr>
> buffers_;
std::mutex buffersLock_;
std::condition_variable bufferReadyCV_;

thread::NamedThread flushThread_;
};

} // namespace raftex
} // namespace vgraph
} // namespace vesoft

#endif // RAFTEX_BUFFERFLUSHER_H_

3 changes: 3 additions & 0 deletions raftex/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
add_library(
raftex_obj OBJECT
BufferFlusher.cpp
InMemoryLogBuffer.cpp
FileBasedWalIterator.cpp
FileBasedWal.cpp
)
add_dependencies(raftex_obj common)
Expand Down
Loading

0 comments on commit 965f3c3

Please sign in to comment.