Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQL-18355: Improve timeout settting for literal requests #4347

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/gateway/empty_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class TEmptyGateway : public NYql::IDqGateway {
const NYql::TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter,
const THashMap<TString, TString>& modulesMapping,
bool discard) override
bool discard,
ui64 executionTimeout) override
{
Y_UNUSED(progressWriter);
Y_UNUSED(modulesMapping); // TODO: support.
Y_UNUSED(discard);
Y_UNUSED(executionTimeout);

NProto::TGraphParams params;
THashMap<i64, TString> stagePrograms;
Expand Down
13 changes: 6 additions & 7 deletions ydb/library/yql/providers/dq/actors/executer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
const TDqConfiguration::TPtr& settings,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
TInstant requestStartTime,
bool createTaskSuspended)
bool createTaskSuspended,
ui64 executionTimeout)
: TRichActor<TDqExecuter>(&TDqExecuter::Handler)
, GwmActorId(gwmActorId)
, PrinterId(printerId)
Expand All @@ -54,6 +55,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
, Counters(counters) // root, component=dq
, LongWorkersAllocationCounter(Counters->GetSubgroup("component", "ServiceProxyActor")->GetCounter("LongWorkersAllocation"))
, ExecutionTimeoutCounter(Counters->GetSubgroup("component", "ServiceProxyActor")->GetCounter("ExecutionTimeout", /*derivative=*/ true))
, Timeout(TDuration::MilliSeconds(executionTimeout))
, WorkersAllocationFailTimeout(TDuration::MilliSeconds(Settings->_LongWorkersAllocationFailTimeout.Get().GetOrElse(TDqSettings::TDefault::LongWorkersAllocationFailTimeout)))
, WorkersAllocationWarnTimeout(TDuration::MilliSeconds(Settings->_LongWorkersAllocationWarnTimeout.Get().GetOrElse(TDqSettings::TDefault::LongWorkersAllocationWarnTimeout)))
, RequestStartTime(requestStartTime)
Expand Down Expand Up @@ -239,10 +241,6 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
allocateRequest.Release(),
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession));

Timeout = tasks.size() == 1
? TDuration::MilliSeconds(Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout))
: TDuration::MilliSeconds(Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout));

YQL_CLOG(DEBUG, ProviderDq) << "Dq timeouts are set to: "
<< ToString(Timeout) << " (global), "
<< ToString(WorkersAllocationFailTimeout) << " (workers allocation fail), "
Expand Down Expand Up @@ -523,9 +521,10 @@ NActors::IActor* MakeDqExecuter(
const TDqConfiguration::TPtr& settings,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
TInstant requestStartTime,
bool createTaskSuspended
bool createTaskSuspended,
ui64 executionTimeout
) {
return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended), traceId);
return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended, executionTimeout), traceId);
}

} // namespace NDq
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/dq/actors/executer_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ NActors::IActor* MakeDqExecuter(
const TDqConfiguration::TPtr& settings,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
TInstant requestStartTime = TInstant::Now(),
bool createTaskSuspended = false
bool createTaskSuspended = false,
ui64 executionTimeout = 0
);

} // namespace NDq
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/api/protos/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message ExecuteGraphRequest {
string RateLimiterResource = 15;
map<string, bytes> CommonTaskParams = 16; // to be merged into each task TTaskMeta.TaskParams
NYql.NDqProto.EDqStatsMode StatsMode = 17;
uint64 ExecutionTimeout = 18; // in milliseconds
}

message ExecuteGraphResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalIm
THashMap<TString, TString> ModulesMapping;
bool Discard;
NThreading::TPromise<IDqGateway::TResult> Result;
ui64 ExecutionTimeout;
};

public:
Expand All @@ -148,13 +149,13 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalIm
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard)
bool discard, ui64 executionTimeout)
{

NThreading::TFuture<IDqGateway::TResult> result;
{
TGuard<TMutex> lock(Mutex);
Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()});
Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>(), executionTimeout});
result = Queue.back().Result;
}

Expand All @@ -177,7 +178,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalIm

auto weak = weak_from_this();

Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard)
Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard, request.ExecutionTimeout)
.Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable {
try {
promise.SetValue(result.GetValue());
Expand Down Expand Up @@ -228,10 +229,10 @@ class TDqGatewayLocal : public IDqGateway {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) override
bool discard, ui64 executionTimeout) override
{
return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams,
settings, progressWriter, modulesMapping, discard);
settings, progressWriter, modulesMapping, discard, executionTimeout);
}

void Stop() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
}

TInstant startTime = TInstant::Now();
ui64 executionTimeout = State->Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout);

