Skip to content

Commit

Permalink
YQ-3151 added issues for timeout and cancelled (ydb-platform#8805)
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik committed Sep 12, 2024
1 parent 73790aa commit 53761c4
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 22 deletions.
14 changes: 11 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
InternalError(issues);
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
if (issues.Empty()) {
issues.AddIssue("Request timeout exceeded");
}
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, issues);
} else {
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
}
Expand Down Expand Up @@ -1722,16 +1725,21 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

// Convenience overload: wraps a plain-text `message` into a one-element
// NYql::TIssues list and forwards to the TIssues-based AbortExecutionAndDie
// overload, which carries out the actual abort.
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
    const NYql::TIssues singleIssue{NYql::TIssue(message)};
    AbortExecutionAndDie(abortSender, status, singleIssue);
}

void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const NYql::TIssues& issues) {
if (AlreadyReplied) {
return;
}

LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << ", " << issues.ToOneLineString());
if (ExecuterSpan) {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
}

static_cast<TDerived*>(this)->FillResponseStats(Ydb::StatusIds::TIMEOUT);
NYql::IssuesToMessage(issues, ResponseEv->Record.MutableResponse()->MutableIssues());

// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
if (abortSender != Target) {
Expand All @@ -1744,7 +1752,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
this->Send(Target, ResponseEv.release());

Request.Transactions.crop(0);
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, issues);
this->PassAway();
}

Expand Down
10 changes: 6 additions & 4 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> {
ui64 RequestId;
TDuration Timeout;
TDuration InitialTimeout;
NYql::NDqProto::StatusIds::StatusCode Status;
int Round;

TEvOnRequestTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status, int round)
: RequestId(requestId)
, Timeout(timeout)
, InitialTimeout(timeout)
, Status(status)
, Round(round)
{}
Expand Down Expand Up @@ -1272,9 +1274,9 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {

const TKqpSessionInfo* info = LocalSessions->FindPtr(reqInfo->SessionId);
if (msg->Round == 0 && info) {
TString message = TStringBuilder()
<< "request's " << (msg->Status == NYql::NDqProto::StatusIds::TIMEOUT ? "timeout" : "cancelAfter")
<< " exceeded";
TString message = msg->Status == NYql::NDqProto::StatusIds::TIMEOUT
? (TStringBuilder() << "Request timeout " << msg->Timeout.MilliSeconds() << "ms exceeded")
: (TStringBuilder() << "Request canceled after " << msg->Timeout.MilliSeconds() << "ms");

Send(info->WorkerId, new TEvKqp::TEvAbortExecution(msg->Status, message));

Expand All @@ -1286,7 +1288,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}
} else {
TString message = TStringBuilder()
<< "Query did not complete within specified timeout, session id " << reqInfo->SessionId;
<< "Query did not complete within specified timeout " << msg->InitialTimeout.MilliSeconds() << "ms, session id " << reqInfo->SessionId;
ReplyProcessError(NYql::NDq::DqStatusToYdbStatus(msg->Status), message, requestId);
}
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
#include <ydb/core/kqp/proxy_service/proto/result_set_meta.pb.h>
Expand Down Expand Up @@ -216,6 +217,12 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
WaitFinalizationRequest = true;
RunState = IsExecuting() ? ERunState::Finishing : RunState;

if (RunState == ERunState::Cancelling) {
NYql::TIssue cancelIssue("Request was canceled by user");
cancelIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
Issues.AddIssue(std::move(cancelIssue));
}

auto scriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(
GetFinalizationStatusFromRunState(), ExecutionId, Database, Status, GetExecStatusFromStatusCode(Status),
Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst), LeaseGeneration
Expand Down Expand Up @@ -424,6 +431,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
const auto& issueMessage = record.GetResponse().GetQueryIssues();
NYql::IssuesFromMessage(issueMessage, Issues);

