Skip to content

Commit

Permalink
FLASH-483: Combine raft service and flash service (#235)
Browse files Browse the repository at this point in the history
* Combine raft service and flash service

* Address comment and fix build error

* Update configs
  • Loading branch information
zanmato1984 authored and windtalker committed Sep 16, 2019
1 parent 1ccfbd4 commit df07939
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 46 deletions.
7 changes: 4 additions & 3 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Core/Types.h>
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <grpcpp/security/server_credentials.h>
#include <Raft/RaftService.h>
#include <grpcpp/server_builder.h>

namespace DB
Expand All @@ -15,13 +15,14 @@ extern const int NOT_IMPLEMENTED;
}

FlashService::FlashService(const std::string & address_, IServer & server_)
: server(server_), address(address_), log(&Logger::get("FlashService"))
: address(address_), server(server_), log(&Logger::get("FlashService"))
{
grpc::ServerBuilder builder;
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
builder.RegisterService(&server.context().getRaftService());

// todo should set a reasonable value??
// Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error.
builder.SetMaxReceiveMessageSize(-1);
builder.SetMaxSendMessageSize(-1);

Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar
std::tuple<Context, ::grpc::Status> createDBContext(grpc::ServerContext * grpc_contex);

private:
IServer & server;

std::string address;

IServer & server;
GRPCServerPtr grpc_server;

Logger * log;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1402,12 +1402,12 @@ DDLWorker & Context::getDDLWorker() const
return *shared->ddl_worker;
}

void Context::initializeRaftService(const std::string & service_addr)
void Context::initializeRaftService()
{
auto lock = getLock();
if (shared->raft_service)
throw Exception("Raft Service has already been initialized.", ErrorCodes::LOGICAL_ERROR);
shared->raft_service = std::make_shared<RaftService>(service_addr, *this);
shared->raft_service = std::make_shared<RaftService>(*this);
}

void Context::shutdownRaftService()
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class Context
void setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;

void initializeRaftService(const std::string & service_addr);
void initializeRaftService();
void shutdownRaftService();
void createTMTContext(const std::vector<std::string> & pd_addrs,
const std::string & learner_key,
Expand Down
27 changes: 4 additions & 23 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,15 @@
namespace DB
{

RaftService::RaftService(const std::string & address_, DB::Context & db_context_)
: address(address_),
db_context(db_context_),
RaftService::RaftService(DB::Context & db_context_)
: db_context(db_context_),
kvstore(db_context.getTMTContext().getKVStore()),
background_pool(db_context.getBackgroundPool()),
log(&Logger::get("RaftService"))
{
if (!db_context.getTMTContext().isInitialized())
throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR);

grpc::ServerBuilder builder;
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(this);

// Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error.
builder.SetMaxReceiveMessageSize(-1);
builder.SetMaxSendMessageSize(-1);

grpc_server = builder.BuildAndStart();

persist_handle = background_pool.addTask([this] { return kvstore->tryPersist(); }, false);

table_flush_handle = background_pool.addTask([this] {
Expand Down Expand Up @@ -63,8 +52,6 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_
return true;
});

LOG_INFO(log, "Raft service listening on [" << address << "]");

{
std::vector<RegionPtr> regions;
kvstore->traverseRegions([&regions](RegionID, const RegionPtr & region) {
Expand Down Expand Up @@ -119,12 +106,6 @@ RaftService::~RaftService()
background_pool.removeTask(region_decode_handle);
region_decode_handle = nullptr;
}

// wait 5 seconds for pending rpcs to gracefully stop
gpr_timespec deadline{5, 0, GPR_TIMESPAN};
LOG_DEBUG(log, "Begin to shutting down grpc server");
grpc_server->Shutdown(deadline);
grpc_server->Wait();
}

grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream)
Expand All @@ -143,7 +124,7 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,
}
catch (...)
{
tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error");
tryLogCurrentException(log, "gRPC ApplyCommandBatch error");
}

return grpc::Status::CANCELLED;
Expand All @@ -158,7 +139,7 @@ grpc::Status RaftService::ApplySnapshot(grpc::ServerContext *, CommandServerRead
}
catch (...)
{
tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error");
tryLogCurrentException(log, "gRPC ApplyCommandBatch error");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Runtime error, check theflash log for detail.");
}
}
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Raft/RaftService.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ using RegionMap = std::unordered_map<RegionID, RegionPtr>;
class RaftService final : public enginepb::Engine::Service, public std::enable_shared_from_this<RaftService>, private boost::noncopyable
{
public:
RaftService(const std::string & address_, Context & db_context);
RaftService(Context & db_context);

~RaftService() final;

Expand All @@ -37,10 +37,6 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s
grpc::ServerContext * grpc_context, CommandServerReader * reader, enginepb::SnapshotDone * response) override;

private:
std::string address;

GRPCServerPtr grpc_server;

Context & db_context;
KVStorePtr kvstore;

Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

if (need_raft_service)
{
String raft_service_addr = config().getString("raft.service_addr");
global_context->initializeRaftService(raft_service_addr);
}
global_context->initializeRaftService();

SCOPE_EXIT({
LOG_INFO(log, "Shutting down raft service.");
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,15 @@
</distributed_ddl>

<raft>
<service_addr>0.0.0.0:20021</service_addr>
<kvstore_path>/var/lib/clickhouse/kvstore</kvstore_path>
<regmap>/var/lib/clickhouse/regmap</regmap>
<pd_addr>http://127.0.0.1:13579</pd_addr>
</raft>

<flash>
<service_addr>0.0.0.0:3930</service_addr>
</flash>

<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!--
<merge_tree>
Expand Down
3 changes: 1 addition & 2 deletions tests/docker/config/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
<display_name>the flash</display_name>

<raft>
<service_addr>0.0.0.0:3930</service_addr>
<kvstore_path>/data/kvstore</kvstore_path>
<!--pd_addr>127.0.0.1:2379</pd_addr-->
<!-- ignored dbs, extra spaces are trimed -->
<ignore_databases>system </ignore_databases>
</raft>

<flash>
<service_addr>0.0.0.0:20171</service_addr>
<service_addr>0.0.0.0:3930</service_addr>
</flash>

<http_port>8123</http_port>
Expand Down
3 changes: 1 addition & 2 deletions tests/docker/config/tiflash.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<display_name>the flash</display_name>

<raft>
<service_addr>0.0.0.0:3930</service_addr>
<kvstore_path>/data/kvstore</kvstore_path>
<!--Linux-->
<pd_addr>pd0:2379</pd_addr>
Expand All @@ -27,7 +26,7 @@
</raft>

<flash>
<service_addr>0.0.0.0:20171</service_addr>
<service_addr>0.0.0.0:3930</service_addr>
</flash>

<http_port>8123</http_port>
Expand Down

0 comments on commit df07939

Please sign in to comment.