Skip to content

Commit e66ee60

Browse files
authored
get status polling has been fixed YQ-2667 (#620) (#720)
1 parent a91b6c7 commit e66ee60

File tree

3 files changed

+23
-3
lines changed

3 files changed

+23
-3
lines changed

ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,17 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
117117

118118
void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
119119
const auto& response = *ev.Get()->Get();
120+
121+
if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
122+
LOG_I("Operation has been already removed");
123+
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus));
124+
CompleteAndPassAway();
125+
return;
126+
}
127+
120128
if (response.Status != NYdb::EStatus::SUCCESS) {
121-
LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
122-
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus, ComputeStatus));
129+
LOG_E("Can't get operation: " << response.Issues.ToOneLineString());
130+
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus));
123131
FailedAndPassAway();
124132
return;
125133
}

ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
9999

100100
void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) {
101101
auto& response = *ev->Get();
102+
if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
103+
LOG_I("StatusTrackerResponse (not found). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
104+
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
105+
return;
106+
}
107+
102108
if (response.Status != NYdb::EStatus::SUCCESS) {
103109
LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
104110
ResignAndPassAway(response.Issues);
@@ -186,7 +192,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
186192
case FederatedQuery::QueryMeta::ABORTING_BY_USER:
187193
case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM:
188194
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
189-
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
195+
Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
190196
} else {
191197
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
192198
}

ydb/tests/fq/restarts/test_insert_restarts.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,14 @@ def test_atomic_upload_commit(self, kikimr, s3, client):
9595
if final_status == fq.QueryMeta.COMPLETED:
9696
assert final_number_rows == number_rows, "Invalid result in bucket for COMPLETED final status"
9797
elif final_status == fq.QueryMeta.ABORTED_BY_SYSTEM:
98+
assert self.get_issues_depth(query.issue) <= 3, str(query.issue)
99+
assert self.get_issues_depth(query.transient_issue) <= 3, str(query.transient_issue)
100+
assert "Lease expired" in str(query.issue), str(query.issue)
98101
assert final_number_rows == 0, "Incomplete final result in bucket"
99102
else:
103+
assert self.get_issues_depth(query.issue) <= 3, str(query.issue)
104+
assert self.get_issues_depth(query.transient_issue) <= 3, str(query.transient_issue)
105+
assert "Lease expired" in str(query.issue), str(query.issue)
100106
assert final_number_rows == 0 or final_number_rows == number_rows
101107

102108
assert len(list(bucket.multipart_uploads.all())) == 0, "Unexpected uncommited upload in bucket"

0 commit comments

Comments
 (0)