Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate raft into storage #405

Merged
merged 5 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ macro(nebula_link_libraries target)
${OPENSSL_CRYPTO_LIBRARY}
${KRB5_LIBRARIES}
${COMPRESSION_LIBRARIES}
${JEMALLOC_LIB}
${LIBUNWIND_LIBRARIES}
${JEMALLOC_LIB}
dl
${GETTIME_LIB}
-pthread
Expand Down
2 changes: 1 addition & 1 deletion cmake/FindNCURSES.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ find_path(

find_path(
NCURSES_INCLUDE_DIR
NAMES ncurses/curses.h
NAMES ncurses.h curses.h ncurses/curses.h
HINTS ${NCURSES_ROOT_DIR}/include
)

Expand Down
4 changes: 0 additions & 4 deletions src/common/base/EitherOr.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

namespace nebula {

namespace { // NOLINT

using LeftType = std::true_type;
using RightType = std::false_type;

Expand All @@ -23,8 +21,6 @@ enum class State : int16_t {
RIGHT_TYPE = 2,
};

} // Anonymous namespace

static constexpr LeftType* kConstructLeft = nullptr;
static constexpr RightType* kConstructRight = nullptr;

Expand Down
1 change: 1 addition & 0 deletions src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ StatusOr<std::unordered_map<std::string, std::string>> NetworkUtils::listDeviceA
return dev2ipv4s;
}


bool NetworkUtils::getDynamicPortRange(uint16_t& low, uint16_t& high) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does return value use StatusOr?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite understand why we need StatusOr here

FILE* pipe = popen("cat /proc/sys/net/ipv4/ip_local_port_range", "r");
if (!pipe) {
Expand Down
44 changes: 22 additions & 22 deletions src/common/test/ServerContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,39 @@ struct ServerContext {
if (thread_ != nullptr) {
thread_->join();
}
KVStore_ = nullptr;
kvStore_ = nullptr;
server_ = nullptr;
thread_ = nullptr;
VLOG(3) << "~ServerContext";
}


void mockCommon(const std::string& name,
uint16_t port,
std::shared_ptr<apache::thrift::ServerInterface> handler) {
server_ = std::make_unique<apache::thrift::ThriftServer>();
server_->setInterface(std::move(handler));
server_->setPort(port);
thread_ = std::make_unique<thread::NamedThread>(name, [this, name] {
server_->serve();
LOG(INFO) << "The " << name << " server has stopped";
});

while (!server_->getServeEventBase() ||
!server_->getServeEventBase()->isRunning()) {
usleep(10000);
}
port_ = server_->getAddress().getPort();
}


std::unique_ptr<apache::thrift::ThriftServer> server_{nullptr};
std::unique_ptr<thread::NamedThread> thread_{nullptr};
// To keep meta and storage's KVStore
std::unique_ptr<kvstore::KVStore> KVStore_{nullptr};
std::unique_ptr<kvstore::KVStore> kvStore_{nullptr};
uint16_t port_{0};
};

static void mockCommon(test::ServerContext *sc,
const std::string &name,
uint16_t port,
std::shared_ptr<apache::thrift::ServerInterface> handler) {
if (nullptr == sc) {
LOG(ERROR) << "ServerContext is nullptr";
return;
}
sc->server_ = std::make_unique<apache::thrift::ThriftServer>();
sc->server_->setInterface(std::move(handler));
sc->server_->setPort(port);
sc->thread_ = std::make_unique<thread::NamedThread>(name, [&]() {
sc->server_->serve();
LOG(INFO) << "Stop the server...";
});
while (!sc->server_->getServeEventBase() ||
!sc->server_->getServeEventBase()->isRunning()) {
}
sc->port_ = sc->server_->getAddress().getPort();
}

} // namespace test
} // namespace nebula
Expand Down
2 changes: 1 addition & 1 deletion src/common/test/TestServerContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TestServer final : public apache::thrift::ServerInterface {
TEST(ServerContext, mockCommon) {
auto sc = std::make_unique<ServerContext>();
auto handler = std::make_shared<TestServer>();
test::mockCommon(sc.get(), "test", 0, handler);
sc->mockCommon("test", 0, handler);
}

} // namespace test
Expand Down
11 changes: 8 additions & 3 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ add_executable(
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
Expand Down Expand Up @@ -77,9 +80,11 @@ add_executable(
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
Expand Down
24 changes: 19 additions & 5 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include "webservice/WebService.h"
#include "network/NetworkUtils.h"
#include "process/ProcessUtils.h"
#include "thread/GenericThreadPool.h"
#include "kvstore/PartManager.h"
#include "kvstore/NebulaStore.h"

using nebula::ProcessUtils;
using nebula::Status;
Expand All @@ -23,6 +25,8 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"the ips number equals replica number."
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DECLARE_string(part_man_type);

DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
Expand Down Expand Up @@ -89,7 +93,7 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Bad local host addr, status:" << hostAddrRet.status();
return EXIT_FAILURE;
}
auto& localHost = hostAddrRet.value();
auto& localhost = hostAddrRet.value();

auto peersRet = nebula::network::NetworkUtils::toHosts(FLAGS_peers);
if (!peersRet.ok()) {
Expand All @@ -108,22 +112,32 @@ int main(int argc, char *argv[]) {
// The meta server has only one space, one part.
partMan->addPart(0, 0, std::move(peersRet.value()));

// Generic thread pool
auto workers = std::make_shared<nebula::thread::GenericThreadPool>();
sherman-the-tank marked this conversation as resolved.
Show resolved Hide resolved
workers->start(FLAGS_num_workers);

// folly IOThreadPoolExecutor
auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
sherman-the-tank marked this conversation as resolved.
Show resolved Hide resolved

nebula::kvstore::KVOptions options;
options.local_ = localHost;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
std::unique_ptr<nebula::kvstore::KVStore> kvstore(
nebula::kvstore::KVStore::instance(std::move(options)));
std::unique_ptr<nebula::kvstore::KVStore> kvstore =
std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
workers,
localhost);

auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore.get());

nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localHost);
nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localhost);
try {
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioPool);
gServer->serve(); // Will wait until the server shuts down
} catch (const std::exception &e) {
LOG(ERROR) << "Exception thrown: " << e.what();
Expand Down
70 changes: 54 additions & 16 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
#include "base/Base.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "network/NetworkUtils.h"
#include "thread/GenericThreadPool.h"
#include "storage/StorageServiceHandler.h"
#include "storage/StorageHttpHandler.h"
#include "kvstore/KVStore.h"
#include "kvstore/NebulaStore.h"
#include "kvstore/PartManager.h"
#include "process/ProcessUtils.h"
#include "storage/test/TestUtils.h"
Expand All @@ -21,13 +22,16 @@ DEFINE_int32(port, 44500, "Storage daemon listening port");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_string(data_path, "", "Root data path, multi paths should be split by comma."
"For rocksdb engine, one path one instance.");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_bool(mock_server, true, "start mock server");
DEFINE_bool(daemonize, true, "Whether to run the process as a daemon");
DEFINE_string(pid_file, "pids/nebula-storaged.pid", "File to hold the process id");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
DEFINE_int32(io_handlers, 10, "io handlers");
DEFINE_string(store_type, "nebula",
"Which type of KVStore to be used by the storage daemon."
" Options can be \"nebula\", \"hbase\", etc.");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");

using nebula::Status;
using nebula::HostAddr;
Expand All @@ -44,6 +48,33 @@ static void signalHandler(int sig);
static Status setupSignalHandler();


std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
sherman-the-tank marked this conversation as resolved.
Show resolved Hide resolved
HostAddr localhost,
std::vector<std::string> paths,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<nebula::thread::GenericThreadPool> workers,
nebula::meta::MetaClient* metaClient) {
nebula::kvstore::KVOptions options;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::make_unique<nebula::kvstore::MetaServerBasedPartManager>(
localhost,
metaClient);

if (FLAGS_store_type == "nebula") {
return std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
workers,
localhost);
} else if (FLAGS_store_type == "hbase") {
LOG(FATAL) << "HBase store has not been implemented";
} else {
LOG(FATAL) << "Unknown store type \"" << FLAGS_store_type << "\"";
}

