Fix crash when stopping the process #705

Closed
wants to merge 3 commits
22 changes: 17 additions & 5 deletions src/daemons/MetaDaemon.cpp
@@ -100,14 +100,16 @@ int main(int argc, char *argv[]) {
// The meta server has only one space, one part.
partMan->addPart(0, 0, std::move(peersRet.value()));

// folly IOThreadPoolExecutor
// Note: ioThreadPool and acceptThreadPool will be stopped when NebulaStore frees the raft service
auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
auto acceptThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
Contributor

Make it configurable.

Contributor Author
Good point. I thought about this, but concluded there is no need to make it configurable; even one thread is more than enough. The accept thread pool only accepts connections from clients that are our own processes (graphd or metad), and those are long-lived connections, so the performance demand is low. What do you think?

Contributor
  1. Why does MetaDaemon share the acceptors with NebulaStore? The same question applies to StorageDaemon.
  2. Why do we create the acceptors with 1 worker? The ThriftServer creates its acceptors with 1 worker by default anyway.
  3. Can acceptors with 1 worker meet our performance requirements?
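
For context on this discussion, here is a minimal sketch (not part of this PR; the thread counts and the helper function are illustrative) of how one IO pool and a single-threaded accept pool end up shared between the public ThriftServer and NebulaStore's internal raft service:

#include <memory>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>

// Sketch of the pool-sharing pattern used by MetaDaemon/StorageDaemon in this PR.
void wireSharedPools(apache::thrift::ThriftServer& server) {
    // One pool for all network IO, one single-threaded pool that only accepts connections.
    auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(16);
    auto acceptPool = std::make_shared<folly::IOThreadPoolExecutor>(1);

    // NebulaStore hands both pools down to RaftexService, so the raft service and the
    // public thrift server run on the same threads, e.g.:
    //   NebulaStore(std::move(options), ioPool, acceptPool, localhost);

    server.setIOThreadPool(ioPool);
    server.setAcceptExecutor(acceptPool);
    // Keep the shared IO threads alive when server.stop() is called,
    // because the raft service may still be using them.
    server.setStopWorkersOnStopListening(false);
}
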


nebula::kvstore::KVOptions options;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
auto kvstore = std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
acceptThreadPool,
localhost);
if (!(kvstore->init())) {
LOG(ERROR) << "nebula store init failed";
@@ -154,7 +156,15 @@ int main(int argc, char *argv[]) {
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioPool);
gServer->setAcceptExecutor(acceptThreadPool);

// Set this to false so that gServer->stop() does not stop all the IO threads,
// because NebulaStore's raft part still depends on the IO thread pool.
gServer->setStopWorkersOnStopListening(false);
gServer->serve(); // Will wait until the server shuts down

// Must stop the CPU workers first, because kvstore is freed before gServer.
gServer->getThreadManager()->join();
} catch (const std::exception &e) {
nebula::WebService::stop();
LOG(ERROR) << "Exception thrown: " << e.what();
@@ -169,7 +179,7 @@ int main(int argc, char *argv[]) {

Status setupSignalHandler() {
return nebula::SignalHandler::install(
{SIGINT, SIGTERM},
[](nebula::SignalHandler::GeneralSignalInfo *info) {
signalHandler(info->sig());
});
@@ -181,7 +191,9 @@ void signalHandler(int sig) {
case SIGINT:
case SIGTERM:
FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig));
gServer->stop();
if (gServer) {
gServer->stop();
}
break;
default:
FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig));
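
Taken together, the MetaDaemon changes enforce the following teardown order (a hedged summary; it uses only calls that appear in this diff, and the comments are editorial):

gServer->setStopWorkersOnStopListening(false);  // stop() no longer tears down the shared IO threads
gServer->serve();                               // blocks until signalHandler() calls gServer->stop()
gServer->getThreadManager()->join();            // drain the CPU workers while kvstore is still alive

// After main() returns:
//   ~NebulaStore stops its generic workers, then the raft service,
//   which finally stops ioPool and acceptThreadPool;
//   ~ThriftServer runs last, with its IO threads already idle.
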
21 changes: 18 additions & 3 deletions src/daemons/StorageDaemon.cpp
@@ -61,6 +61,7 @@ std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
HostAddr localhost,
std::vector<std::string> paths,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool,
nebula::meta::MetaClient* metaClient,
nebula::meta::SchemaManager* schemaMan) {
nebula::kvstore::KVOptions options;
@@ -73,6 +74,7 @@
if (FLAGS_store_type == "nebula") {
auto nbStore = std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
acceptPool,
localhost);
if (!(nbStore->init())) {
LOG(ERROR) << "nebula store init failed";
@@ -156,11 +158,12 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

// Note: ioThreadPool and acceptThreadPool will be stopped when NebulaStore frees the raft service
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
auto acceptThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);

// Meta client
auto metaClient = std::make_unique<nebula::meta::MetaClient>(ioThreadPool,
std::move(metaAddrsRet.value()),
auto metaClient = std::make_unique<nebula::meta::MetaClient>(std::move(metaAddrsRet.value()),
localhost,
true);
if (!metaClient->waitForMetadReady()) {
@@ -175,6 +178,7 @@
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
ioThreadPool,
acceptThreadPool,
metaClient.get(),
schemaMan.get());

@@ -220,9 +224,18 @@ int main(int argc, char *argv[]) {
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioThreadPool);
gServer->setAcceptExecutor(acceptThreadPool);
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");

// Set this to false so that gServer->stop() does not stop all the IO threads,
// because NebulaStore's raft part still depends on the IO thread pool.
gServer->setStopWorkersOnStopListening(false);

gServer->serve(); // Will wait until the server shuts down

// Must stop the CPU workers first, because kvstore is freed before gServer.
gServer->getThreadManager()->join();
} catch (const std::exception& e) {
nebula::WebService::stop();
LOG(ERROR) << "Start thrift server failed, error:" << e.what();
@@ -249,7 +262,9 @@ void signalHandler(int sig) {
case SIGINT:
case SIGTERM:
FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig));
gServer->stop();
if (gServer) {
gServer->stop();
}
break;
default:
FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig));
2 changes: 1 addition & 1 deletion src/graph/ExecutionEngine.cpp
@@ -28,7 +28,7 @@ Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExec
if (!addrs.ok()) {
return addrs.status();
}
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()));
metaClient_ = std::make_unique<meta::MetaClient>(std::move(addrs.value()));
// Try to load data up to 3 times
bool loadDataOk = metaClient_->waitForMetadReady(3);
if (!loadDataOk) {
4 changes: 1 addition & 3 deletions src/graph/test/TestEnv.cpp
@@ -35,7 +35,6 @@ void TestEnv::SetUp() {
FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:%d", metaServerPort());

// Create storageServer
auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
auto addrsRet
= network::NetworkUtils::toHosts(folly::stringPrintf("127.0.0.1:%d", metaServerPort()));
CHECK(addrsRet.ok()) << addrsRet.status();
@@ -45,8 +44,7 @@
LOG(ERROR) << "Bad local host addr, status:" << hostRet.status();
}
auto& localhost = hostRet.value();
mClient_ = std::make_unique<meta::MetaClient>(threadPool,
std::move(addrsRet.value()),
mClient_ = std::make_unique<meta::MetaClient>(std::move(addrsRet.value()),
localhost,
true);
auto r = mClient_->addHosts({localhost}).get();
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.cpp
@@ -37,6 +37,7 @@ namespace nebula {
namespace kvstore {

NebulaStore::~NebulaStore() {
// We must stop the workers before the raft service, because the raft service stops the IO thread pool
workers_->stop();
workers_->wait();
LOG(INFO) << "Stop the raft service...";
@@ -50,7 +51,7 @@ bool NebulaStore::init() {
LOG(INFO) << "Start the raft service...";
workers_ = std::make_shared<thread::GenericThreadPool>();
workers_->start(FLAGS_num_workers);
raftService_ = raftex::RaftexService::createService(ioPool_, raftAddr_.second);
raftService_ = raftex::RaftexService::createService(ioPool_, acceptPool_, raftAddr_.second);
if (!raftService_->start()) {
LOG(ERROR) << "Start the raft service failed";
return false;
3 changes: 3 additions & 0 deletions src/kvstore/NebulaStore.h
@@ -40,8 +40,10 @@ class NebulaStore : public KVStore, public Handler {
public:
NebulaStore(KVOptions options,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool,
HostAddr serviceAddr)
: ioPool_(ioPool)
, acceptPool_(acceptPool)
, storeSvcAddr_(serviceAddr)
, raftAddr_(getRaftAddr(serviceAddr))
, options_(std::move(options)) {
@@ -187,6 +189,7 @@ class NebulaStore : public KVStore, public Handler {
std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_;

std::shared_ptr<folly::IOThreadPoolExecutor> ioPool_;
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool_;
std::shared_ptr<thread::GenericThreadPool> workers_;
HostAddr storeSvcAddr_;
HostAddr raftAddr_;
16 changes: 10 additions & 6 deletions src/kvstore/raftex/RaftexService.cpp
@@ -18,7 +18,8 @@ namespace raftex {
*
******************************************************/
std::shared_ptr<RaftexService> RaftexService::createService(
std::shared_ptr<folly::IOThreadPoolExecutor> pool,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool,
uint16_t port) {
auto svc = std::shared_ptr<RaftexService>(new RaftexService());
CHECK(svc != nullptr) << "Failed to create a raft service";
@@ -27,7 +28,7 @@ std::shared_ptr<RaftexService> RaftexService::createService(
CHECK(svc->server_ != nullptr) << "Failed to create a thrift server";
svc->server_->setInterface(svc);

svc->initThriftServer(pool, port);
svc->initThriftServer(ioPool, acceptPool, port);
return svc;
}

@@ -58,12 +59,15 @@ void RaftexService::waitUntilReady() {
}


void RaftexService::initThriftServer(std::shared_ptr<folly::IOThreadPoolExecutor> pool,
uint16_t port) {
void RaftexService::initThriftServer(std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool,
Contributor

alignment

uint16_t port) {
LOG(INFO) << "Init thrift server for raft service.";
server_->setPort(port);
if (pool != nullptr) {
server_->setIOThreadPool(pool);
if (ioPool != nullptr && acceptPool != nullptr) {
server_->setIOThreadPool(ioPool);
server_->setAcceptExecutor(acceptPool);
server_->setStopWorkersOnStopListening(false);
}
}

7 changes: 5 additions & 2 deletions src/kvstore/raftex/RaftexService.h
@@ -23,7 +23,8 @@ class IOThreadPoolObserver;
class RaftexService : public cpp2::RaftexServiceSvIf {
public:
static std::shared_ptr<RaftexService> createService(
std::shared_ptr<folly::IOThreadPoolExecutor> pool,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool,
uint16_t port = 0);
virtual ~RaftexService();

@@ -47,7 +48,9 @@ class RaftexService : public cpp2::RaftexServiceSvIf {
void removePartition(std::shared_ptr<RaftPart> part);

private:
void initThriftServer(std::shared_ptr<folly::IOThreadPoolExecutor> pool, uint16_t port = 0);
void initThriftServer(std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool,
uint16_t port = 0);
bool setup();
void serve();

2 changes: 1 addition & 1 deletion src/kvstore/raftex/test/RaftexTestBase.cpp
@@ -173,7 +173,7 @@ void setupRaft(

// Set up services
for (int i = 0; i < numCopies; ++i) {
services.emplace_back(RaftexService::createService(nullptr));
services.emplace_back(RaftexService::createService(nullptr, nullptr));
if (!services.back()->start())
return;
uint16_t port = services.back()->getServerPort();
13 changes: 12 additions & 1 deletion src/kvstore/test/NebulaStoreTest.cpp
@@ -19,7 +19,6 @@ DECLARE_uint32(heartbeat_interval);
namespace nebula {
namespace kvstore {

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);

template<typename T>
void dump(const std::vector<T>& v) {
@@ -56,8 +55,13 @@ TEST(NebulaStoreTest, SimpleTest) {
options.dataPaths_ = std::move(paths);
options.partMan_ = std::move(partMan);
HostAddr local = {0, 0};

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
auto acceptThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);

auto store = std::make_unique<NebulaStore>(std::move(options),
ioThreadPool,
acceptThreadPool,
local);
store->init();
sleep(1);
@@ -155,8 +159,13 @@ TEST(NebulaStoreTest, PartsTest) {
options.dataPaths_ = std::move(paths);
options.partMan_ = std::move(partMan);
HostAddr local = {0, 0};

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
auto acceptThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);

auto store = std::make_unique<NebulaStore>(std::move(options),
ioThreadPool,
acceptThreadPool,
local);
store->init();
auto check = [&](GraphSpaceID spaceId) {
@@ -244,6 +253,7 @@ TEST(NebulaStoreTest, ThreeCopiesTest) {
const std::string& path) -> std::unique_ptr<NebulaStore> {
LOG(INFO) << "Start nebula store on " << peers[index];
auto sIoThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
auto sAcceptThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
auto partMan = std::make_unique<MemPartManager>();
for (auto partId = 0; partId < 3; partId++) {
PartMeta pm;
@@ -260,6 +270,7 @@
HostAddr local = peers[index];
return std::make_unique<NebulaStore>(std::move(options),
sIoThreadPool,
sAcceptThreadPool,
local);
};
int32_t replicas = 3;
3 changes: 3 additions & 0 deletions src/meta/MetaServiceHandler.h
@@ -20,6 +20,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
explicit MetaServiceHandler(kvstore::KVStore* kv)
: kvstore_(kv) {}

~MetaServiceHandler() {
LOG(INFO) << "~MetaServiceHandler";
}
/**
* Parts distribution related operations.
* */
10 changes: 6 additions & 4 deletions src/meta/client/MetaClient.cpp
@@ -11,18 +11,18 @@

DEFINE_int32(load_data_interval_secs, 2 * 60, "Load data interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval");
DEFINE_int32(meta_client_io_thread_num, 2, "meta client io thread number");

namespace nebula {
namespace meta {

MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
MetaClient::MetaClient(std::vector<HostAddr> addrs,
HostAddr localHost,
bool sendHeartBeat)
: ioThreadPool_(ioThreadPool)
, addrs_(std::move(addrs))
: addrs_(std::move(addrs))
, localHost_(localHost)
, sendHeartBeat_(sendHeartBeat) {
ioThreadPool_ = std::make_unique<folly::IOThreadPoolExecutor>(FLAGS_meta_client_io_thread_num);
CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
Contributor

It doesn't seem necessary anymore.

CHECK(!addrs_.empty())
<< "No meta server address is specified. Meta server is required";
@@ -37,6 +37,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool
MetaClient::~MetaClient() {
bgThread_.stop();
bgThread_.wait();

ioThreadPool_->stop();
VLOG(3) << "~MetaClient";
}

5 changes: 2 additions & 3 deletions src/meta/client/MetaClient.h
@@ -61,8 +61,7 @@ class MetaChangedListener {

class MetaClient {
public:
explicit MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
explicit MetaClient(std::vector<HostAddr> addrs,
HostAddr localHost = HostAddr(0, 0),
bool sendHeartBeat = false);

@@ -259,7 +258,7 @@ class MetaClient {
const LocalCache& localCache);

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::unique_ptr<folly::IOThreadPoolExecutor> ioThreadPool_{nullptr};
std::shared_ptr<thrift::ThriftClientManager<meta::cpp2::MetaServiceAsyncClient>> clientsMan_;

LocalCache localCache_;
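
As a usage note for the MetaClient change (a hedged sketch; the override value, the function name, and the include path are illustrative): callers no longer pass an IO thread pool, and the pool size is controlled by the new gflag instead.

#include <memory>
#include <vector>
#include <gflags/gflags.h>
#include "meta/client/MetaClient.h"

DECLARE_int32(meta_client_io_thread_num);

std::unique_ptr<nebula::meta::MetaClient> makeClient(std::vector<nebula::HostAddr> addrs) {
    FLAGS_meta_client_io_thread_num = 4;  // optional override; the default defined above is 2
    // The client now creates, owns, and stops its own IOThreadPoolExecutor (see ~MetaClient).
    return std::make_unique<nebula::meta::MetaClient>(std::move(addrs));
}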