Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-483: Combine raft service and flash service #235

Merged
merged 3 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ extern const int NOT_IMPLEMENTED;
}

FlashService::FlashService(const std::string & address_, IServer & server_)
: server(server_), address(address_), log(&Logger::get("FlashService"))
: address(address_), server(server_), log(&Logger::get("FlashService"))
{
grpc::ServerBuilder builder;
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
builder.RegisterService(&server.context().getRaftService());

// Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error.
builder.SetMaxReceiveMessageSize(-1);
builder.SetMaxSendMessageSize(-1);
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved

// todo should set a reasonable value??
builder.SetMaxReceiveMessageSize(-1);
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar
std::tuple<Context, ::grpc::Status> createDBContext(grpc::ServerContext * grpc_contex);

private:
IServer & server;

std::string address;

IServer & server;
GRPCServerPtr grpc_server;

Logger * log;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1402,12 +1402,12 @@ DDLWorker & Context::getDDLWorker() const
return *shared->ddl_worker;
}

void Context::initializeRaftService(const std::string & service_addr)
void Context::initializeRaftService()
{
auto lock = getLock();
if (shared->raft_service)
throw Exception("Raft Service has already been initialized.", ErrorCodes::LOGICAL_ERROR);
shared->raft_service = std::make_shared<RaftService>(service_addr, *this);
shared->raft_service = std::make_shared<RaftService>(*this);
}

void Context::shutdownRaftService()
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class Context
void setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;

void initializeRaftService(const std::string & service_addr);
void initializeRaftService();
void shutdownRaftService();
void createTMTContext(const std::vector<std::string> & pd_addrs,
const std::string & learner_key,
Expand Down
27 changes: 4 additions & 23 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,15 @@
namespace DB
{

RaftService::RaftService(const std::string & address_, DB::Context & db_context_)
: address(address_),
db_context(db_context_),
RaftService::RaftService(DB::Context & db_context_)
: db_context(db_context_),
kvstore(db_context.getTMTContext().getKVStore()),
background_pool(db_context.getBackgroundPool()),
log(&Logger::get("RaftService"))
{
if (!db_context.getTMTContext().isInitialized())
throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR);

grpc::ServerBuilder builder;
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(this);

// Prevent TiKV from throwing "Received message larger than max (4404462 vs. 4194304)" error.
builder.SetMaxReceiveMessageSize(-1);
builder.SetMaxSendMessageSize(-1);

grpc_server = builder.BuildAndStart();

persist_handle = background_pool.addTask([this] { return kvstore->tryPersist(); }, false);

table_flush_handle = background_pool.addTask([this] {
Expand Down Expand Up @@ -63,8 +52,6 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_
return true;
});

LOG_INFO(log, "Raft service listening on [" << address << "]");

{
std::vector<RegionPtr> regions;
kvstore->traverseRegions([&regions](RegionID, const RegionPtr & region) {
Expand Down Expand Up @@ -119,12 +106,6 @@ RaftService::~RaftService()
background_pool.removeTask(region_decode_handle);
region_decode_handle = nullptr;
}

// wait 5 seconds for pending rpcs to gracefully stop
gpr_timespec deadline{5, 0, GPR_TIMESPAN};
LOG_DEBUG(log, "Begin to shutting down grpc server");
grpc_server->Shutdown(deadline);
grpc_server->Wait();
}

grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream)
Expand All @@ -143,7 +124,7 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,
}
catch (...)
{
tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error");
tryLogCurrentException(log, "gRPC ApplyCommandBatch error");
}

return grpc::Status::CANCELLED;
Expand All @@ -158,7 +139,7 @@ grpc::Status RaftService::ApplySnapshot(grpc::ServerContext *, CommandServerRead
}
catch (...)
{
tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error");
tryLogCurrentException(log, "gRPC ApplyCommandBatch error");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Runtime error, check theflash log for detail.");
}
}
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Raft/RaftService.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ using RegionMap = std::unordered_map<RegionID, RegionPtr>;
class RaftService final : public enginepb::Engine::Service, public std::enable_shared_from_this<RaftService>, private boost::noncopyable
{
public:
RaftService(const std::string & address_, Context & db_context);
RaftService(Context & db_context);

~RaftService() final;

Expand All @@ -37,10 +37,6 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s
grpc::ServerContext * grpc_context, CommandServerReader * reader, enginepb::SnapshotDone * response) override;

private:
std::string address;

GRPCServerPtr grpc_server;

Context & db_context;
KVStorePtr kvstore;

Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

if (need_raft_service)
{
String raft_service_addr = config().getString("raft.service_addr");
global_context->initializeRaftService(raft_service_addr);
}
global_context->initializeRaftService();

SCOPE_EXIT({
LOG_INFO(log, "Shutting down raft service.");
Expand Down