Skip to content

Commit

Permalink
handle error in cop request (#171)
Browse files Browse the repository at this point in the history
* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 12, 2019
1 parent 57cd382 commit 4a76e91
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 59 deletions.
61 changes: 60 additions & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>

Expand All @@ -22,7 +24,8 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

void CoprocessorHandler::execute()
grpc::Status CoprocessorHandler::execute()
try
{
switch (cop_request->tp())
{
Expand All @@ -45,6 +48,62 @@ void CoprocessorHandler::execute()
throw Exception(
"Coprocessor request type " + std::to_string(cop_request->tp()) + " is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const LockException & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText());
cop_response->Clear();
kvrpcpb::LockInfo * lock_info = cop_response->mutable_locked();
lock_info->set_key(e.lock_infos[0]->key);
lock_info->set_primary_lock(e.lock_infos[0]->primary_lock);
lock_info->set_lock_ttl(e.lock_infos[0]->lock_ttl);
lock_info->set_lock_version(e.lock_infos[0]->lock_version);
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const RegionException & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText());
cop_response->Clear();
errorpb::Error * region_err;
switch (e.status)
{
case RegionTable::RegionReadStatus::NOT_FOUND:
case RegionTable::RegionReadStatus::PENDING_REMOVE:
region_err = cop_response->mutable_region_error();
region_err->mutable_region_not_found()->set_region_id(cop_request->context().region_id());
break;
case RegionTable::RegionReadStatus::VERSION_ERROR:
region_err = cop_response->mutable_region_error();
region_err->mutable_epoch_not_match();
break;
default:
// should not happen
break;
}
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
cop_response->Clear();
cop_response->set_other_error(e.message());

if (e.code() == ErrorCodes::NOT_IMPLEMENTED)
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());

// TODO: Map other DB error codes to grpc codes.

return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
cop_response->Clear();
cop_response->set_other_error(e.what());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what());
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CoprocessorHandler

~CoprocessorHandler() = default;

void execute();
grpc::Status execute();

protected:
enum
Expand Down
37 changes: 34 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>

namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_)
Expand All @@ -29,6 +32,7 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
{}

void DAGDriver::execute()
try
{
context.setSetting("read_tso", UInt64(dag_request.start_ts()));

Expand Down Expand Up @@ -57,8 +61,11 @@ void DAGDriver::execute()
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);

BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk,
dag_request.encode_type(), dag.getResultFieldTypes(), streams.in->getHeader());
BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response,
context.getSettings().dag_records_per_chunk,
dag_request.encode_type(),
dag.getResultFieldTypes(),
streams.in->getHeader());
copyData(*streams.in, *outputStreamPtr);
// add ExecutorExecutionSummary info
for (auto & p_streams : dag_context.profile_streams_list)
Expand All @@ -81,5 +88,29 @@ void DAGDriver::execute()
executeSummary->set_num_iterations(num_iterations);
}
}
catch (const RegionException & e)
{
e.rethrow();
}
catch (const LockException & e)
{
e.rethrow();
}
catch (const Exception & e)
{
recordError(e.code(), e.message());
}
catch (const std::exception & e)
{
recordError(ErrorCodes::UNKNOWN_EXCEPTION, e.what());
}

void DAGDriver::recordError(Int32 err_code, const String & err_msg)
{
dag_response.Clear();
tipb::Error * error = dag_response.mutable_error();
error->set_code(err_code);
error->set_msg(err_msg);
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ class DAGDriver
tipb::SelectResponse & dag_response;

bool internal;

void recordError(Int32 err_code, const String & err_msg);
};
} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,9 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
if (!current_region)
{
//todo add more region error info in RegionException
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND);
}
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down
47 changes: 5 additions & 42 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <Core/Types.h>
#include <Flash/Coprocessor/CoprocessorHandler.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_builder.h>

Expand Down Expand Up @@ -51,48 +49,13 @@ grpc::Status FlashService::Coprocessor(
return status;
}