return nullptr;
}


int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
folly::init(&argc, &argv, true);
Expand Down Expand Up @@ -81,18 +112,17 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
auto result = nebula::network::NetworkUtils::getLocalIP();
if (!result.ok()) {
LOG(ERROR) << "Get localIp failed, ip " << FLAGS_local_ip
<< ", status:" << result.status();
LOG(ERROR) << "Get localIp failed, status:" << result.status();
return EXIT_FAILURE;
}
auto hostRet = nebula::network::NetworkUtils::toHostAddr(result.value(), FLAGS_port);
if (!hostRet.ok()) {
LOG(ERROR) << "Bad local host addr, status:" << hostRet.status();
return EXIT_FAILURE;
}
auto& localHost = hostRet.value();
auto& localhost = hostRet.value();
auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) {
LOG(ERROR) << "Can't get metaServer address, status:" << metaAddrsRet.status()
Expand All @@ -110,19 +140,25 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_io_handlers);
// Generic thread pool
auto workers = std::make_shared<nebula::thread::GenericThreadPool>();
sherman-the-tank marked this conversation as resolved.
Show resolved Hide resolved
workers->start(FLAGS_num_workers);

// folly IOThreadPoolExecutor
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
sherman-the-tank marked this conversation as resolved.
Show resolved Hide resolved

// Meta client
auto metaClient = std::make_unique<nebula::meta::MetaClient>(ioThreadPool,
std::move(metaAddrsRet.value()),
true);
metaClient->init();

nebula::kvstore::KVOptions options;
options.local_ = localHost;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::make_unique<nebula::kvstore::MetaServerBasedPartManager>(
options.local_, metaClient.get());
std::unique_ptr<nebula::kvstore::KVStore> kvstore(
nebula::kvstore::KVStore::instance(std::move(options)));
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
ioThreadPool,
workers,
metaClient.get());

auto schemaMan = nebula::meta::SchemaManager::create();
schemaMan->init(metaClient.get());

Expand All @@ -136,16 +172,18 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Failed to start web service: " << status;
return EXIT_FAILURE;
}

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

// Thrift server
auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), std::move(schemaMan));
try {
nebula::operator<<(operator<<(LOG(INFO), "The storage deamon start on "), localHost);
nebula::operator<<(operator<<(LOG(INFO), "The storage deamon start on "), localhost);
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
Expand Down
Loading