diff --git a/.gitignore b/.gitignore index f97992275a5d..bde4ce084232 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,9 @@ __pycache__/ *.pb.h *.pb.cc +# Other generated +*.fbs.h + # MacOS specific .DS_Store diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index fd16b7b441a6..1d18d034ac9d 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -205,6 +205,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseResultsSize()); - Send(Target, ResponseEv.release()); + AlreadyReplied = true; PassAway(); } @@ -319,6 +320,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + LOG_I("Shutdown immediately - nothing to wait"); + PassAway(); + } else { + this->Become(&TThis::WaitShutdownState); + LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " + << Planner->GetPendingComputeActors().size() << " compute actors"); + TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); + } + } else { + PassAway(); + } + } + void PassAway() override { auto totalTime = TInstant::Now() - StartTime; Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); @@ -2612,6 +2631,61 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTypeRewrite()) { + hFunc(TEvDqCompute::TEvState, HandleShutdown); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown); + hFunc(TEvents::TEvPoison, HandleShutdown); + default: + LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events + } + } + + void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) { + if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { + YQL_ENSURE(Planner); + + TActorId actor = ev->Sender; + ui64 taskId = ev->Get()->Record.GetTaskId(); + + Planner->CompletedCA(taskId, actor); + + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); + } + } + } + + void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + const auto nodeId = ev->Get()->NodeId; + LOG_N("Node has disconnected while shutdown: " << nodeId); + + YQL_ENSURE(Planner); + + for (const auto& task : TasksGraph.GetTasks()) { + if (task.Meta.NodeId == nodeId && !task.Meta.Completed) { + if (task.ComputeActorId) { + Planner->CompletedCA(task.Id, task.ComputeActorId); + } else { + Planner->TaskNotStarted(task.Id); + } + } + } + + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); + } + } + + void HandleShutdown(TEvents::TEvPoison::TPtr& ev) { + // Self-poison means timeout - don't wait anymore. + LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown"); + + if (ev->Sender == SelfId()) { + PassAway(); + } + } + private: void ReplyTxStateUnknown(ui64 shardId) { auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 37f8bec82831..2915775ea3fc 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -667,7 +667,7 @@ class TKqpExecuterBase : public TActorBootstrapped { if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) { InternalError(issues); } else if (statusCode == Ydb::StatusIds::TIMEOUT) { - AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); + TimeoutError(ev->Sender); } else { RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues); } @@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped { protected: void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { - if (task.ComputeActorId) { + if (task.ComputeActorId && !task.Meta.Completed) { LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString() << ", compute actor: " << task.ComputeActorId << ", task: " << task.Id); auto ev = MakeHolder(NYql::NDq::YdbStatusToDqStatus(code), issues); this->Send(task.ComputeActorId, ev.Release()); } else { - LOG_I("task: " << task.Id << ", does not have Compute ActorId yet"); + LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete"); } } } @@ -1649,7 +1649,6 @@ class TKqpExecuterBase : public TActorBootstrapped { void InternalError(const NYql::TIssues& issues) { LOG_E(issues.ToOneLineString()); - TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction."); for (const NYql::TIssue& i : issues) { issue.AddSubIssue(MakeIntrusive(i)); @@ -1663,7 +1662,6 @@ class TKqpExecuterBase : public TActorBootstrapped { void ReplyUnavailable(const TString& message) { LOG_E("UNAVAILABLE: " << message); - TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE); issue.AddSubIssue(new NYql::TIssue(message)); ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue); @@ -1671,7 +1669,6 @@ class TKqpExecuterBase : public TActorBootstrapped { void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString()); - TerminateComputeActors(code, issues); ReplyErrorAndDie(code, issues); } @@ -1687,11 +1684,19 @@ class TKqpExecuterBase : public TActorBootstrapped { ReplyErrorAndDie(status, &issues); } - void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) { + void TimeoutError(TActorId abortSender) { if (AlreadyReplied) { + LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; } + const auto status = NYql::NDqProto::StatusIds::TIMEOUT; + const TString message = "Request timeout exceeded"; + + TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); + + AlreadyReplied = true; + LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message); if (ExecuterSpan) { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); @@ -1701,17 +1706,14 @@ class TKqpExecuterBase : public TActorBootstrapped { // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). if (abortSender != Target) { - auto abortEv = MakeHolder(status, "Request timeout exceeded"); + auto abortEv = MakeHolder(status, message); this->Send(Target, abortEv.Release()); } - AlreadyReplied = true; LOG_E("Sending timeout response to: " << Target); - this->Send(Target, ResponseEv.release()); Request.Transactions.crop(0); - TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); - this->PassAway(); + this->Shutdown(); } void FillResponseStats(Ydb::StatusIds::StatusCode status) { @@ -1746,17 +1748,11 @@ class TKqpExecuterBase : public TActorBootstrapped { google::protobuf::RepeatedPtrField* issues) { if (AlreadyReplied) { + LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; } - if (Planner) { - for (auto computeActor : Planner->GetPendingComputeActors()) { - LOG_D("terminate compute actor " << computeActor.first); - - auto ev = MakeHolder(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution"); - this->Send(computeActor.first, ev.Release()); - } - } + TerminateComputeActors(status, "Terminate execution"); AlreadyReplied = true; auto& response = *ResponseEv->Record.MutableResponse(); @@ -1782,8 +1778,7 @@ class TKqpExecuterBase : public TActorBootstrapped { ExecuterStateSpan.EndError(response.DebugString()); Request.Transactions.crop(0); - this->Send(Target, ResponseEv.release()); - this->PassAway(); + this->Shutdown(); } protected: @@ -1851,7 +1846,16 @@ class TKqpExecuterBase : public TActorBootstrapped { } protected: + // Introduced separate method from `PassAway()` - to not get confused with expectations from other actors, + // that `PassAway()` should kill actor immediately. + virtual void Shutdown() { + PassAway(); + } + void PassAway() override { + YQL_ENSURE(AlreadyReplied && ResponseEv); + this->Send(Target, ResponseEv.release()); + for (auto channelPair: ResultChannelProxies) { LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId()); @@ -1872,12 +1876,11 @@ class TKqpExecuterBase : public TActorBootstrapped { if (KqpTableResolverId) { this->Send(KqpTableResolverId, new TEvents::TEvPoison); - this->Send(this->SelfId(), new TEvents::TEvPoison); - LOG_T("Terminate, become ZombieState"); - this->Become(&TKqpExecuterBase::ZombieState); - } else { - IActor::PassAway(); } + + this->Send(this->SelfId(), new TEvents::TEvPoison); + LOG_T("Terminate, become ZombieState"); + this->Become(&TKqpExecuterBase::ZombieState); } STATEFN(ZombieState) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 3a13c9fb7f6e..926dda2700ef 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { YQL_ENSURE(it != PendingComputeActors.end()); LastStats.emplace_back(std::move(it->second)); PendingComputeActors.erase(it); - return; + + LOG_I("Compute actor has finished execution: " << computeActor.ToString()); +} + +void TKqpPlanner::TaskNotStarted(ui64 taskId) { + // NOTE: should be invoked only while shutting down - when node is disconnected. + + auto& task = TasksGraph.GetTask(taskId); + + YQL_ENSURE(!task.ComputeActorId); + YQL_ENSURE(!task.Meta.Completed); + + PendingComputeTasks.erase(taskId); } TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 639c53737055..eed887d6e9bc 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -74,6 +74,7 @@ class TKqpPlanner { std::unique_ptr AssignTasksToNodes(); bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); void CompletedCA(ui64 taskId, TActorId computeActor); + void TaskNotStarted(ui64 taskId); TProgressStat::TEntry CalculateConsumptionUpdate(); void ShiftConsumption(); void Submit(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 277e77da71e0..c04d5e7573f4 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -273,6 +273,9 @@ class TKqpScanExecuter : public TKqpExecuterBaseOrbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize()); @@ -281,8 +284,6 @@ class TKqpScanExecuter : public TKqpExecuterBase +#include #include +#include #include +#include #include #include @@ -830,6 +833,120 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::TIMEOUT); } + /* Scenario: + - prepare and run query + - observe first EvState event from CA to Executer and replace it with EvAbortExecution + - count all EvState events from all CAs + - wait for final event EvTxResponse from Executer + - expect it to happen strictly after all EvState events + */ + Y_UNIT_TEST(WaitCAsStateOnAbort) { + TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false)); + auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } ); + + auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/TwoShard`; + )")).GetValueSync(); + }); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + bool firstEvState = false; + ui32 totalEvState = 0; + TActorId executerId; + ui32 actorCount = 3; // TODO: get number of actors properly. + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + runtime.SetObserverFunc([&](TAutoPtr& ev) { + if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + ++totalEvState; + if (!firstEvState) { + executerId = ev->Recipient; + ev = new IEventHandle(ev->Recipient, ev->Sender, + new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); + firstEvState = true; + } + } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) { + UNIT_ASSERT_C(totalEvState == actorCount*2, "Executer sent response before waiting for CAs"); + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500)); + kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); }); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle& ev) { + return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType + && ev.Sender == executerId && totalEvState == actorCount*2; + }); + + UNIT_ASSERT(runtime.DispatchEvents(opts)); + } + + /* Scenario: + - prepare and run query + - observe first EvState event from CA to Executer and replace it with EvAbortExecution + - count all EvState events from all CAs + - drop final EvState event from last CA + - wait for final event EvTxResponse from Executer after timeout poison + - expect it to happen strictly after all EvState events + */ + Y_UNIT_TEST(WaitCAsTimeout) { + TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false)); + auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } ); + + auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/TwoShard`; + )")).GetValueSync(); + }); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + bool firstEvState = false; + bool timeoutPoison = false; + ui32 totalEvState = 0; + TActorId executerId; + ui32 actorCount = 3; // TODO: get number of actors properly. + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + runtime.SetObserverFunc([&](TAutoPtr& ev) { + if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + ++totalEvState; + if (!firstEvState) { + executerId = ev->Recipient; + ev = new IEventHandle(ev->Recipient, ev->Sender, + new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); + firstEvState = true; + } else { + return TTestActorRuntime::EEventAction::DROP; + } + } else if (ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 && + ev->Sender == executerId && ev->Recipient == executerId) + { + timeoutPoison = true; + } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) { + UNIT_ASSERT_C(timeoutPoison, "Executer sent response before waiting for CAs"); + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500)); + kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); }); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle& ev) { + return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType + && ev.Sender == executerId && totalEvState == actorCount*2 && timeoutPoison; + }); + + UNIT_ASSERT(runtime.DispatchEvents(opts)); + } + Y_UNIT_TEST(ReplySizeExceeded) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 9536882a2a7a..1718bb67e1a9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -554,7 +554,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } - void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) + void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool forceTerminate = false) { auto execEv = MakeHolder(); auto& record = execEv->Record; @@ -575,7 +575,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->Send(ExecuterId, execEv.Release()); - if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { + if (!forceTerminate && Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { // checkpointed CAs must not self-destroy return; } @@ -1032,10 +1032,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped auto tag = (EEvWakeupTag) ev->Get()->Tag; switch (tag) { case EEvWakeupTag::TimeoutTag: { - auto abortEv = MakeHolder(NYql::NDqProto::StatusIds::TIMEOUT, TStringBuilder() - << "Timeout event from compute actor " << this->SelfId() - << ", TxId: " << TxId << ", task: " << Task.GetId()); - if (ComputeActorSpan) { ComputeActorSpan.EndError( TStringBuilder() @@ -1044,10 +1040,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ); } - this->Send(ExecuterId, abortEv.Release()); - - TerminateSources("timeout exceeded", false); - Terminate(false, "timeout exceeded"); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true); break; } case EEvWakeupTag::PeriodicStatsTag: { @@ -1071,8 +1065,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped switch (lostEventType) { case TEvDqCompute::TEvState::EventType: { CA_LOG_E("Handle undelivered TEvState event, abort execution"); - this->TerminateSources("executer lost", false); - Terminate(false, "executer lost"); + + TerminateSources("executer lost", false); + Terminate(false, "executer lost"); // Executer lost - no need to report state break; } default: { @@ -1118,14 +1113,17 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin()); return; } + TIssues issues = ev->Get()->GetIssues(); CA_LOG_E("Handle abort execution event from: " << ev->Sender << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()) << ", reason: " << issues.ToOneLineString()); - bool success = ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS; - - this->TerminateSources(issues, success); + if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS) { + State = NDqProto::COMPUTE_STATE_FINISHED; + } else { + State = NDqProto::COMPUTE_STATE_FAILURE; + } if (ev->Sender != ExecuterId) { if (ComputeActorSpan) { @@ -1135,7 +1133,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped NActors::TActivationContext::Send(ev->Forward(ExecuterId)); } - Terminate(success, issues); + ReportStateAndMaybeDie(ev->Get()->Record.GetStatusCode(), issues, true); } void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {