Skip to content

Commit

Permalink
Fix race condition of batch command handling (#277)
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 authored and windtalker committed Oct 12, 2019
1 parent a1304ae commit 687dcbe
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
66 changes: 34 additions & 32 deletions dbms/src/Flash/BatchCommandsHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <common/ThreadPool.h>

namespace DB
{
Expand All @@ -10,38 +9,40 @@ BatchCommandsHandler::BatchCommandsHandler(BatchCommandsContext & batch_commands
: batch_commands_context(batch_commands_context_), request(request_), response(response_), log(&Logger::get("BatchCommandsHandler"))
{}

ThreadPool::Job BatchCommandsHandler::handleCommandJob(
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) const
{
return [&]() {
if (!req.has_coprocessor())
{
ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
return;
}

const auto & cop_req = req.coprocessor();
auto cop_resp = resp.mutable_coprocessor();

auto [context, status] = batch_commands_context.db_context_creation_func(&batch_commands_context.grpc_server_context);
if (!status.ok())
{
ret = status;
return;
}

CoprocessorContext cop_context(context, cop_req.context(), batch_commands_context.grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
};
}

grpc::Status BatchCommandsHandler::execute()
{
if (request.requests_size() == 0)
return grpc::Status::OK;

// TODO: Fill transport_layer_load into BatchCommandsResponse.

auto command_handler_func
= [](BatchCommandsContext::DBContextCreationFunc db_context_creation_func, grpc::ServerContext * grpc_server_context,
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) {
if (!req.has_coprocessor())
{
ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
return;
}

const auto & cop_req = req.coprocessor();
auto cop_resp = resp.mutable_coprocessor();

auto [context, status] = db_context_creation_func(grpc_server_context);
if (!status.ok())
{
ret = status;
return;
}

CoprocessorContext cop_context(context, cop_req.context(), *grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
};

/// Shortcut for only one request by not going to thread pool.
if (request.requests_size() == 1)
{
Expand All @@ -51,7 +52,7 @@ grpc::Status BatchCommandsHandler::execute()
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(0));
auto ret = grpc::Status::OK;
command_handler_func(batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, ret);
handleCommandJob(req, *resp, ret)();
return ret;
}

Expand All @@ -65,18 +66,16 @@ grpc::Status BatchCommandsHandler::execute()

ThreadPool thread_pool(max_threads);

std::vector<grpc::Status> rets;
std::vector<grpc::Status> rets(request.requests_size());
size_t i = 0;

for (const auto & req : request.requests())
{
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(i++));
rets.emplace_back(grpc::Status::OK);
thread_pool.schedule([&]() {
command_handler_func(
batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, rets.back());
});

thread_pool.schedule(handleCommandJob(req, *resp, rets.back()));
}

thread_pool.wait();
Expand All @@ -85,7 +84,10 @@ grpc::Status BatchCommandsHandler::execute()
for (const auto & ret : rets)
{
if (!ret.ok())
{
response.Clear();
return ret;
}
}

return grpc::Status::OK;
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/BatchCommandsHandler.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Interpreters/Context.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>
#include <grpcpp/server_context.h>
#pragma GCC diagnostic push
Expand All @@ -18,10 +19,10 @@ struct BatchCommandsContext

/// Context creation function for each individual command - they should be handled isolated,
/// given that context is being used to pass arguments regarding queries.
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(grpc::ServerContext *)>;
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(const grpc::ServerContext *)>;
DBContextCreationFunc db_context_creation_func;

grpc::ServerContext & grpc_server_context;
const grpc::ServerContext & grpc_server_context;

BatchCommandsContext(
Context & db_context_, DBContextCreationFunc && db_context_creation_func_, grpc::ServerContext & grpc_server_context_)
Expand All @@ -40,7 +41,11 @@ class BatchCommandsHandler
grpc::Status execute();

protected:
BatchCommandsContext & batch_commands_context;
ThreadPool::Job handleCommandJob(
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) const;

protected:
const BatchCommandsContext & batch_commands_context;
const tikvpb::BatchCommandsRequest & request;
tikvpb::BatchCommandsResponse & response;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ try
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(), std::move(key_ranges),
dag_response);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
cop_response->set_data(dag_response.SerializeAsString());
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
break;
}
case COP_REQ_TYPE_ANALYZE:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/CoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ struct CoprocessorContext
{
Context & db_context;
const kvrpcpb::Context & kv_context;
grpc::ServerContext & grpc_server_context;
const grpc::ServerContext & grpc_server_context;

CoprocessorContext(Context & db_context_, const kvrpcpb::Context & kv_context_, grpc::ServerContext & grpc_server_context_)
CoprocessorContext(Context & db_context_, const kvrpcpb::Context & kv_context_, const grpc::ServerContext & grpc_server_context_)
: db_context(db_context_), kv_context(kv_context_), grpc_server_context(grpc_server_context_)
{}
};
Expand Down
15 changes: 7 additions & 8 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ grpc::Status FlashService::BatchCommands(

tikvpb::BatchCommandsResponse response;
BatchCommandsContext batch_commands_context(
context, [this](grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); }, *grpc_context);
context, [this](const grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); },
*grpc_context);
BatchCommandsHandler batch_commands_handler(batch_commands_context, request, response);
auto ret = batch_commands_handler.execute();
if (!ret.ok())
Expand All @@ -75,22 +76,20 @@ grpc::Status FlashService::BatchCommands(
return grpc::Status::OK;
}

String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val)
String getClientMetaVarWithDefault(const grpc::ServerContext * grpc_context, const String & name, const String & default_val)
{
if (grpc_context->client_metadata().count(name) != 1)
return default_val;
else
return String(grpc_context->client_metadata().find(name)->second.data());
if (auto it = grpc_context->client_metadata().find(name); it != grpc_context->client_metadata().end())
return it->second.data();
return default_val;
}

std::tuple<Context, grpc::Status> FlashService::createDBContext(grpc::ServerContext * grpc_context)
std::tuple<Context, grpc::Status> FlashService::createDBContext(const grpc::ServerContext * grpc_context) const
{
/// Create DB context.
Context context = server.context();
context.setGlobalContext(server.context());

/// Set a bunch of client information.
auto client_meta = grpc_context->client_metadata();
String query_id = getClientMetaVarWithDefault(grpc_context, "query_id", "");
context.setCurrentQueryId(query_id);
ClientInfo & client_info = context.getClientInfo();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar
grpc::ServerReaderWriter<tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) override;

private:
std::tuple<Context, ::grpc::Status> createDBContext(grpc::ServerContext * grpc_contex);
std::tuple<Context, ::grpc::Status> createDBContext(const grpc::ServerContext * grpc_contex) const;

private:
IServer & server;
Expand Down

0 comments on commit 687dcbe

Please sign in to comment.