diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index b2a473d083a..a5521b9eca9 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include namespace DB @@ -15,13 +15,14 @@ extern const int NOT_IMPLEMENTED; } FlashService::FlashService(const std::string & address_, IServer & server_) - : server(server_), address(address_), log(&Logger::get("FlashService")) + : address(address_), server(server_), log(&Logger::get("FlashService")) { grpc::ServerBuilder builder; builder.AddListeningPort(address, grpc::InsecureServerCredentials()); builder.RegisterService(this); + builder.RegisterService(&server.context().getRaftService()); - // todo should set a reasonable value?? + // Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error. builder.SetMaxReceiveMessageSize(-1); builder.SetMaxSendMessageSize(-1); diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index f208ec04db0..92b895f0137 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -34,10 +34,8 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar std::tuple createDBContext(grpc::ServerContext * grpc_contex); private: - IServer & server; - std::string address; - + IServer & server; GRPCServerPtr grpc_server; Logger * log; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 2247dd3f04f..67a2b035caf 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1402,12 +1402,12 @@ DDLWorker & Context::getDDLWorker() const return *shared->ddl_worker; } -void Context::initializeRaftService(const std::string & service_addr) +void Context::initializeRaftService() { auto lock = getLock(); if (shared->raft_service) throw Exception("Raft Service has already been initialized.", ErrorCodes::LOGICAL_ERROR); - shared->raft_service = std::make_shared(service_addr, *this); + shared->raft_service = std::make_shared(*this); } void Context::shutdownRaftService() diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 61d97e7fefa..ee7772deafb 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -360,7 +360,7 @@ class Context void setDDLWorker(std::shared_ptr ddl_worker); DDLWorker & getDDLWorker() const; - void initializeRaftService(const std::string & service_addr); + void initializeRaftService(); void shutdownRaftService(); void createTMTContext(const std::vector & pd_addrs, const std::string & learner_key, diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index 651690b7489..1fdc8ce05e9 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -8,9 +8,8 @@ namespace DB { -RaftService::RaftService(const std::string & address_, DB::Context & db_context_) - : address(address_), - db_context(db_context_), +RaftService::RaftService(DB::Context & db_context_) + : db_context(db_context_), kvstore(db_context.getTMTContext().getKVStore()), background_pool(db_context.getBackgroundPool()), log(&Logger::get("RaftService")) @@ -18,16 +17,6 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_ if (!db_context.getTMTContext().isInitialized()) throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR); - grpc::ServerBuilder builder; - builder.AddListeningPort(address, grpc::InsecureServerCredentials()); - builder.RegisterService(this); - - // Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error. - builder.SetMaxReceiveMessageSize(-1); - builder.SetMaxSendMessageSize(-1); - - grpc_server = builder.BuildAndStart(); - persist_handle = background_pool.addTask([this] { return kvstore->tryPersist(); }, false); table_flush_handle = background_pool.addTask([this] { @@ -63,8 +52,6 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_ return true; }); - LOG_INFO(log, "Raft service listening on [" << address << "]"); - { std::vector regions; kvstore->traverseRegions([®ions](RegionID, const RegionPtr & region) { @@ -119,12 +106,6 @@ RaftService::~RaftService() background_pool.removeTask(region_decode_handle); region_decode_handle = nullptr; } - - // wait 5 seconds for pending rpcs to gracefully stop - gpr_timespec deadline{5, 0, GPR_TIMESPAN}; - LOG_DEBUG(log, "Begin to shutting down grpc server"); - grpc_server->Shutdown(deadline); - grpc_server->Wait(); } grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream) @@ -143,7 +124,7 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, } catch (...) { - tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error"); + tryLogCurrentException(log, "gRPC ApplyCommandBatch error"); } return grpc::Status::CANCELLED; @@ -158,7 +139,7 @@ grpc::Status RaftService::ApplySnapshot(grpc::ServerContext *, CommandServerRead } catch (...) { - tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error"); + tryLogCurrentException(log, "gRPC ApplyCommandBatch error"); return grpc::Status(grpc::StatusCode::UNKNOWN, "Runtime error, check theflash log for detail."); } } diff --git a/dbms/src/Raft/RaftService.h b/dbms/src/Raft/RaftService.h index 787c5f3e035..38421f0c727 100644 --- a/dbms/src/Raft/RaftService.h +++ b/dbms/src/Raft/RaftService.h @@ -23,7 +23,7 @@ using RegionMap = std::unordered_map; class RaftService final : public enginepb::Engine::Service, public std::enable_shared_from_this, private boost::noncopyable { public: - RaftService(const std::string & address_, Context & db_context); + RaftService(Context & db_context); ~RaftService() final; @@ -37,10 +37,6 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s grpc::ServerContext * grpc_context, CommandServerReader * reader, enginepb::SnapshotDone * response) override; private: - std::string address; - - GRPCServerPtr grpc_server; - Context & db_context; KVStorePtr kvstore; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index db3e458c724..0a57e8f8ec1 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -452,10 +452,7 @@ int Server::main(const std::vector & /*args*/) } if (need_raft_service) - { - String raft_service_addr = config().getString("raft.service_addr"); - global_context->initializeRaftService(raft_service_addr); - } + global_context->initializeRaftService(); SCOPE_EXIT({ LOG_INFO(log, "Shutting down raft service."); diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml index dc6fc34f507..ced2c54b2c1 100644 --- a/dbms/src/Server/config.xml +++ b/dbms/src/Server/config.xml @@ -313,12 +313,15 @@ - 0.0.0.0:20021 /var/lib/clickhouse/kvstore /var/lib/clickhouse/regmap http://127.0.0.1:13579 + + 0.0.0.0:3930 + + @@ -24,7 +23,7 @@ - 0.0.0.0:20171 + 0.0.0.0:3930 8123 diff --git a/tests/docker/config/tiflash.xml b/tests/docker/config/tiflash.xml index beed683db12..afc6ea001e4 100644 --- a/tests/docker/config/tiflash.xml +++ b/tests/docker/config/tiflash.xml @@ -16,7 +16,6 @@ the flash - 0.0.0.0:3930 /data/kvstore pd0:2379 @@ -27,7 +26,7 @@ - 0.0.0.0:20171 + 0.0.0.0:3930 8123