From 3ebbe29da9c87b3cee7354739e8c1a953473b023 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 16 Nov 2021 16:52:02 +0800 Subject: [PATCH] resolve conflict --- .../ParallelAggregatingBlockInputStream.cpp | 4 +++- dbms/src/DataStreams/UnionBlockInputStream.h | 3 ++- dbms/src/Flash/Mpp/MPPHandler.h | 11 +++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index a21e1197028..23f3a848286 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -157,7 +157,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num) { parent.exceptions[thread_num] = exception; - parent.cancel(false); + /// can not cancel parent inputStream or the exception might be lost + if (!parent.executed) + parent.processor.cancel(false); } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 71e72672aa3..520eb144142 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -284,7 +284,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream /// and the exception is lost. parent.output_queue.push(exception); - parent.cancel(false); /// Does not throw exceptions. + /// can not cancel parent inputStream or the exception might be lost + parent.processor.cancel(false); /// Does not throw exceptions. } Self & parent; diff --git a/dbms/src/Flash/Mpp/MPPHandler.h b/dbms/src/Flash/Mpp/MPPHandler.h index d66077e2383..08b3cac21bf 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.h +++ b/dbms/src/Flash/Mpp/MPPHandler.h @@ -246,14 +246,16 @@ enum TaskStatus struct MPPTask : std::enable_shared_from_this, private boost::noncopyable { - Context context; std::unique_ptr dag_req; - std::unique_ptr dag_context; + Context context; /// store io in MPPTask to keep the life cycle of memory_tracker for the current query - /// BlockIO contains some information stored in Context and DAGContext, so need deconstruct it before Context and DAGContext + /// BlockIO contains some information stored in Context, so need deconstruct it before Context BlockIO io; + /// The inputStreams should be released in the destructor of BlockIO, since DAGContext contains + /// some reference to inputStreams, so it need to be destructed before BlockIO + std::unique_ptr dag_context; MemoryTracker * memory_tracker = nullptr; MPPTaskId id; @@ -349,7 +351,8 @@ struct MPPTask : std::enable_shared_from_this, private boost::noncopyab { /// MPPTask maybe destructed by different thread, set the query memory_tracker /// to current_memory_tracker in the destructor - current_memory_tracker = memory_tracker; + if (current_memory_tracker != memory_tracker) + current_memory_tracker = memory_tracker; closeAllTunnel(""); LOG_DEBUG(log, "finish MPPTask: " << id.toString()); }