try
{
CoprocessorContext cop_context(context, request->context(), *grpc_context);
CoprocessorHandler cop_handler(cop_context, request, response);
CoprocessorContext cop_context(context, request->context(), *grpc_context);
CoprocessorHandler cop_handler(cop_context, request, response);

cop_handler.execute();
auto ret = cop_handler.execute();

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done");
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const LockException & e)
{
// TODO: handle lock error properly.
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText());
response->set_data("");
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());
}
catch (const RegionException & e)
{
// TODO: handle region error properly.
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText());
response->set_data("");
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
response->set_data("");

if (e.code() == ErrorCodes::NOT_IMPLEMENTED)
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());

// TODO: Map other DB error codes to grpc codes.

return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
response->set_data("");
return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what());
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done");
return ret;
}

String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val)
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
// the index of column is constant after MergeTreeBlockInputStream is constructed. exception will be thrown if not found.
const size_t handle_column_index = 0, version_column_index = 1, delmark_column_index = 2;

const auto func_throw_retry_region = [&]() {
const auto func_throw_retry_region = [&](RegionTable::RegionReadStatus status) {
std::vector<RegionID> region_ids;
region_ids.reserve(regions_executor_data.size());
for (const auto & query_info : regions_executor_data)
region_ids.push_back(query_info.info.region_id);
throw RegionException(region_ids);
throw RegionException(std::move(region_ids), status);
};

/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
Expand Down Expand Up @@ -314,7 +314,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
if (region == nullptr)
{
LOG_WARNING(log, "[region " << query_info.info.region_id << "] is not found in KVStore, try again");
func_throw_retry_region();
func_throw_retry_region(RegionTable::RegionReadStatus::NOT_FOUND);
}
kvstore_region.emplace(query_info.info.region_id, std::move(region));
}
Expand All @@ -331,13 +331,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
auto start_time = Clock::now();
const size_t mem_region_num = regions_executor_data.size();
const size_t batch_size = mem_region_num / concurrent_num;
std::atomic_bool need_retry = false;
std::atomic_uint8_t region_status = RegionTable::RegionReadStatus::OK;

const auto func_run_learner_read = [&](const size_t region_begin) {
const size_t region_end = std::min(region_begin + batch_size, mem_region_num);
for (size_t region_index = region_begin; region_index < region_end; ++region_index)
{
if (need_retry)
if (region_status != RegionTable::RegionReadStatus::OK)
return;

RegionQueryInfo & region_query_info = regions_executor_data[region_index].info;
Expand All @@ -359,7 +359,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< ", handle range [" << region_query_info.range_in_table.first.toString() << ", "
<< region_query_info.range_in_table.second.toString() << ") , status "
<< RegionTable::RegionReadStatusString(status));
need_retry = true;
region_status = status;
}
else if (block)
regions_executor_data[region_index].block = std::move(block);
Expand All @@ -379,8 +379,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
func_run_learner_read(0);
}

if (need_retry)
func_throw_retry_region();
if (region_status != RegionTable::RegionReadStatus::OK)
func_throw_retry_region(static_cast<RegionTable::RegionReadStatus>(region_status.load()));

auto end_time = Clock::now();
LOG_DEBUG(log,
Expand Down Expand Up @@ -862,7 +862,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< region_query_info.range_in_table.second.toString() << ") , status "
<< RegionTable::RegionReadStatusString(status));
// throw exception and exit.
func_throw_retry_region();
func_throw_retry_region(status);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/Transaction/RegionException.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/Types.h>

namespace DB
Expand All @@ -9,9 +10,12 @@ namespace DB
class RegionException : public Exception
{
public:
explicit RegionException(std::vector<RegionID> region_ids_) : region_ids(region_ids_) {}
RegionException(std::vector<RegionID> && region_ids_, RegionTable::RegionReadStatus status_)
: Exception(RegionTable::RegionReadStatusString(status_)), region_ids(region_ids_), status(status_)
{}

std::vector<RegionID> region_ids;
RegionTable::RegionReadStatus status;
};

} // namespace DB

0 comments on commit 4a76e91

Please sign in to comment.