Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid tiflash crash when query is killed (#3434) #3448

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num)
{
parent.exceptions[thread_num] = exception;
parent.cancel(false);
/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
parent.processor.cancel(false);
}


Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/UnionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
/// and the exception is lost.

parent.output_queue.push(exception);
parent.cancel(false); /// Does not throw exceptions.
/// can not cancel parent inputStream or the exception might be lost
parent.processor.cancel(false); /// Does not throw exceptions.
}

Self & parent;
Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,16 @@ enum TaskStatus

struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyable
{
Context context;

std::unique_ptr<tipb::DAGRequest> dag_req;
std::unique_ptr<DAGContext> dag_context;

Context context;
/// store io in MPPTask to keep the life cycle of memory_tracker for the current query
/// BlockIO contains some information stored in Context and DAGContext, so need deconstruct it before Context and DAGContext
/// BlockIO contains some information stored in Context, so need deconstruct it before Context
BlockIO io;
/// The inputStreams should be released in the destructor of BlockIO, since DAGContext contains
/// some reference to inputStreams, so it need to be destructed before BlockIO
std::unique_ptr<DAGContext> dag_context;
MemoryTracker * memory_tracker = nullptr;

MPPTaskId id;
Expand Down Expand Up @@ -349,7 +351,8 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab
{
/// MPPTask maybe destructed by different thread, set the query memory_tracker
/// to current_memory_tracker in the destructor
current_memory_tracker = memory_tracker;
if (current_memory_tracker != memory_tracker)
current_memory_tracker = memory_tracker;
closeAllTunnel("");
LOG_DEBUG(log, "finish MPPTask: " << id.toString());
}
Expand Down