Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
false, // keepSession
false, // useCancelAfter
syntax,
true);
true); // trailing support

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto resultSize = ResponseEv->GetByteSize();
if (resultSize > (int)ReplySizeLimit) {
TString message = TStringBuilder() << "Query result size limit exceeded. ("
<< resultSize << " > " << ReplySizeLimit << ")";
TString message;
if (ResponseEv->TxResults.size() == 1 && !ResponseEv->TxResults[0].QueryResultIndex.Defined()) {
message = TStringBuilder() << "Intermediate data materialization exceeded size limit"
<< " (" << resultSize << " > " << ReplySizeLimit << ")."
<< " This usually happens when trying to write large amounts of data or to perform lookup"
<< " by big collection of keys in single query. Consider using smaller batches of data.";
} else {
message = TStringBuilder() << "Query result size limit exceeded. ("
<< resultSize << " > " << ReplySizeLimit << ")";
}

auto issue = YqlIssue({}, TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, message);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, issue);
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ struct TEvKqpExecuter {
ui64 ResultRowsCount = 0;
ui64 ResultRowsBytes = 0;

explicit TEvTxResponse(TTxAllocatorState::TPtr allocState)
enum class EExecutionType {
Data,
Scan,
Scheme,
Literal,
} ExecutionType;

TEvTxResponse(TTxAllocatorState::TPtr allocState, EExecutionType type)
: AllocState(std::move(allocState))
, ExecutionType(type)
{}

~TEvTxResponse();
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
for (auto& tx : request.Transactions) {
if (txsType) {
YQL_ENSURE(*txsType == tx.Body->GetType(), "Mixed physical tx types in executer.");
YQL_ENSURE(*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA, "Cannot execute multiple non-data physical txs.");
YQL_ENSURE((*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA)
|| (*txsType == NKqpProto::TKqpPhyTx::TYPE_GENERIC),
"Cannot execute multiple non-data physical txs.");
} else {
txsType = tx.Body->GetType();
}
Expand Down
26 changes: 20 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ namespace NKqp {
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)

enum class EExecType {
Data,
Scan
};
using EExecType = TEvKqpExecuter::TEvTxResponse::EExecutionType;

const ui64 MaxTaskSize = 48_MB;
constexpr ui64 PotentialUnsigned64OverflowLimit = (std::numeric_limits<ui64>::max() >> 1);

std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task);

Expand Down Expand Up @@ -114,6 +112,7 @@ struct TEvPrivate {

template <class TDerived, EExecType ExecType>
class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
static_assert(ExecType == EExecType::Data || ExecType == EExecType::Scan);
public:
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
Expand All @@ -140,7 +139,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TasksGraph.GetMeta().Database = Database;
TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion;
TasksGraph.GetMeta().UserRequestContext = userRequestContext;
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc, ExecType);
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
Expand Down Expand Up @@ -360,7 +359,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ui64 seqNo = ev->Get()->Record.GetSeqNo();
i64 freeSpace = ev->Get()->Record.GetFreeSpace();

LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
<< ", send ack to channelId: " << channelId
<< ", seqNo: " << seqNo
<< ", enough: " << ev->Get()->Record.GetEnough()
Expand Down Expand Up @@ -798,6 +797,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

auto ru = NRuCalc::CalcRequestUnit(consumption);

YQL_ENSURE(consumption.ReadIOStat.Rows < PotentialUnsigned64OverflowLimit);
YQL_ENSURE(ru < PotentialUnsigned64OverflowLimit);

// Some heuristic to reduce overprice due to round part stats
if (ru <= 100 && !force)
return;
Expand Down Expand Up @@ -1661,6 +1663,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
if (AlreadyReplied) {
return;
}

LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
if (ExecuterSpan) {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
Expand All @@ -1674,6 +1680,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
this->Send(Target, abortEv.Release());
}

AlreadyReplied = true;
LOG_E("Sending timeout response to: " << Target);
this->Send(Target, ResponseEv.release());

Expand All @@ -1685,6 +1692,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
if (AlreadyReplied) {
return;
}

if (Planner) {
for (auto computeActor : Planner->GetPendingComputeActors()) {
LOG_D("terminate compute actor " << computeActor.first);
Expand All @@ -1694,6 +1705,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}
}

AlreadyReplied = true;
auto& response = *ResponseEv->Record.MutableResponse();

response.SetStatus(status);
Expand Down Expand Up @@ -1907,6 +1919,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
THashMap<ui64, TActorId> ResultChannelToComputeActor;
THashMap<NYql::NDq::TStageId, THashMap<ui64, TShardInfo>> SourceScanStageIdToParititions;

bool AlreadyReplied = false;

private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
for (auto& p2 : p.second.Egress) {
ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableEgress())[p2.first]);
}
}
}
}

void TQueryExecutionStats::Finish() {
Expand All @@ -761,7 +761,7 @@ void TQueryExecutionStats::Finish() {
Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds());

// Result->Result* feilds are (temporary?) commented out in proto due to lack of use
//
//
// Result->SetResultBytes(ResultBytes);
// Result->SetResultRows(ResultRows);

Expand Down Expand Up @@ -838,6 +838,11 @@ void TProgressStatEntry::Out(IOutputStream& o) const {
}

void TProgressStat::Set(const NDqProto::TDqComputeActorStats& stats) {
if (Cur.Defined) {
Cur = TEntry();
}

Cur.Defined = true;
Cur.ComputeTime += TDuration::MicroSeconds(stats.GetCpuTimeUs());
for (auto& task : stats.GetTasks()) {
for (auto& table: task.GetTables()) {
Expand All @@ -848,7 +853,7 @@ void TProgressStat::Set(const NDqProto::TDqComputeActorStats& stats) {
}

TProgressStat::TEntry TProgressStat::GetLastUsage() const {
return Cur - Total;
return Cur.Defined ? Cur - Total : Cur;
}

void TProgressStat::Update() {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ struct TTableStat {
struct TProgressStatEntry {
TDuration ComputeTime;
TTableStat ReadIOStat;
bool Defined = false;

TProgressStatEntry& operator+=(const TProgressStatEntry& rhs);

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ class TKqpLiteralExecuter {
, LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
, UserRequestContext(userRequestContext)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(
Request.TxAlloc, TEvKqpExecuter::TEvTxResponse::EExecutionType::Literal);

ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_result_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
}
} catch (const yexception& ex) {
InternalError(ex.what());
} catch (const NKikimr::TMemoryLimitExceededException& ex) {
InternalError("Memory limit exceeded exception", NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
}
}

Expand Down Expand Up @@ -97,10 +99,10 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
Send(ComputeActor, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ ChannelId);
}

void InternalError(const TString& msg) {
void InternalError(const TString& msg, const NYql::NDqProto::StatusIds_StatusCode& code = NYql::NDqProto::StatusIds::INTERNAL_ERROR) {
LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, msg);

auto evAbort = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, msg);
auto evAbort = MakeHolder<TEvKqp::TEvAbortExecution>(code, msg);
Send(Executer, evAbort.Release());

Become(&TResultCommonChannelProxy::DeadState);
Expand All @@ -112,7 +114,7 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvPoison, HandlePoison);
}

} catch(const yexception& ex) {
InternalError(ex.what());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
YQL_ENSURE(PhyTx);
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);

ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(nullptr);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(
nullptr,
TEvKqpExecuter::TEvTxResponse::EExecutionType::Scheme);
}

void StartBuildOperation() {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/executer_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ PEERDIR(
ydb/library/yql/providers/common/http_gateway
)

GENERATE_ENUM_SERIALIZATION(
kqp_executer.h
)

YQL_LAST_ABI_VERSION()

END()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over

columnMeta.DefaultFromSequence = "_serial_column_" + columnMeta.Name;
columnMeta.SetDefaultFromSequence();
columnMeta.NotNull = true;
} else if (constraint.Name().Value() == "not_null") {
columnMeta.NotNull = true;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}

const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId);
if (info) {
if (info && !info->AttachedRpcId) {
LocalSessions->StartIdleCheck(info, GetSessionIdleDuration());
}

Expand Down
10 changes: 4 additions & 6 deletions ydb/core/kqp/query_data/kqp_query_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,12 @@ Ydb::ResultSet* TKqpExecuterTxResult::GetYdb(google::protobuf::Arena* arena, TMa
return ydbResult;
}

Ydb::ResultSet* TKqpExecuterTxResult::GetTrailingYdb(google::protobuf::Arena* arena) {
Ydb::ResultSet* TKqpExecuterTxResult::ExtractTrailingYdb(google::protobuf::Arena* arena) {
if (!HasTrailingResult)
return nullptr;

Ydb::ResultSet* ydbResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(arena);
if (TrailingResult.rows().size() > 0) {
ydbResult->Swap(&TrailingResult);
}
ydbResult->Swap(&TrailingResult);

return ydbResult;
}
Expand Down Expand Up @@ -237,10 +235,10 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes
return TxResults[txIndex][resultIndex].GetMkql(arena);
}

Ydb::ResultSet* TQueryData::GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) {
Ydb::ResultSet* TQueryData::ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) {
auto txIndex = rb.GetTxResultBinding().GetTxIndex();
auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
return TxResults[txIndex][resultIndex].GetTrailingYdb(arena);
return TxResults[txIndex][resultIndex].ExtractTrailingYdb(arena);
}


Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/query_data/kqp_query_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ struct TKqpExecuterTxResult {
NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena);
NKikimrMiniKQL::TResult GetMkql();
Ydb::ResultSet* GetYdb(google::protobuf::Arena* arena, TMaybe<ui64> rowsLimitPerWrite);
Ydb::ResultSet* GetTrailingYdb(google::protobuf::Arena* arena);
Ydb::ResultSet* ExtractTrailingYdb(google::protobuf::Arena* arena);

void FillMkql(NKikimrMiniKQL::TResult* mkqlResult);
void FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsLimitPerWrite);
Expand Down Expand Up @@ -253,7 +253,7 @@ class TQueryData : NMiniKQL::ITerminator {
TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex);
NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
Ydb::ResultSet* GetYdbTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena, TMaybe<ui64> rowsLimitPerWrite);
Ydb::ResultSet* GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);
Ydb::ResultSet* ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena);

std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding);
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);
Expand Down
Loading