try {
auto result = TMaybeNode<TResult>(input).Cast();
Expand All @@ -888,7 +889,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

auto precomputes = FindIndependentPrecomputes(result.Input().Ptr());
if (!precomputes.empty()) {
auto status = HandlePrecomputes(precomputes, ctx, resSettings);
auto status = HandlePrecomputes(precomputes, ctx, resSettings, executionTimeout);
if (status.Level != TStatus::Ok) {
if (status == TStatus::Async) {
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
Expand Down Expand Up @@ -1017,7 +1018,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
graphParams["Evaluation"] = ToString(!ctx.Step.IsDone(TExprStep::ExprEval));
future = State->ExecutePlan(
State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
settings, progressWriter, ModulesMapping, fillSettings.Discard, executionTimeout);
}
}

Expand Down Expand Up @@ -1264,6 +1265,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
YQL_CLOG(TRACE, ProviderDq) << "HandlePull " << NCommon::ExprToPrettyString(ctx, *input);

TInstant startTime = TInstant::Now();
ui64 executionTimeout = State->Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout);
auto pull = TPull(input);

THashMap<TString, TString> pullSettings;
Expand All @@ -1282,7 +1284,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr());
if (!precomputes.empty()) {
auto status = HandlePrecomputes(precomputes, ctx, pullSettings);
auto status = HandlePrecomputes(precomputes, ctx, pullSettings, executionTimeout);
if (status.Level != TStatus::Ok) {
if (status == TStatus::Async) {
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
Expand Down Expand Up @@ -1463,7 +1465,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);

auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
settings, progressWriter, ModulesMapping, fillSettings.Discard, executionTimeout);

future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
YQL_ENSURE(!completedFuture.HasException());
Expand Down Expand Up @@ -1809,7 +1811,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
});
}

IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap<TString, TString>& providerParams) {
IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap<TString, TString>& providerParams, ui64 executionTimeout) {

IDataProvider::TFillSettings fillSettings;
fillSettings.AllResultsBytesLimit.Clear();
Expand Down Expand Up @@ -1977,7 +1979,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);

auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams,
settings, progressWriter, ModulesMapping, false);
settings, progressWriter, ModulesMapping, false, executionTimeout);

executionPlanner.Destroy();

Expand Down
11 changes: 6 additions & 5 deletions ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession>
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard)
bool discard, ui64 executionTimeout)
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId);

Expand All @@ -257,6 +257,7 @@ class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession>
YQL_ENSURE(!file.GetObjectId().empty());
}
}
queryPB.SetExecutionTimeout(executionTimeout);
queryPB.SetSession(SessionId);
queryPB.SetResultType(plan.ResultType);
queryPB.SetSourceId(plan.SourceID.NodeId()-1);
Expand Down Expand Up @@ -521,7 +522,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl> {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard)
bool discard, ui64 executionTimeout)
{
std::shared_ptr<TDqGatewaySession> session;
with_lock(Mutex) {
Expand All @@ -534,7 +535,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl> {
YQL_CLOG(ERROR, ProviderDq) << "Session was closed: " << sessionId;
return MakeFuture(NCommon::ResultFromException<TResult>(yexception() << "Session was closed"));
}
return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard)
return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout)
.Apply([](const TFuture<TResult>& f) {
try {
f.TryRethrow();
Expand Down Expand Up @@ -586,9 +587,9 @@ class TDqGateway: public IDqGateway {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) override
bool discard, ui64 executionTimeout) override
{
return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard);
return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout);
}

TString GetVanillaJobPath() override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class IDqGateway : public TThrRefBase {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) = 0;
bool discard, ui64 executionTimeout) = 0;

virtual TString GetVanillaJobPath() {
return "";
Expand Down
6 changes: 3 additions & 3 deletions ydb/library/yql/providers/dq/provider/yql_dq_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ struct TDqState: public TThrRefBase {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) {
bool discard, ui64 executionTimeout) {
with_lock(Mutex_) {
if (!OperationSemaphore) {
const auto parallelOperationsLimit = Settings->ParallelOperationsLimit.Get().GetOrElse(TDqSettings::TDefault::ParallelOperationsLimit);
OperationSemaphore = NThreading::TAsyncSemaphore::Make(parallelOperationsLimit);
}
}
return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr<TDqState>(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard](const auto& f) mutable {
return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr<TDqState>(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout](const auto& f) mutable {
auto lock = f.GetValue()->MakeAutoRelease();
return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard).Apply([unlock = lock.DeferRelease()](const auto& f) {
return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout).Apply([unlock = lock.DeferRelease()](const auto& f) {
unlock(NThreading::MakeFuture());
return f;
});
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/providers/dq/service/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ namespace NYql::NDqs {
: TServiceProxyActor(ctx, counters, traceId, username)
, GraphExecutionEventsActorId(graphExecutionEventsActorId)
{
ExecutionTimeout = Request->GetExecutionTimeout();
}

void DoRetry() override {
Expand Down Expand Up @@ -378,7 +379,7 @@ namespace NYql::NDqs {
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
MergeTaskMetas(params);

auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime));
auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime, false, ExecutionTimeout));

TVector<TString> columns;
columns.reserve(Request->GetColumns().size());
Expand Down Expand Up @@ -427,6 +428,7 @@ namespace NYql::NDqs {
}

NActors::TActorId GraphExecutionEventsActorId;
ui64 ExecutionTimeout;
};

TString GetVersionString() {
Expand Down
Loading