From e6a5cfddb70824da6ccb9bd1b5759ce139fa6fd3 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Mon, 16 Sep 2019 17:50:07 +0800 Subject: [PATCH 1/3] Combine raft service and flash service --- dbms/src/Flash/FlashService.cpp | 7 ++++++- dbms/src/Flash/FlashService.h | 4 +--- dbms/src/Interpreters/Context.cpp | 4 ++-- dbms/src/Interpreters/Context.h | 2 +- dbms/src/Raft/RaftService.cpp | 27 ++++----------------------- dbms/src/Raft/RaftService.h | 6 +----- dbms/src/Server/Server.cpp | 5 +---- 7 files changed, 16 insertions(+), 39 deletions(-) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index b2a473d083a..9810c649c8f 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -15,11 +15,16 @@ extern const int NOT_IMPLEMENTED; } FlashService::FlashService(const std::string & address_, IServer & server_) - : server(server_), address(address_), log(&Logger::get("FlashService")) + : address(address_), server(server_), log(&Logger::get("FlashService")) { grpc::ServerBuilder builder; builder.AddListeningPort(address, grpc::InsecureServerCredentials()); builder.RegisterService(this); + builder.RegisterService(&server.context().getRaftService()); + + // Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error. + builder.SetMaxReceiveMessageSize(-1); + builder.SetMaxSendMessageSize(-1); // todo should set a reasonable value?? builder.SetMaxReceiveMessageSize(-1); diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index f208ec04db0..92b895f0137 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -34,10 +34,8 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar std::tuple createDBContext(grpc::ServerContext * grpc_contex); private: - IServer & server; - std::string address; - + IServer & server; GRPCServerPtr grpc_server; Logger * log; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 2247dd3f04f..67a2b035caf 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1402,12 +1402,12 @@ DDLWorker & Context::getDDLWorker() const return *shared->ddl_worker; } -void Context::initializeRaftService(const std::string & service_addr) +void Context::initializeRaftService() { auto lock = getLock(); if (shared->raft_service) throw Exception("Raft Service has already been initialized.", ErrorCodes::LOGICAL_ERROR); - shared->raft_service = std::make_shared(service_addr, *this); + shared->raft_service = std::make_shared(*this); } void Context::shutdownRaftService() diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 61d97e7fefa..ee7772deafb 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -360,7 +360,7 @@ class Context void setDDLWorker(std::shared_ptr ddl_worker); DDLWorker & getDDLWorker() const; - void initializeRaftService(const std::string & service_addr); + void initializeRaftService(); void shutdownRaftService(); void createTMTContext(const std::vector & pd_addrs, const std::string & learner_key, diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index 651690b7489..1fdc8ce05e9 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -8,9 +8,8 @@ namespace DB { -RaftService::RaftService(const std::string & address_, DB::Context & db_context_) - : address(address_), - db_context(db_context_), +RaftService::RaftService(DB::Context & db_context_) + : db_context(db_context_), kvstore(db_context.getTMTContext().getKVStore()), background_pool(db_context.getBackgroundPool()), log(&Logger::get("RaftService")) @@ -18,16 +17,6 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_ if (!db_context.getTMTContext().isInitialized()) throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR); - grpc::ServerBuilder builder; - builder.AddListeningPort(address, grpc::InsecureServerCredentials()); - builder.RegisterService(this); - - // Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error. - builder.SetMaxReceiveMessageSize(-1); - builder.SetMaxSendMessageSize(-1); - - grpc_server = builder.BuildAndStart(); - persist_handle = background_pool.addTask([this] { return kvstore->tryPersist(); }, false); table_flush_handle = background_pool.addTask([this] { @@ -63,8 +52,6 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_ return true; }); - LOG_INFO(log, "Raft service listening on [" << address << "]"); - { std::vector regions; kvstore->traverseRegions([®ions](RegionID, const RegionPtr & region) { @@ -119,12 +106,6 @@ RaftService::~RaftService() background_pool.removeTask(region_decode_handle); region_decode_handle = nullptr; } - - // wait 5 seconds for pending rpcs to gracefully stop - gpr_timespec deadline{5, 0, GPR_TIMESPAN}; - LOG_DEBUG(log, "Begin to shutting down grpc server"); - grpc_server->Shutdown(deadline); - grpc_server->Wait(); } grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream) @@ -143,7 +124,7 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, } catch (...) { - tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error"); + tryLogCurrentException(log, "gRPC ApplyCommandBatch error"); } return grpc::Status::CANCELLED; @@ -158,7 +139,7 @@ grpc::Status RaftService::ApplySnapshot(grpc::ServerContext *, CommandServerRead } catch (...) { - tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error"); + tryLogCurrentException(log, "gRPC ApplyCommandBatch error"); return grpc::Status(grpc::StatusCode::UNKNOWN, "Runtime error, check theflash log for detail."); } } diff --git a/dbms/src/Raft/RaftService.h b/dbms/src/Raft/RaftService.h index 787c5f3e035..38421f0c727 100644 --- a/dbms/src/Raft/RaftService.h +++ b/dbms/src/Raft/RaftService.h @@ -23,7 +23,7 @@ using RegionMap = std::unordered_map; class RaftService final : public enginepb::Engine::Service, public std::enable_shared_from_this, private boost::noncopyable { public: - RaftService(const std::string & address_, Context & db_context); + RaftService(Context & db_context); ~RaftService() final; @@ -37,10 +37,6 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s grpc::ServerContext * grpc_context, CommandServerReader * reader, enginepb::SnapshotDone * response) override; private: - std::string address; - - GRPCServerPtr grpc_server; - Context & db_context; KVStorePtr kvstore; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index db3e458c724..0a57e8f8ec1 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -452,10 +452,7 @@ int Server::main(const std::vector & /*args*/) } if (need_raft_service) - { - String raft_service_addr = config().getString("raft.service_addr"); - global_context->initializeRaftService(raft_service_addr); - } + global_context->initializeRaftService(); SCOPE_EXIT({ LOG_INFO(log, "Shutting down raft service."); From 72630188f66e7f6eb7becacf5e5ce75789ab3b63 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Mon, 16 Sep 2019 18:09:05 +0800 Subject: [PATCH 2/3] Address comment and fix build error --- dbms/src/Flash/FlashService.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 9810c649c8f..a5521b9eca9 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include namespace DB @@ -26,10 +26,6 @@ FlashService::FlashService(const std::string & address_, IServer & server_) builder.SetMaxReceiveMessageSize(-1); builder.SetMaxSendMessageSize(-1); - // todo should set a reasonable value?? - builder.SetMaxReceiveMessageSize(-1); - builder.SetMaxSendMessageSize(-1); - grpc_server = builder.BuildAndStart(); LOG_INFO(log, "Flash service listening on [" << address << "]"); From 195dba9d1e28e2fd7f3787105a10f3179d9cd2fb Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Mon, 16 Sep 2019 18:39:33 +0800 Subject: [PATCH 3/3] Update configs --- dbms/src/Server/config.xml | 5 ++++- tests/docker/config/config.xml | 3 +-- tests/docker/config/tiflash.xml | 3 +-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml index dc6fc34f507..ced2c54b2c1 100644 --- a/dbms/src/Server/config.xml +++ b/dbms/src/Server/config.xml @@ -313,12 +313,15 @@ - 0.0.0.0:20021 /var/lib/clickhouse/kvstore /var/lib/clickhouse/regmap http://127.0.0.1:13579 + + 0.0.0.0:3930 + + @@ -24,7 +23,7 @@ - 0.0.0.0:20171 + 0.0.0.0:3930 8123 diff --git a/tests/docker/config/tiflash.xml b/tests/docker/config/tiflash.xml index beed683db12..afc6ea001e4 100644 --- a/tests/docker/config/tiflash.xml +++ b/tests/docker/config/tiflash.xml @@ -16,7 +16,6 @@ the flash - 0.0.0.0:3930 /data/kvstore pd0:2379 @@ -27,7 +26,7 @@ - 0.0.0.0:20171 + 0.0.0.0:3930 8123