diff --git a/ydb/core/fq/libs/shared_resources/db_exec.h b/ydb/core/fq/libs/shared_resources/db_exec.h index 555e16e06915..8eeac48ec64b 100644 --- a/ydb/core/fq/libs/shared_resources/db_exec.h +++ b/ydb/core/fq/libs/shared_resources/db_exec.h @@ -136,28 +136,36 @@ class TDbExecuter : public TDbExecutable { SkipStep_ = true; } + static std::shared_ptr Lock(const std::weak_ptr& self) { + auto lock = self.lock(); + return std::static_pointer_cast(lock); + } + TAsyncStatus NextStep(NYdb::NTable::TSession session) { if (CurrentStepIndex == Steps.size()) { if (Transaction) { return Transaction->Commit() - .Apply([this, session=session](const TFuture& future) { - + .Apply([selfHolder=SelfHolder, session=session](const TFuture& future) { + auto self = Lock(selfHolder); + if (!self) { + return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"self has been deleted"}}}); + } TCommitTransactionResult result = future.GetValue(); auto status = static_cast(result); if (!status.IsSuccess()) { return MakeFuture(status); } else { - this->Transaction.Clear(); - return this->NextStep(session); + self->Transaction.Clear(); + return self->NextStep(session); } }); } if (HandlerActorId != NActors::TActorId{}) { - auto holder = SelfHolder.lock(); + auto holder = Lock(SelfHolder); if (holder) { - ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([this, holder=holder, handlerCallback=HandlerCallback]() { - handlerCallback(*this); + ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([holder=holder, handlerCallback=HandlerCallback]() { + handlerCallback(*holder); })); } } @@ -179,49 +187,53 @@ class TDbExecuter : public TDbExecutable { } return session.ExecuteDataQuery(query.Sql, transaction, query.Params, NYdb::NTable::TExecDataQuerySettings().KeepInQueryCache(true)) - .Apply([this, session=session](const TFuture& future) { + .Apply([selfHolder=SelfHolder, session=session](const TFuture& future) { + auto self = Lock(selfHolder); + if (!self) { + return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"self has been deleted"}}}); + } NYdb::NTable::TDataQueryResult result = future.GetValue(); auto status = static_cast(result); if (status.GetStatus() == EStatus::SCHEME_ERROR) { // retry if table does not exist - this->Transaction.Clear(); + self->Transaction.Clear(); return MakeFuture(TStatus{EStatus::UNAVAILABLE, NYql::TIssues{status.GetIssues()}}); } if (!status.IsSuccess()) { - this->Transaction.Clear(); + self->Transaction.Clear(); return MakeFuture(status); } - if (this->Steps[CurrentStepIndex].Commit) { - this->Transaction.Clear(); - } else if (!this->Transaction) { - this->Transaction = result.GetTransaction(); + if (self->Steps[self->CurrentStepIndex].Commit) { + self->Transaction.Clear(); + } else if (!self->Transaction) { + self->Transaction = result.GetTransaction(); } - if (this->Steps[CurrentStepIndex].ResultCallback) { + if (self->Steps[self->CurrentStepIndex].ResultCallback) { try { - this->Steps[CurrentStepIndex].ResultCallback(*this, result.GetResultSets()); + self->Steps[self->CurrentStepIndex].ResultCallback(*self, result.GetResultSets()); } catch (const TCodeLineException& exception) { NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage()); - Issues.AddIssue(issue); + self->Issues.AddIssue(issue); NYql::TIssue internalIssue = MakeErrorIssue(exception.Code, CurrentExceptionMessage()); - InternalIssues.AddIssue(internalIssue); + self->InternalIssues.AddIssue(internalIssue); } catch (const std::exception& exception) { NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, exception.what()); - Issues.AddIssue(issue); + self->Issues.AddIssue(issue); NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); - InternalIssues.AddIssue(internalIssue); + self->InternalIssues.AddIssue(internalIssue); } catch (...) { NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); - Issues.AddIssue(issue); + self->Issues.AddIssue(issue); NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); - InternalIssues.AddIssue(internalIssue); + self->InternalIssues.AddIssue(internalIssue); } } - this->CurrentStepIndex++; - return this->NextStep(session); + self->CurrentStepIndex++; + return self->NextStep(session); }); } }