diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index e5e69d5c5a63..6781385582ab 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -117,9 +117,17 @@ class TStatusTrackerActor : public TBaseComputeActor { void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); + + if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM + LOG_I("Operation has been already removed"); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus)); + CompleteAndPassAway(); + return; + } + if (response.Status != NYdb::EStatus::SUCCESS) { - LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus, ComputeStatus)); + LOG_E("Can't get operation: " << response.Issues.ToOneLineString()); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus)); FailedAndPassAway(); return; } diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index 022f73677055..42e4ff140414 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -99,6 +99,12 @@ class TYdbRunActor : public NActors::TActorBootstrapped { void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) { auto& response = *ev->Get(); + if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM + LOG_I("StatusTrackerResponse (not found). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); + return; + } + if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); ResignAndPassAway(response.Issues); @@ -186,7 +192,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped { case FederatedQuery::QueryMeta::ABORTING_BY_USER: case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM: if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { - Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); } else { Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); } diff --git a/ydb/tests/fq/restarts/test_insert_restarts.py b/ydb/tests/fq/restarts/test_insert_restarts.py index 9a8d9678e7a2..5ba020ca7fbd 100644 --- a/ydb/tests/fq/restarts/test_insert_restarts.py +++ b/ydb/tests/fq/restarts/test_insert_restarts.py @@ -95,8 +95,14 @@ def test_atomic_upload_commit(self, kikimr, s3, client): if final_status == fq.QueryMeta.COMPLETED: assert final_number_rows == number_rows, "Invalid result in bucket for COMPLETED final status" elif final_status == fq.QueryMeta.ABORTED_BY_SYSTEM: + assert self.get_issues_depth(query.issue) <= 3, str(query.issue) + assert self.get_issues_depth(query.transient_issue) <= 3, str(query.transient_issue) + assert "Lease expired" in str(query.issue), str(query.issue) assert final_number_rows == 0, "Incomplete final result in bucket" else: + assert self.get_issues_depth(query.issue) <= 3, str(query.issue) + assert self.get_issues_depth(query.transient_issue) <= 3, str(query.transient_issue) + assert "Lease expired" in str(query.issue), str(query.issue) assert final_number_rows == 0 or final_number_rows == number_rows assert len(list(bucket.multipart_uploads.all())) == 0, "Unexpected uncommited upload in bucket"