Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions ydb/core/fq/libs/shared_resources/db_exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,28 +136,36 @@ class TDbExecuter : public TDbExecutable {
SkipStep_ = true;
}

static std::shared_ptr<TDbExecuter> Lock(const std::weak_ptr<TDbExecutable>& self) {
auto lock = self.lock();
return std::static_pointer_cast<TDbExecuter>(lock);
}

TAsyncStatus NextStep(NYdb::NTable::TSession session) {

if (CurrentStepIndex == Steps.size()) {
if (Transaction) {
return Transaction->Commit()
.Apply([this, session=session](const TFuture<TCommitTransactionResult>& future) {

.Apply([selfHolder=SelfHolder, session=session](const TFuture<TCommitTransactionResult>& future) {
auto self = Lock(selfHolder);
if (!self) {
return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"self has been deleted"}}});
}
TCommitTransactionResult result = future.GetValue();
auto status = static_cast<TStatus>(result);
if (!status.IsSuccess()) {
return MakeFuture(status);
} else {
this->Transaction.Clear();
return this->NextStep(session);
self->Transaction.Clear();
return self->NextStep(session);
}
});
}
if (HandlerActorId != NActors::TActorId{}) {
auto holder = SelfHolder.lock();
auto holder = Lock(SelfHolder);
if (holder) {
ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([this, holder=holder, handlerCallback=HandlerCallback]() {
handlerCallback(*this);
ActorSystem->Send(HandlerActorId, new TEvents::TEvCallback([holder=holder, handlerCallback=HandlerCallback]() {
handlerCallback(*holder);
}));
}
}
Expand All @@ -179,49 +187,53 @@ class TDbExecuter : public TDbExecutable {
}

return session.ExecuteDataQuery(query.Sql, transaction, query.Params, NYdb::NTable::TExecDataQuerySettings().KeepInQueryCache(true))
.Apply([this, session=session](const TFuture<TDataQueryResult>& future) {
.Apply([selfHolder=SelfHolder, session=session](const TFuture<TDataQueryResult>& future) {
auto self = Lock(selfHolder);
if (!self) {
return MakeFuture(TStatus{EStatus::INTERNAL_ERROR, NYql::TIssues{NYql::TIssue{"self has been deleted"}}});
}

NYdb::NTable::TDataQueryResult result = future.GetValue();
auto status = static_cast<TStatus>(result);

if (status.GetStatus() == EStatus::SCHEME_ERROR) { // retry if table does not exist
this->Transaction.Clear();
self->Transaction.Clear();
return MakeFuture(TStatus{EStatus::UNAVAILABLE, NYql::TIssues{status.GetIssues()}});
}
if (!status.IsSuccess()) {
this->Transaction.Clear();
self->Transaction.Clear();
return MakeFuture(status);
}

if (this->Steps[CurrentStepIndex].Commit) {
this->Transaction.Clear();
} else if (!this->Transaction) {
this->Transaction = result.GetTransaction();
if (self->Steps[self->CurrentStepIndex].Commit) {
self->Transaction.Clear();
} else if (!self->Transaction) {
self->Transaction = result.GetTransaction();
}

if (this->Steps[CurrentStepIndex].ResultCallback) {
if (self->Steps[self->CurrentStepIndex].ResultCallback) {
try {
this->Steps[CurrentStepIndex].ResultCallback(*this, result.GetResultSets());
self->Steps[self->CurrentStepIndex].ResultCallback(*self, result.GetResultSets());
} catch (const TCodeLineException& exception) {
NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage());
Issues.AddIssue(issue);
self->Issues.AddIssue(issue);
NYql::TIssue internalIssue = MakeErrorIssue(exception.Code, CurrentExceptionMessage());
InternalIssues.AddIssue(internalIssue);
self->InternalIssues.AddIssue(internalIssue);
} catch (const std::exception& exception) {
NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, exception.what());
Issues.AddIssue(issue);
self->Issues.AddIssue(issue);
NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage());
InternalIssues.AddIssue(internalIssue);
self->InternalIssues.AddIssue(internalIssue);
} catch (...) {
NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage());
Issues.AddIssue(issue);
self->Issues.AddIssue(issue);
NYql::TIssue internalIssue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, CurrentExceptionMessage());
InternalIssues.AddIssue(internalIssue);
self->InternalIssues.AddIssue(internalIssue);
}
}

this->CurrentStepIndex++;
return this->NextStep(session);
self->CurrentStepIndex++;
return self->NextStep(session);
});
}
}
Expand Down