if (record.GetYdbStatus() == Ydb::StatusIds::TIMEOUT) {
const TDuration timeout = GetQueryTimeout(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, Request.GetRequest().GetTimeoutMs(), {}, QueryServiceConfig);
NYql::TIssue timeoutIssue(TStringBuilder() << "Current request timeout is " << timeout.MilliSeconds() << "ms");
timeoutIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
Issues.AddIssue(std::move(timeoutIssue));
}

if (record.GetResponse().HasQueryPlan()) {
QueryPlan = record.GetResponse().GetQueryPlan();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN


bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
CompilationRunning = false;
CompileResult = ev->CompileResult;
YQL_ENSURE(CompileResult);
MaxReadType = CompileResult->MaxReadType;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TKqpQueryState : public TNonCopyable {
bool KeepSession = false;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
NActors::TMonotonic StartedAt;
bool CompilationRunning = false;

THashMap<NKikimr::TTableId, ui64> TableVersions;

Expand Down
25 changes: 15 additions & 10 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void CompileQuery() {
YQL_ENSURE(QueryState);
auto ev = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings);
QueryState->CompilationRunning = true;
auto ev = QueryState->BuildCompileRequest(CompilationCookie);
LOG_D("Sending CompileQuery request");

Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId,
Expand Down Expand Up @@ -1521,16 +1522,22 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);

TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
<< (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
<< " milliseconds.";
auto issues = ev->Get()->GetIssues();
TStringBuilder reason = TStringBuilder() << "Cancelling after " << (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds() << "ms";
if (QueryState->CompilationRunning) {
reason << " during compilation";
} else if (ExecuterId) {
reason << " during execution";
} else {
reason << " in " << CurrentStateFuncName();
}
issues.AddIssue(reason);

if (ExecuterId) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), issues);
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
} else {
const auto& issues = ev->Get()->GetIssues();
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "", MessageFromIssues(issues));
}
}

Expand Down Expand Up @@ -2253,9 +2260,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void Handle(TEvKqp::TEvCancelQueryRequest::TPtr& ev) {
{
auto abort = MakeHolder<NYql::NDq::TEvDq::TEvAbortExecution>();
abort->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED);
abort->Record.AddIssues()->set_message("Canceled");
auto abort = MakeHolder<NYql::NDq::TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, "Request was canceled");
Send(SelfId(), abort.Release());
}

Expand Down
16 changes: 11 additions & 5 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
public:
void Bootstrap() {
try {
StartTime = TInstant::Now();
{
TStringBuilder prefixBuilder;
prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
Expand Down Expand Up @@ -1028,9 +1029,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
auto tag = (EEvWakeupTag) ev->Get()->Tag;
switch (tag) {
case EEvWakeupTag::TimeoutTag: {
auto abortEv = MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, TStringBuilder()
<< "Timeout event from compute actor " << this->SelfId()
<< ", TxId: " << TxId << ", task: " << Task.GetId());
TStringBuilder reason = TStringBuilder() << "Task execution timeout ";
if (RuntimeSettings.Timeout) {
reason << RuntimeSettings.Timeout->MilliSeconds() << "ms ";
}
reason << "exceeded, terminating after " << (TInstant::Now() - StartTime).MilliSeconds() << "ms";

auto abortEv = MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, reason);

if (ComputeActorSpan) {
ComputeActorSpan.EndError(
Expand All @@ -1042,8 +1047,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

this->Send(ExecuterId, abortEv.Release());

TerminateSources("timeout exceeded", false);
Terminate(false, "timeout exceeded");
TerminateSources(reason, false);
Terminate(false, reason);
break;
}
case EEvWakeupTag::PeriodicStatsTag: {
Expand Down Expand Up @@ -1912,6 +1917,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
NWilson::TSpan ComputeActorSpan;
TDuration SourceCpuTime;
private:
TInstant StartTime;
bool Running = true;
TInstant LastSendStatsTime;
bool PassExceptions = false;
Expand Down

0 comments on commit 53761c4

Please sign in to comment.