Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,17 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {

void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();

if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
LOG_I("Operation has been already removed");
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus));
CompleteAndPassAway();
return;
}

if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus, ComputeStatus));
LOG_E("Can't get operation: " << response.Issues.ToOneLineString());
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus));
FailedAndPassAway();
return;
}
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {

void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) {
auto& response = *ev->Get();
if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
LOG_I("StatusTrackerResponse (not found). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
return;
}

if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
ResignAndPassAway(response.Issues);
Expand Down Expand Up @@ -186,7 +192,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
case FederatedQuery::QueryMeta::ABORTING_BY_USER:
case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM:
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/tests/fq/restarts/test_insert_restarts.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ def test_atomic_upload_commit(self, kikimr, s3, client):
elif final_status == fq.QueryMeta.ABORTED_BY_SYSTEM:
assert self.get_issues_depth(query.issue) <= 3, str(query.issue)
assert self.get_issues_depth(query.transient_issue) <= 3, str(query.transient_issue)
assert "Query failed with code UNAVAILABLE at" in query.issue[0].message, query.issue[0].message
assert "Lease expired" in str(query.issue), str(query.issue)
assert final_number_rows == 0, "Incomplete final result in bucket"
else:
assert self.get_issues_depth(query.issue) <= 3, str(query.issue)
assert self.get_issues_depth(query.transient_issue) <= 3, str(query.transient_issue)
assert "Query failed with code UNAVAILABLE at" in query.issue[0].message, query.issue[0].message
assert "Lease expired" in str(query.issue), str(query.issue)
assert final_number_rows == 0 or final_number_rows == number_rows

assert len(list(bucket.multipart_uploads.all())) == 0, "Unexpected uncommited upload in bucket"