Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2885 fix issue script execution is canceled #2106

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/compute/ydb/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ struct TActorFactory : public IActorFactory {

std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NYdb::TOperation::TOperationId& operationId) const override {
return CreateStopperActor(Params, parent, connector, operationId, Counters);
return CreateStopperActor(Params, parent, connector, pinger, operationId, Counters);
}

private:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct IActorFactory : public TThrRefBase {
FederatedQuery::QueryMeta::ComputeStatus status) const = 0;
virtual std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NYdb::TOperation::TOperationId& operationId) const = 0;
};

Expand Down
96 changes: 96 additions & 0 deletions ydb/core/fq/libs/compute/ydb/base_status_updater_actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

#include "base_compute_actor.h"

#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/compute/common/utils.h>

#include <ydb/library/yql/public/issue/yql_issue_message.h>

namespace NFq {

template<typename TDerived>
class TBaseStatusUpdaterActor : public TBaseComputeActor<TDerived> {
public:
using TBase = TBaseComputeActor<TDerived>;

TBaseStatusUpdaterActor(const NConfig::TCommonConfig& commonConfig, const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName)
: TBase(queryCounters, stepName)
, Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize())
{}

TBaseStatusUpdaterActor(const NConfig::TCommonConfig& commonConfig, const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName)
: TBase(baseCounters, stepName)
, Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize())
{}

void SetPingCounters(TComputeRequestCountersPtr pingCounters) {
PingCounters = std::move(pingCounters);
}

void OnPingRequestStart() {
if (!PingCounters) {
return;
}

StartTime = TInstant::Now();
PingCounters->InFly->Inc();
}

void OnPingRequestFinish(bool success) {
if (!PingCounters) {
return;
}

PingCounters->InFly->Dec();
PingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
if (success) {
PingCounters->Ok->Inc();
} else {
PingCounters->Error->Inc();
}
}

Fq::Private::PingTaskRequest GetPingTaskRequest(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode, const NYql::TIssues& issues, const Ydb::TableStats::QueryStats& queryStats) const {
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(issues, pingTaskRequest.mutable_issues());
if (computeStatus) {
pingTaskRequest.set_status(*computeStatus);
}
if (pendingStatusCode) {
pingTaskRequest.set_pending_status_code(*pendingStatusCode);
}
PrepareAstAndPlan(pingTaskRequest, queryStats.query_plan(), queryStats.query_ast());
return pingTaskRequest;
}

// Can throw errors
Fq::Private::PingTaskRequest GetPingTaskRequestStatistics(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode, const NYql::TIssues& issues, const Ydb::TableStats::QueryStats& queryStats, double* cpuUsage = nullptr) const {
Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequest(computeStatus, pendingStatusCode, issues, queryStats);
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(queryStats.query_plan(), cpuUsage));
return pingTaskRequest;
}

protected:
void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const {
if (Compressor.IsEnabled()) {
auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr);
request.mutable_ast_compressed()->set_method(astCompressionMethod);
request.mutable_ast_compressed()->set_data(astCompressed);

auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan);
request.mutable_plan_compressed()->set_method(planCompressionMethod);
request.mutable_plan_compressed()->set_data(planCompressed);
} else {
request.set_ast(expr);
request.set_plan(plan);
}
}

private:
TInstant StartTime;
TComputeRequestCountersPtr PingCounters;
const TCompressor Compressor;
};

} /* NFq */
103 changes: 35 additions & 68 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include "base_compute_actor.h"
#include "base_status_updater_actor.h"

#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/compute/common/metrics.h>
#include <ydb/core/fq/libs/compute/common/retry_actor.h>
#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
#include <ydb/core/fq/libs/compute/common/utils.h>
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
Expand All @@ -14,7 +12,6 @@

