From 19ed79242f48cae5eb8bcce088457fec9981efed Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Thu, 28 Dec 2023 15:24:10 +0000 Subject: [PATCH 1/3] self holder has been added --- ydb/core/fq/libs/shared_resources/db_exec.h | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/ydb/core/fq/libs/shared_resources/db_exec.h b/ydb/core/fq/libs/shared_resources/db_exec.h index 555e16e06915..3258c07e5d2a 100644 --- a/ydb/core/fq/libs/shared_resources/db_exec.h +++ b/ydb/core/fq/libs/shared_resources/db_exec.h @@ -141,8 +141,11 @@ class TDbExecuter : public TDbExecutable { if (CurrentStepIndex == Steps.size()) { if (Transaction) { return Transaction->Commit() - .Apply([this, session=session](const TFuture& future) { - + .Apply([this, self=SelfHolder, session=session](const TFuture& future) { + auto lock = self.lock(); + if (!lock) { + return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"this has been deleted"}}}); + } TCommitTransactionResult result = future.GetValue(); auto status = static_cast(result); if (!status.IsSuccess()) { @@ -179,7 +182,11 @@ class TDbExecuter : public TDbExecutable { } return session.ExecuteDataQuery(query.Sql, transaction, query.Params, NYdb::NTable::TExecDataQuerySettings().KeepInQueryCache(true)) - .Apply([this, session=session](const TFuture& future) { + .Apply([this, self=SelfHolder, session=session](const TFuture& future) { + auto lock = self.lock(); + if (!lock) { + return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"this has been deleted"}}}); + } NYdb::NTable::TDataQueryResult result = future.GetValue(); auto status = static_cast(result); From 43b435005d525413bf4ae06e41faa4c5de64d0db Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Thu, 28 Dec 2023 16:03:25 +0000 Subject: [PATCH 2/3] cleanup --- ydb/core/fq/libs/shared_resources/db_exec.h | 63 +++++++++++---------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/ydb/core/fq/libs/shared_resources/db_exec.h b/ydb/core/fq/libs/shared_resources/db_exec.h index 3258c07e5d2a..763379963a0f 100644 --- a/ydb/core/fq/libs/shared_resources/db_exec.h +++ b/ydb/core/fq/libs/shared_resources/db_exec.h @@ -136,31 +136,36 @@ class TDbExecuter : public TDbExecutable { SkipStep_ = true; } + static std::shared_ptr Lock(const std::weak_ptr& self) { + auto lock = self.lock(); + return std::dynamic_pointer_cast(lock); + } + TAsyncStatus NextStep(NYdb::NTable::TSession session) { if (CurrentStepIndex == Steps.size()) { if (Transaction) { return Transaction->Commit() - .Apply([this, self=SelfHolder, session=session](const TFuture& future) { - auto lock = self.lock(); - if (!lock) { - return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"this has been deleted"}}}); + .Apply([selfHolder=SelfHolder, session=session](const TFuture& future) { + auto self = Lock(selfHolder); + if (!self) { + return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"self has been deleted"}}}); } TCommitTransactionResult result = future.GetValue(); auto status = static_cast(result); if (!status.IsSuccess()) { return MakeFuture(status); } else { - this->Transaction.Clear(); - return this->NextStep(session); + self->Transaction.Clear(); + return self->NextStep(session); } }); } if (HandlerActorId != NActors::TActorId{}) { - auto holder = SelfHolder.lock(); + auto holder = Lock(SelfHolder); if (holder) { - ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([this, holder=holder, handlerCallback=HandlerCallback]() { - handlerCallback(*this); + ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([holder=holder, handlerCallback=HandlerCallback]() { + handlerCallback(*holder); })); } } @@ -182,53 +187,53 @@ class TDbExecuter : public TDbExecutable { } return session.ExecuteDataQuery(query.Sql, transaction, query.Params, NYdb::NTable::TExecDataQuerySettings().KeepInQueryCache(true)) - .Apply([this, self=SelfHolder, session=session](const TFuture& future) { - auto lock = self.lock(); - if (!lock) { - return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"this has been deleted"}}}); + .Apply([selfHolder=SelfHolder, session=session](const TFuture& future) { + auto self = Lock(selfHolder); + if (!self) { + return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"self has been deleted"}}}); } NYdb::NTable::TDataQueryResult result = future.GetValue(); auto status = static_cast(result); if (status.GetStatus() == EStatus::SCHEME_ERROR) { // retry if table does not exist - this->Transaction.Clear(); + self->Transaction.Clear(); return MakeFuture(TStatus{EStatus::UNAVAILABLE, NYql::TIssues{status.GetIssues()}}); } if (!status.IsSuccess()) { - this->Transaction.Clear(); + self->Transaction.Clear(); return MakeFuture(status); } - if (this->Steps[CurrentStepIndex].Commit) { - this->Transaction.Clear(); - } else if (!this->Transaction) { - this->Transaction = result.GetTransaction(); + if (self->Steps[self->CurrentStepIndex].Commit) { + self->Transaction.Clear(); + } else if (!self->Transaction) { + self->Transaction = result.GetTransaction(); } - if (this->Steps[CurrentStepIndex].ResultCallback) { + if (self->Steps[self->CurrentStepIndex].ResultCallback) { try { - this->Steps[CurrentStepIndex].ResultCallback(*this, result.GetResultSets()); + self->Steps[self->CurrentStepIndex].ResultCallback(*self, result.GetResultSets()); } catch (const TCodeLineException& exception) { NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage()); - Issues.AddIssue(issue); + self->Issues.AddIssue(issue); NYql::TIssue internalIssue = MakeErrorIssue(exception.Code, CurrentExceptionMessage()); - InternalIssues.AddIssue(internalIssue); + self->InternalIssues.AddIssue(internalIssue); } catch (const std::exception& exception) { NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, exception.what()); - Issues.AddIssue(issue); + self->Issues.AddIssue(issue); NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); - InternalIssues.AddIssue(internalIssue); + self->InternalIssues.AddIssue(internalIssue); } catch (...) { NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); - Issues.AddIssue(issue); + self->Issues.AddIssue(issue); NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage()); - InternalIssues.AddIssue(internalIssue); + self->InternalIssues.AddIssue(internalIssue); } } - this->CurrentStepIndex++; - return this->NextStep(session); + self->CurrentStepIndex++; + return self->NextStep(session); }); } } From 72104bff7d179776e0e3716a961b4ba5e2f400b7 Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Thu, 28 Dec 2023 16:13:47 +0000 Subject: [PATCH 3/3] cleanup --- ydb/core/fq/libs/shared_resources/db_exec.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/shared_resources/db_exec.h b/ydb/core/fq/libs/shared_resources/db_exec.h index 763379963a0f..8eeac48ec64b 100644 --- a/ydb/core/fq/libs/shared_resources/db_exec.h +++ b/ydb/core/fq/libs/shared_resources/db_exec.h @@ -138,7 +138,7 @@ class TDbExecuter : public TDbExecutable { static std::shared_ptr Lock(const std::weak_ptr& self) { auto lock = self.lock(); - return std::dynamic_pointer_cast(lock); + return std::static_pointer_cast(lock); } TAsyncStatus NextStep(NYdb::NTable::TSession session) {