Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiFlash supports stale read #6459

Merged
merged 8 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace DB
M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \
F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \
F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \
Expand Down Expand Up @@ -233,7 +234,7 @@ namespace DB
F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \
F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \
M(tiflash_task_scheduler, "Min-tso task scheduler", Gauge, \
F(type_min_tso, {"type", "min_tso"}), \
F(type_min_query_ts, {"type", "min_query_ts"}), \
hehechen marked this conversation as resolved.
Show resolved Hide resolved
F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \
F(type_active_queries_count, {"type", "active_queries_count"}), \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/DAGProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct DAGProperties
bool use_broadcast_join = false;
Int32 mpp_partition_num = 1;
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
UInt64 query_ts = 0;
UInt64 server_id = 1;
UInt64 local_query_id = 1;
Int64 task_id = 1;

Int32 mpp_timeout = 10;
};
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<Fl
server_map[partition_id] = std::move(server);
}

void MockComputeServerManager::cancelQuery(size_t start_ts)
void MockComputeServerManager::cancelQuery(const MPPQueryId & query_id)
{
mpp::CancelTaskRequest req;
auto * meta = req.mutable_meta();
meta->set_start_ts(start_ts);
meta->set_query_ts(query_id.query_ts);
meta->set_local_query_id(query_id.local_query_id);
meta->set_server_id(query_id.server_id);
meta->set_start_ts(query_id.start_ts);
mpp::CancelTaskResponse response;
for (const auto & server : server_map)
server.second->flashService()->cancelMPPTaskForTest(&req, &response);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB::tests
{

/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
*/
Expand All @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>

void resetMockMPPServerInfo(size_t partition_num);

void cancelQuery(size_t start_ts);
void cancelQuery(const MPPQueryId & query_id);

static String queryInfo();

Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockExecutor/AstToPB.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ using MPPCtxPtr = std::shared_ptr<MPPCtx>;
struct MPPInfo
{
Timestamp start_ts;
UInt64 query_ts;
UInt64 server_id;
UInt64 local_query_id;
Int64 partition_id;
Int64 task_id;
const std::vector<Int64> sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;

MPPInfo(
Timestamp start_ts_,
UInt64 query_ts_,
UInt64 server_id_,
UInt64 local_query_id_,
Int64 partition_id_,
Int64 task_id_,
const std::vector<Int64> & sender_target_task_ids_,
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
: start_ts(start_ts_)
, query_ts(query_ts_)
, server_id(server_id_)
, local_query_id(local_query_id_)
, partition_id(partition_id_)
, task_id(task_id_)
, sender_target_task_ids(sender_target_task_ids_)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(it->second[i]);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(task_id);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ inline size_t getReadTSOForLog(const String & line)
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
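// Relies on the MPP task prefix produced by MPPTaskId::toString(), e.g. "MPP<query:query_ts:1, local_query_id:1, server_id:1, start_ts:435802637197639681,start_ts:435802637197639681,task_id:1>"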
hehechen marked this conversation as resolved.
Show resolved Hide resolved
auto pos = line.find("query:");
auto pos = line.find(", start_ts:");
hehechen marked this conversation as resolved.
Show resolved Hide resolved
if (pos != std::string::npos && regex_search(line.cbegin() + pos, line.cend(), m, rx))
{
return std::stoul(m[1]);
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/dbgQueryCompiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ struct QueryFragment
{
MPPInfo mpp_info(
properties.start_ts,
properties.query_ts,
properties.server_id,
properties.local_query_id,
partition_id,
task_ids[partition_id],
sender_target_task_ids,
Expand All @@ -141,7 +144,7 @@ struct QueryFragment
}
else
{
MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
ret.push_back(toQueryTask(properties, mpp_info, context));
}
return ret;
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc

mpp::TaskMeta root_tm;
root_tm.set_start_ts(properties.start_ts);
root_tm.set_query_ts(properties.query_ts);
root_tm.set_local_query_id(properties.local_query_id);
root_tm.set_server_id(properties.server_id);
root_tm.set_address(root_addr);
root_tm.set_task_id(-1);
root_tm.set_partition_id(-1);
Expand Down Expand Up @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(Debug::LOCAL_HOST);
tm.set_task_id(root_task_id);
tm.set_partition_id(-1);
Expand All @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(addr);
tm.set_task_id(task_id);
tm.set_partition_id(-1);
Expand All @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchT
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand All @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class DAGContext
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id())
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id(), mpp_task_meta.server_id(), mpp_task_meta.query_ts(), mpp_task_meta.local_query_id())
hehechen marked this conversation as resolved.
Show resolved Hide resolved
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
hehechen marked this conversation as resolved.
Show resolved Hide resolved
return grpc::Status::OK;
}

Expand Down Expand Up @@ -408,7 +408,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[];

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: meta(meta_)
, id(meta.start_ts(), meta.task_id())
, id(meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id())
hehechen marked this conversation as resolved.
Show resolved Hide resolved
hehechen marked this conversation as resolved.
Show resolved Hide resolved
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
, schedule_entry(manager, id)
Expand Down Expand Up @@ -139,7 +139,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id(), task_meta.server_id(), task_meta.query_ts(), task_meta.local_query_id()}, tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
Expand Down Expand Up @@ -204,7 +204,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {nullptr, err_msg};
}

MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()};
MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id(), request->receiver_meta().server_id(), request->receiver_meta().query_ts(), request->receiver_meta().local_query_id()};
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id);
if (tunnel_ptr == nullptr)
Expand Down Expand Up @@ -452,7 +452,7 @@ void MPPTask::runImpl()
void MPPTask::handleError(const String & error_msg)
{
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR);
if (!registered)
// if the task is not registered, need to cancel it explicitly
abort(error_msg, AbortType::ONERROR);
Expand Down
37 changes: 35 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,48 @@

namespace DB
{
/// Strict weak ordering over MPP query ids.
///
/// Components are compared lexicographically in the order query_ts,
/// local_query_id, server_id, start_ts. Ids whose query_ts, local_query_id and
/// server_id are all zero therefore still compare purely by start_ts among
/// themselves, as before.
///
/// The previous `a < b || c < d || e < f` form was not a valid ordering: for
/// (query_ts=1, local_query_id=2) and (query_ts=2, local_query_id=1) both
/// x < y and y < x were true. It also inspected only the left operand when
/// deciding whether to fall back to start_ts, so the result depended on the
/// operand order. Using start_ts as the last key keeps
/// `!(a < b) && !(b < a)` equivalent to operator==, which checks all four fields.
bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const
{
    if (query_ts != mpp_query_id.query_ts)
        return query_ts < mpp_query_id.query_ts;
    if (local_query_id != mpp_query_id.local_query_id)
        return local_query_id < mpp_query_id.local_query_id;
    if (server_id != mpp_query_id.server_id)
        return server_id < mpp_query_id.server_id;
    return start_ts < mpp_query_id.start_ts;
}
/// Two query ids are equal only when all four components match.
bool MPPQueryId::operator==(const MPPQueryId & rid) const
{
    if (query_ts != rid.query_ts)
        return false;
    if (local_query_id != rid.local_query_id)
        return false;
    if (server_id != rid.server_id)
        return false;
    return start_ts == rid.start_ts;
}
/// Exact negation of operator==.
bool MPPQueryId::operator!=(const MPPQueryId & rid) const
{
    const bool same = (*this == rid);
    return !same;
}
/// True when this id orders before `rid` or is equal to it.
bool MPPQueryId::operator<=(const MPPQueryId & rid) const
{
    // Same evaluation order as `a < b || a == b`: ordering first, equality second.
    if (*this < rid)
        return true;
    return *this == rid;
}

/// Hashes an MPPQueryId.
///
/// When query_ts, local_query_id and server_id are all zero, only start_ts
/// identifies the query, so it alone is hashed. Otherwise the three components
/// are folded together with an order-dependent mixing step.
///
/// A plain XOR of the per-field hashes is symmetric and self-cancelling:
/// swapping two components, or two components having the same value, produces
/// avoidable collisions (std::hash on integers is commonly the identity).
/// Objects that compare equal with operator== still hash identically, because
/// the result depends only on fields that operator== compares.
size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept
{
    const std::hash<UInt64> hasher{};
    if (mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0)
    {
        return hasher(mpp_query_id.start_ts);
    }

    // Same shape as boost::hash_combine: mixes `value` into `seed` so that the
    // position of each component matters.
    const auto combine = [&hasher](size_t seed, UInt64 value) noexcept -> size_t {
        return seed ^ (hasher(value) + 0x9e3779b9 + (seed << 6) + (seed >> 2));
    };

    size_t seed = hasher(mpp_query_id.query_ts);
    seed = combine(seed, mpp_query_id.local_query_id);
    seed = combine(seed, mpp_query_id.server_id);
    return seed;
}

/// Renders this task id for logs and error messages.
///
/// Unknown ids (isUnknown() is true) use a fixed "N/A" form. Otherwise the
/// output embeds MPPQueryId::toDebugString(), followed by start_ts and task_id.
/// The superseded "MPP<query:{},task:{}>" return that preceded this one made
/// the new format unreachable and has been dropped.
/// NOTE(review): getReadTSOForLog in dbgFuncMisc.cpp searches log lines for
/// ", start_ts:", which comes from toDebugString() — keep the two in sync.
String MPPTaskId::toString() const
{
    if (isUnknown())
        return "MPP<query_id:N/A,start_ts:N/A,task_id:N/A>";
    return fmt::format("MPP<query:{},start_ts:{},task_id:{}>", query_id.toDebugString(), start_ts, task_id);
}

// Default-constructed task id: the default constructor sets task_id to
// unknown_task_id, so isUnknown() returns true for this value.
const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

// Largest value representable by UInt64.
constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
// Query id with every component (query_ts, local_query_id, server_id, start_ts)
// set to MAX_UINT64.
// NOTE(review): presumably an upper-bound sentinel when tracking the minimum
// query id — confirm against its users.
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

/// Task ids are equal when they belong to the same query (every MPPQueryId
/// component matches) and carry the same task_id.
///
/// The superseded comparison on start_ts + task_id alone has been dropped: it
/// ignored the other query id components, while std::hash<DB::MPPTaskId>
/// hashes query_id, so equal-comparing ids could have hashed differently.
bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
    return lid.query_id == rid.query_id && lid.task_id == rid.task_id;
}
} // namespace DB
43 changes: 38 additions & 5 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,61 @@
#pragma once

#include <common/types.h>
#include <fmt/core.h>

namespace DB
{
// global unique MPP query id.
struct MPPQueryId
{
UInt64 query_ts;
UInt64 local_query_id;
UInt64 server_id;
UInt64 start_ts;
MPPQueryId(UInt64 query_ts, UInt64 local_query_id, UInt64 server_id, UInt64 start_ts)
: query_ts(query_ts)
, local_query_id(local_query_id)
, server_id(server_id)
, start_ts(start_ts)
{}
bool operator<(const MPPQueryId & mpp_query_id) const;
bool operator==(const MPPQueryId & rid) const;
bool operator!=(const MPPQueryId & rid) const;
bool operator<=(const MPPQueryId & rid) const;
String toDebugString() const
hehechen marked this conversation as resolved.
Show resolved Hide resolved
{
return fmt::format("query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}", query_ts, local_query_id, server_id, start_ts);
hehechen marked this conversation as resolved.
Show resolved Hide resolved
}
};

struct MPPQueryIdHash
{
size_t operator()(MPPQueryId const & mpp_query_id) const noexcept;
};

// Identify a mpp task.
struct MPPTaskId
{
MPPTaskId()
: start_ts(0)
, task_id(unknown_task_id){};
, task_id(unknown_task_id)
, query_id({0, 0, 0, 0}){};

MPPTaskId(UInt64 start_ts_, Int64 task_id_)
MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id)
: start_ts(start_ts_)
, task_id(task_id_){};
, task_id(task_id_)
, query_id(query_ts, local_query_id, server_id, start_ts_)
{}

UInt64 start_ts;
hehechen marked this conversation as resolved.
Show resolved Hide resolved
Int64 task_id;
MPPQueryId query_id;

bool isUnknown() const { return task_id == unknown_task_id; }

String toString() const;

static const MPPTaskId unknown_mpp_task_id;
static const MPPQueryId Max_Query_Id;

private:
static constexpr Int64 unknown_task_id = -1;
Expand All @@ -53,7 +86,7 @@ class hash<DB::MPPTaskId>
public:
size_t operator()(const DB::MPPTaskId & id) const
{
return hash<UInt64>()(id.start_ts) ^ hash<Int64>()(id.task_id);
return DB::MPPQueryIdHash()(id.query_id) ^ hash<Int64>()(id.task_id);
}
};
} // namespace std
Loading