#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
Expand All @@ -37,7 +34,7 @@ namespace NFq {
using namespace NActors;
using namespace NFq;

class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
class TStatusTrackerActor : public TBaseStatusUpdaterActor<TStatusTrackerActor> {
public:
using IRetryPolicy = IRetryPolicy<const TEvYdbCompute::TEvGetOperationResponse::TPtr&>;

Expand Down Expand Up @@ -70,16 +67,17 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
};

TStatusTrackerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
: TBaseComputeActor(queryCounters, "StatusTracker")
: TBaseStatusUpdaterActor(params.Config.GetCommon(), queryCounters, "StatusTracker")
, Params(params)
, Parent(parent)
, Connector(connector)
, Pinger(pinger)
, OperationId(operationId)
, Counters(GetStepCountersSubgroup())
, BackoffTimer(20, 1000)
, Compressor(params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), params.Config.GetCommon().GetQueryArtifactsCompressionMinSize())
{}
{
SetPingCounters(Counters.GetCounters(ERequestType::RT_PING));
}

static constexpr char ActorName[] = "FQ_STATUS_TRACKER";

Expand All @@ -95,21 +93,17 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
)

void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Dec();
OnPingRequestFinish(ev.Get()->Get()->Success);

if (ev->Cookie) {
return;
}

pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Information about the status of operation is stored");
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus, ComputeStatus));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
LOG_E("Error saving information about the status of operation");
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus, ComputeStatus));
FailedAndPassAway();
Expand Down Expand Up @@ -140,7 +134,6 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
}

ReportPublicCounters(response.QueryStats);
StartTime = TInstant::Now();
LOG_D("Execution status: " << static_cast<int>(response.ExecStatus));
switch (response.ExecStatus) {
case NYdb::NQuery::EExecStatus::Unspecified:
Expand Down Expand Up @@ -217,75 +210,51 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
}

void UpdateProgress() {
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
std::pair<Fq::Private::PingTaskRequest, double> GetPingTaskRequestWithStatistic(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode) {
Fq::Private::PingTaskRequest pingTaskRequest;
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
double cpuUsage = 0.0;
try {
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
pingTaskRequest = GetPingTaskRequestStatistics(computeStatus, pendingStatusCode, Issues, QueryStats, &cpuUsage);
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}

return { pingTaskRequest, cpuUsage };
}

void UpdateProgress() {
OnPingRequestStart();

Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequestWithStatistic(std::nullopt, std::nullopt).first;
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1);
}

void UpdateCpuQuota(double cpuUsage) {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
if (cpuUsage && duration) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
}

void Failed() {
LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
pingTaskRequest.set_pending_status_code(StatusCode);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
double cpuUsage = 0.0;
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
if (duration && cpuUsage) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
OnPingRequestStart();

auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(std::nullopt, StatusCode);
UpdateCpuQuota(cpuUsage);

Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}

void Complete() {
LOG_I("Execution status: Complete " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
pingTaskRequest.set_status(ComputeStatus);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
double cpuUsage = 0.0;
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
if (duration && cpuUsage) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
OnPingRequestStart();

void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const {
if (Compressor.IsEnabled()) {
auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr);
request.mutable_ast_compressed()->set_method(astCompressionMethod);
request.mutable_ast_compressed()->set_data(astCompressed);
ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(ComputeStatus, std::nullopt);
UpdateCpuQuota(cpuUsage);

auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan);
request.mutable_plan_compressed()->set_method(planCompressionMethod);
request.mutable_plan_compressed()->set_data(planCompressed);
} else {
request.set_ast(expr);
request.set_plan(plan);
}
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}

private:
Expand All @@ -295,14 +264,12 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
TActorId Pinger;
NYdb::TOperation::TOperationId OperationId;
TCounters Counters;
TInstant StartTime;
NYql::TIssues Issues;
NYdb::EStatus Status = NYdb::EStatus::SUCCESS;
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED;
Ydb::TableStats::QueryStats QueryStats;
NKikimr::TBackoffTimer BackoffTimer;
const TCompressor Compressor;
FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING;
};

Expand Down
Loading
Loading