Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

invalidate stale regions for Mpp query. #1859

Merged
merged 12 commits into from
May 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/kvproto
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <DataStreams/IBlockInputStream.h>
#include <Storages/Transaction/TiDB.h>
#include <Flash/Coprocessor/DAGDriver.h>

namespace DB
{
Expand Down Expand Up @@ -83,6 +84,8 @@ class DAGContext
bool is_mpp_task;
bool is_root_mpp_task;

RegionInfoList retry_regions;

private:
/// profile_streams_map is a map that maps from executor_id to ProfileStreamsInfo
std::map<String, ProfileStreamsInfo> profile_streams_map;
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ try
}
else
{
if (!dag_context.retry_regions.empty())
{
coprocessor::BatchResponse response;
for (auto region : dag_context.retry_regions)
{
auto * retry_region = response.add_retry_regions();
retry_region->set_id(region.region_id);
retry_region->mutable_region_epoch()->set_conf_ver(region.region_conf_version);
retry_region->mutable_region_epoch()->set_version(region.region_version);
}
writer->Write(response);
}
auto streaming_writer = std::make_shared<StreamWriter>(writer);
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(streaming_writer,
std::vector<Int64>(), tipb::ExchangeType::PassThrough, context.getSettings().dag_records_per_chunk, dag.getEncodeType(),
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class RegionInfo
{}
};

using RegionInfoList = std::vector<RegionInfo>;

/// An abstraction of driver running DAG request.
/// Now is a naive native executor. Might get evolved to drive MPP-like computation.

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
// For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop mode.
if (!region_retry.empty())
{
for (auto it : region_retry)
{
context.getQueryContext().getDAGContext()->retry_regions.push_back(it.second);
}
LOG_DEBUG(log, ({
std::stringstream ss;
ss << "Start to retry " << region_retry.size() << " regions (";
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void MPPTask::unregisterTask()
}
}

void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
std::vector<RegionInfo> MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
{
auto start_time = Clock::now();
dag_req = std::make_unique<tipb::DAGRequest>();
Expand Down Expand Up @@ -198,6 +198,8 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
auto end_time = Clock::now();
Int64 compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
dag_context->compile_time_ns = compile_time_ns;

return dag_context->retry_regions;
}

String taskStatusToString(TaskStatus ts)
Expand Down Expand Up @@ -353,7 +355,15 @@ grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse *
{
Stopwatch stopwatch;
task = std::make_shared<MPPTask>(task_request.meta(), context);
task->prepare(task_request);

auto retry_regions = task->prepare(task_request);
for (auto region : retry_regions)
{
auto * retry_region = response->add_retry_regions();
retry_region->set_id(region.region_id);
retry_region->mutable_region_epoch()->set_conf_ver(region.region_conf_version);
retry_region->mutable_region_epoch()->set_version(region.region_version);
}
if (task->dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_root_task_run);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab
}
}

void prepare(const mpp::DispatchTaskRequest & task_request);
std::vector<RegionInfo> prepare(const mpp::DispatchTaskRequest & task_request);

void updateProgress(const Progress &) { task_progress.current_progress++; }

Expand Down