Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine log for cop/batch cop #8182

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ extern const int NOT_IMPLEMENTED;
BatchCoprocessorHandler::BatchCoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::BatchRequest * cop_request_,
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_)
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
const String & identifier)
: cop_context(cop_context_)
, cop_request(cop_request_)
, writer(writer_)
, resource_group_name(cop_request->context().resource_control_context().resource_group_name())
, log(Logger::get("BatchCoprocessorHandler, resource_group: " + resource_group_name))
, log(Logger::get(identifier))
{}

grpc::Status BatchCoprocessorHandler::execute()
Expand Down Expand Up @@ -83,7 +84,7 @@ grpc::Status BatchCoprocessorHandler::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
DAGRequestKind::BatchCop,
resource_group_name,
Logger::get("BatchCoprocessorHandler, resource_group: " + resource_group_name));
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

DAGDriver<DAGRequestKind::BatchCop> driver(
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/BatchCoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class BatchCoprocessorHandler
BatchCoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::BatchRequest * cop_request_,
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_);
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
const String & identifier);

grpc::Status execute();

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace pingcap
{
namespace common
{

template <typename T>
class CopIterMPMCQueue : public IMPMCQueue<T>
{
Expand Down Expand Up @@ -181,7 +180,8 @@ class CoprocessorReader
bool enable_cop_stream_,
size_t queue_size,
UInt64 cop_timeout,
const pingcap::kv::LabelFilter & tiflash_label_filter_)
const pingcap::kv::LabelFilter & tiflash_label_filter_,
const String & source_identifier)
: schema(schema_)
, has_enforce_encode_type(has_enforce_encode_type_)
, concurrency(concurrency_)
Expand All @@ -191,7 +191,7 @@ class CoprocessorReader
std::move(tasks),
cluster,
concurrency_,
&Poco::Logger::get("pingcap/coprocessor"),
&Poco::Logger::get(fmt::format("{} pingcap/coprocessor", source_identifier)),
cop_timeout,
tiflash_label_filter_)
{}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve
enable_cop_stream,
queue_size,
cop_timeout,
tiflash_label_filter);
tiflash_label_filter,
log->identifier());
context.getDAGContext()->addCoprocessorReader(coprocessor_reader);

return coprocessor_reader;
Expand Down
19 changes: 7 additions & 12 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,26 @@ template <>
CoprocessorHandler<false>::CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
coprocessor::Response * cop_response_)
coprocessor::Response * cop_response_,
const String & identifier)
: cop_context(cop_context_)
, cop_request(cop_request_)
, cop_response(cop_response_)
, resource_group_name(cop_request->context().resource_control_context().resource_group_name())
, log(Logger::get("CoprocessorHandler, resource_group: " + resource_group_name))
, log(Logger::get(identifier))
{}

template <>
CoprocessorHandler<true>::CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
grpc::ServerWriter<coprocessor::Response> * cop_writer_)
grpc::ServerWriter<coprocessor::Response> * cop_writer_,
const String & identifier)
: cop_context(cop_context_)
, cop_request(cop_request_)
, cop_writer(cop_writer_)
, resource_group_name(cop_request->context().resource_control_context().resource_group_name())
, log(Logger::get("CoprocessorHandler(stream), resource_group: " + resource_group_name))
, log(Logger::get(identifier))
{}

template <bool is_stream>
Expand Down Expand Up @@ -141,13 +143,6 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
genCopKeyRange(cop_request->ranges()),
&bypass_lock_ts));

String msg;
if constexpr (is_stream)
msg = "CoprocessorHandler(stream), resource_group: ";
else
msg = "CoprocessorHandler, resource_group: ";
msg += resource_group_name;

DAGRequestKind kind;
if constexpr (is_stream)
kind = DAGRequestKind::CopStream;
Expand All @@ -161,7 +156,7 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
kind,
resource_group_name,
Logger::get(msg));
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

if constexpr (is_stream)
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/CoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class CoprocessorHandler
CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
coprocessor::Response * response_);
coprocessor::Response * response_,
const String & identifier);
CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
grpc::ServerWriter<coprocessor::Response> * cop_writer_);
grpc::ServerWriter<coprocessor::Response> * cop_writer_,
const String & identifier);

grpc::Status execute();

Expand Down
52 changes: 37 additions & 15 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,19 @@ grpc::Status FlashService::Coprocessor(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
auto region_info = fmt::format(
"{{{}, {}, {}}}",
request->context().region_id(),
request->context().region_epoch().conf_ver(),
request->context().region_epoch().version());
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(
log,
log_level,
"Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}, region epoch: {}",
"Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}",
is_remote_read,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down Expand Up @@ -275,11 +279,10 @@ grpc::Status FlashService::Coprocessor(
LOG_IMPL(
log,
log_level,
"Begin process cop request after wait {} ms, start ts: {}, region info: {}, region epoch: {}",
"Begin process cop request after wait {} ms, start ts: {}, region info: {}",
wait_ms,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);
auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
Expand All @@ -292,7 +295,13 @@ grpc::Status FlashService::Coprocessor(
GET_METRIC(tiflash_coprocessor_handling_request_count, type_remote_read_executing).Decrement();
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
CoprocessorHandler<false> cop_handler(cop_context, request, response);
auto request_identifier = fmt::format(
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
CoprocessorHandler<false> cop_handler(cop_context, request, response, request_identifier);
return cop_handler.execute();
});

Expand Down Expand Up @@ -330,7 +339,11 @@ grpc::Status FlashService::BatchCoprocessor(
return status;
}
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
BatchCoprocessorHandler cop_handler(cop_context, request, writer);
auto request_identifier = fmt::format(
"BatchCoprocessor, start_ts: {}, resource_group: {}",
request->start_ts(),
request->context().resource_control_context().resource_group_name());
BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});

Expand All @@ -345,15 +358,19 @@ grpc::Status FlashService::CoprocessorStream(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
auto region_info = fmt::format(
"{{{}, {}, {}}}",
request->context().region_id(),
request->context().region_epoch().conf_ver(),
request->context().region_epoch().version());
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(
log,
log_level,
"Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}, region epoch: {}",
"Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}",
is_remote_read,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down Expand Up @@ -419,11 +436,10 @@ grpc::Status FlashService::CoprocessorStream(
LOG_IMPL(
log,
log_level,
"Begin process cop stream request after wait {} ms, start ts: {}, region info: {}, region epoch: {}",
"Begin process cop stream request after wait {} ms, start ts: {}, region info: {}",
wait_ms,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);
auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
Expand All @@ -436,7 +452,13 @@ grpc::Status FlashService::CoprocessorStream(
GET_METRIC(tiflash_coprocessor_handling_request_count, type_remote_read_executing).Decrement();
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
CoprocessorHandler<true> cop_handler(cop_context, request, writer);
auto request_identifier = fmt::format(
"Coprocessor(stream), is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
CoprocessorHandler<true> cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});

Expand Down