Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add DAGRequestVerifier to verify tipb::DAGRequest #4165

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGRequestVerifier.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Parsers/makeDummyQuery.h>
Expand All @@ -15,6 +16,10 @@ DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
const tipb::DAGRequest & dag_request = *getDAGContext().dag_request;
#ifndef NDEBUG
DAGRequestVerifier(getDAGContext()).verify(&dag_request);
#endif // NDEBUG

if (dag_request.has_root_executor())
{
QueryBlockIDGenerator id_generator;
Expand Down
60 changes: 60 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGRequestVerifier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include <Common/FmtUtils.h>
#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGRequestVerifier.h>
#include <Flash/Statistics/traverseExecutors.h>

#include <functional>
#include <unordered_set>

namespace DB
{
namespace
{
#define CHECK(condition, msg) \
if (!(condition)) \
{ \
throw TiFlashException((msg), Errors::Coprocessor::BadRequest); \
}


using Rule = std::function<void(const tipb::DAGRequest *, const DAGContext &)>;

Rule check_mode = [](const tipb::DAGRequest * dag_request, const DAGContext & dag_context) {
CHECK(dag_request->has_root_executor() != (dag_request->executors_size() > 0), "One and only one of root_executor and executors exists");
if (dag_context.isMPPTask())
{
CHECK(dag_request->has_root_executor(), "for mpp mode, dag_request.has_root_executor() must be true");
CHECK(dag_request->root_executor().has_exchange_sender(), "for mpp mode, dag_request.root_executor() must be ExchangeSender");
}
};

Rule check_executor_id = [](const tipb::DAGRequest * dag_request, const DAGContext & dag_context) {
std::unordered_set<String> set;
traverseExecutors(dag_request, [&](const tipb::Executor & executor) {
CHECK(dag_context.isMPPTask() && executor.has_executor_id(), "for mpp mode, execute.has_executor_id() must be true");
if (executor.has_executor_id())
{
CHECK(!executor.executor_id().empty(), "executor_id is blank");
CHECK(set.find(executor.executor_id()) != set.end(), fmt::format("executor_id({}) duplicate", executor.executor_id()));
set.insert(executor.executor_id());
}
return true;
});
};

Rule misc = [](const tipb::DAGRequest * dag_request, const DAGContext &) {
CHECK(!dag_request->output_offsets().empty(), "dag_request.output_offsets() is empty");
};

#undef CHECK
} // namespace

void DAGRequestVerifier::verify(const tipb::DAGRequest * request)
{
static std::vector<Rule> rules{check_mode, check_executor_id, misc};

for (const auto & rule : rules)
rule(request, dag_context);
}
} // namespace DB
20 changes: 20 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGRequestVerifier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <tipb/select.pb.h>

namespace DB
{
class DAGContext;
class DAGRequestVerifier
{
public:
explicit DAGRequestVerifier(const DAGContext & dag_context_)
: dag_context(dag_context_)
{}

void verify(const tipb::DAGRequest * dag_request);

private:
const DAGContext & dag_context;
};
} // namespace DB