From 7d016ea2cd57569efc20aade8ce0e31546d59bbb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 1 Mar 2022 16:31:51 +0800 Subject: [PATCH] update --- dbms/src/Flash/Coprocessor/DAGQuerySource.cpp | 5 ++ .../Flash/Coprocessor/DAGRequestVerifier.cpp | 60 +++++++++++++++++++ .../Flash/Coprocessor/DAGRequestVerifier.h | 20 +++++++ 3 files changed, 85 insertions(+) create mode 100644 dbms/src/Flash/Coprocessor/DAGRequestVerifier.cpp create mode 100644 dbms/src/Flash/Coprocessor/DAGRequestVerifier.h diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index d2160992c5b..da53710438b 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -15,6 +16,10 @@ DAGQuerySource::DAGQuerySource(Context & context_) : context(context_) { const tipb::DAGRequest & dag_request = *getDAGContext().dag_request; +#ifndef NDEBUG + DAGRequestVerifier(getDAGContext()).verify(&dag_request); +#endif // NDEBUG + if (dag_request.has_root_executor()) { QueryBlockIDGenerator id_generator; diff --git a/dbms/src/Flash/Coprocessor/DAGRequestVerifier.cpp b/dbms/src/Flash/Coprocessor/DAGRequestVerifier.cpp new file mode 100644 index 00000000000..3b5522d0f81 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGRequestVerifier.cpp @@ -0,0 +1,60 @@ +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace +{ +#define CHECK(condition, msg) \ + if (!(condition)) \ + { \ + throw TiFlashException((msg), Errors::Coprocessor::BadRequest); \ + } + + +using Rule = std::function; + +Rule check_mode = [](const tipb::DAGRequest * dag_request, const DAGContext & dag_context) { + CHECK(dag_request->has_root_executor() != (dag_request->executors_size() > 0), "One and only one of root_executor and executors exists"); + if (dag_context.isMPPTask()) + { + CHECK(dag_request->has_root_executor(), "for mpp mode, dag_request.has_root_executor() must be true"); + CHECK(dag_request->root_executor().has_exchange_sender(), "for mpp mode, dag_request.root_executor() must be ExchangeSender"); + } +}; + +Rule check_executor_id = [](const tipb::DAGRequest * dag_request, const DAGContext & dag_context) { + std::unordered_set set; + traverseExecutors(dag_request, [&](const tipb::Executor & executor) { + CHECK(dag_context.isMPPTask() && executor.has_executor_id(), "for mpp mode, execute.has_executor_id() must be true"); + if (executor.has_executor_id()) + { + CHECK(!executor.executor_id().empty(), "executor_id is blank"); + CHECK(set.find(executor.executor_id()) != set.end(), fmt::format("executor_id({}) duplicate", executor.executor_id())); + set.insert(executor.executor_id()); + } + return true; + }); +}; + +Rule misc = [](const tipb::DAGRequest * dag_request, const DAGContext &) { + CHECK(!dag_request->output_offsets().empty(), "dag_request.output_offsets() is empty"); +}; + +#undef CHECK +} // namespace + +void DAGRequestVerifier::verify(const tipb::DAGRequest * request) +{ + static std::vector rules{check_mode, check_executor_id, misc}; + + for (const auto & rule : rules) + rule(request, dag_context); +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/DAGRequestVerifier.h b/dbms/src/Flash/Coprocessor/DAGRequestVerifier.h new file mode 100644 index 00000000000..4ff9dd15f2f --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGRequestVerifier.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +namespace DB +{ +class DAGContext; +class DAGRequestVerifier +{ +public: + explicit DAGRequestVerifier(const DAGContext & dag_context_) + : dag_context(dag_context_) + {} + + void verify(const tipb::DAGRequest * dag_request); + +private: + const DAGContext & dag_context; +}; +} // namespace DB \ No newline at end of file