diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp new file mode 100644 index 00000000000..4b708a0131e --- /dev/null +++ b/dbms/src/Flash/executeQuery.cpp @@ -0,0 +1,189 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ProfileEvents +{ +extern const Event Query; +} + +namespace DB +{ +namespace FailPoints +{ +extern const char random_interpreter_failpoint[]; +} // namespace FailPoints + +namespace +{ +void prepareForExecute(Context & context) +{ + ProfileEvents::increment(ProfileEvents::Query); + context.setQueryContext(context); + + QuotaForIntervals & quota = context.getQuota(); + quota.addQuery(); /// NOTE Seems that when new time interval has come, first query is not accounted in number of queries. + quota.checkExceeded(time(nullptr)); +} + +ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_context) +{ + if (dag_context.is_mpp_task) + { + /// for MPPTask, process list entry is set in MPPTask::initProcessListEntry() + RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); + return dag_context.getProcessListEntry(); + } + else + { + RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr"); + auto process_list_entry = setProcessListElement( + context, + dag_context.dummy_query_string, + dag_context.dummy_ast.get(), + true); + dag_context.setProcessListEntry(process_list_entry); + return process_list_entry; + } +} + +QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) +{ + RUNTIME_ASSERT(context.getDAGContext()); + auto & dag_context = *context.getDAGContext(); + const auto & logger = dag_context.log; + RUNTIME_ASSERT(logger); + + prepareForExecute(context); + + ProcessList::EntryPtr process_list_entry; + if (likely(!internal)) + { + process_list_entry = getProcessListEntry(context, dag_context); + logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger); + } + + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); + auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); + BlockIO res = interpreter->execute(); + MemoryTrackerPtr memory_tracker; + if (likely(process_list_entry)) + { + (*process_list_entry)->setQueryStreams(res); + memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); + } + + /// Hold element of process list till end of query execution. + res.process_list_entry = process_list_entry; + + if (likely(!internal)) + logQueryPipeline(logger, res.in); + + dag_context.switchToStreamMode(); + return std::make_unique(memory_tracker, context, logger->identifier(), res.in); +} + +std::optional executeAsPipeline(Context & context, bool internal) +{ + RUNTIME_ASSERT(context.getDAGContext()); + auto & dag_context = *context.getDAGContext(); + const auto & logger = dag_context.log; + RUNTIME_ASSERT(logger); + + if unlikely (!TaskScheduler::instance) + { + LOG_WARNING(logger, "The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node."); + return {}; + } + if (!Pipeline::isSupported(*dag_context.dag_request, context.getSettingsRef())) + { + LOG_DEBUG(logger, "Can't executed by pipeline model due to unsupported operator, and then fallback to block inputstream model"); + return {}; + } + + prepareForExecute(context); + + ProcessList::EntryPtr process_list_entry; + if (likely(!internal)) + { + process_list_entry = getProcessListEntry(context, dag_context); + logQuery(dag_context.dummy_query_string, context, logger); + } + + MemoryTrackerPtr memory_tracker; + if (likely(process_list_entry)) + memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); + + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); + auto executor = std::make_unique(memory_tracker, context, logger->identifier()); + if (likely(!internal)) + LOG_INFO(logger, fmt::format("Query pipeline:\n{}", executor->toString())); + dag_context.switchToPipelineMode(); + return {std::move(executor)}; +} + +QueryExecutorPtr executeAsBlockIO(Context & context, bool internal) +{ + if (context.getSettingsRef().enable_planner) + { + PlanQuerySource plan(context); + return doExecuteAsBlockIO(plan, context, internal); + } + else + { + DAGQuerySource dag(context); + return doExecuteAsBlockIO(dag, context, internal); + } +} +} // namespace + +QueryExecutorPtr queryExecute(Context & context, bool internal) +{ + if (context.getSettingsRef().enforce_enable_pipeline) + { + RUNTIME_CHECK_MSG( + TaskScheduler::instance, + "The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node."); + auto res = executeAsPipeline(context, internal); + RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting enforce_enable_pipeline is true."); + return std::move(*res); + } + if (context.getSettingsRef().enable_planner + && context.getSettingsRef().enable_pipeline) + { + if (auto res = executeAsPipeline(context, internal); res) + return std::move(*res); + } + return executeAsBlockIO(context, internal); +} +} // namespace DB diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index e5c1622feeb..cf0cc40e7c9 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -123,9 +123,26 @@ static void onExceptionBeforeStart(const String & query, Context & context, time } } +<<<<<<< HEAD static std::tuple executeQueryImpl( IQuerySource & query_src, +======= +void prepareForInputStream( + Context & context, + const BlockInputStreamPtr & in) +{ + assert(in); + if (auto * stream = dynamic_cast(in.get())) + { + stream->setProgressCallback(context.getProgressCallback()); + stream->setProcessListElement(context.getProcessListElement()); + } +} + +std::tuple executeQueryImpl( + SQLQuerySource & query_src, +>>>>>>> 80f8e9dc65 (fix issue #7810 (#7854)) Context & context, bool internal, QueryProcessingStage::Enum stage) @@ -361,6 +378,56 @@ static std::tuple executeQueryImpl( return std::make_tuple(ast, res); } +<<<<<<< HEAD +======= +/// Log query into text log (not into system table). +void logQuery(const String & query, const Context & context, const LoggerPtr & logger) +{ + const auto & current_query_id = context.getClientInfo().current_query_id; + const auto & initial_query_id = context.getClientInfo().initial_query_id; + const auto & current_user = context.getClientInfo().current_user; + + LOG_DEBUG( + logger, + "(from {}{}, query_id: {}{}) {}", + context.getClientInfo().current_address.toString(), + (current_user != "default" ? ", user: " + current_user : ""), + current_query_id, + (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""), + joinLines(query)); +} + +std::shared_ptr setProcessListElement( + Context & context, + const String & query, + const IAST * ast, + bool is_dag_task) +{ + assert(ast); + auto total_memory = context.getServerInfo().has_value() ? context.getServerInfo()->memory_info.capacity : 0; + auto process_list_entry = context.getProcessList().insert( + query, + ast, + context.getClientInfo(), + context.getSettingsRef(), + total_memory, + is_dag_task); + context.setProcessListElement(&process_list_entry->get()); + return process_list_entry; +} + +void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in) +{ + assert(in); + auto pipeline_log_str = [&in]() { + FmtBuffer log_buffer; + log_buffer.append("Query pipeline:\n"); + in->dumpTree(log_buffer); + return log_buffer.toString(); + }; + LOG_INFO(logger, pipeline_log_str()); +} +>>>>>>> 80f8e9dc65 (fix issue #7810 (#7854)) BlockIO executeQuery( const String & query, diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h index 4f4ef136ed2..7139198b042 100644 --- a/dbms/src/Interpreters/executeQuery.h +++ b/dbms/src/Interpreters/executeQuery.h @@ -43,4 +43,10 @@ BlockIO executeQuery( BlockIO executeQuery(DAGQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage); +<<<<<<< HEAD } +======= +void logQuery(const String & query, const Context & context, const LoggerPtr & logger); + +} // namespace DB +>>>>>>> 80f8e9dc65 (fix issue #7810 (#7854))