diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 3e642b39167c..6865cb8379e5 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -289,7 +289,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { false, // keepSession false, // useCancelAfter syntax, - true); + true); // trailing support if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { NYql::TIssues issues; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 15fe364f8d25..4ff23bfa0165 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -278,8 +278,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetByteSize(); if (resultSize > (int)ReplySizeLimit) { - TString message = TStringBuilder() << "Query result size limit exceeded. (" - << resultSize << " > " << ReplySizeLimit << ")"; + TString message; + if (ResponseEv->TxResults.size() == 1 && !ResponseEv->TxResults[0].QueryResultIndex.Defined()) { + message = TStringBuilder() << "Intermediate data materialization exceeded size limit" + << " (" << resultSize << " > " << ReplySizeLimit << ")." + << " This usually happens when trying to write large amounts of data or to perform lookup" + << " by big collection of keys in single query. Consider using smaller batches of data."; + } else { + message = TStringBuilder() << "Query result size limit exceeded. (" + << resultSize << " > " << ReplySizeLimit << ")"; + } auto issue = YqlIssue({}, TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, message); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, issue); diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 242c64027224..d0d2105276c4 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -28,8 +28,16 @@ struct TEvKqpExecuter { ui64 ResultRowsCount = 0; ui64 ResultRowsBytes = 0; - explicit TEvTxResponse(TTxAllocatorState::TPtr allocState) + enum class EExecutionType { + Data, + Scan, + Scheme, + Literal, + } ExecutionType; + + TEvTxResponse(TTxAllocatorState::TPtr allocState, EExecutionType type) : AllocState(std::move(allocState)) + , ExecutionType(type) {} ~TEvTxResponse(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index f43e2ad845b6..5975a8ac4e2e 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -96,7 +96,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt for (auto& tx : request.Transactions) { if (txsType) { YQL_ENSURE(*txsType == tx.Body->GetType(), "Mixed physical tx types in executer."); - YQL_ENSURE(*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA, "Cannot execute multiple non-data physical txs."); + YQL_ENSURE((*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA) + || (*txsType == NKqpProto::TKqpPhyTx::TYPE_GENERIC), + "Cannot execute multiple non-data physical txs."); } else { txsType = tx.Body->GetType(); } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index fc944349ea13..f0706f346a95 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -64,12 +64,10 @@ namespace NKqp { #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) #define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream) -enum class EExecType { - Data, - Scan -}; +using EExecType = TEvKqpExecuter::TEvTxResponse::EExecutionType; const ui64 MaxTaskSize = 48_MB; +constexpr ui64 PotentialUnsigned64OverflowLimit = (std::numeric_limits::max() >> 1); std::pair SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task); @@ -114,6 +112,7 @@ struct TEvPrivate { template class TKqpExecuterBase : public TActorBootstrapped { + static_assert(ExecType == EExecType::Data || ExecType == EExecType::Scan); public: TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, @@ -140,7 +139,7 @@ class TKqpExecuterBase : public TActorBootstrapped { TasksGraph.GetMeta().Database = Database; TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion; TasksGraph.GetMeta().UserRequestContext = userRequestContext; - ResponseEv = std::make_unique(Request.TxAlloc); + ResponseEv = std::make_unique(Request.TxAlloc, ExecType); ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique(Request.StatsMode, &TasksGraph, ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); @@ -360,7 +359,7 @@ class TKqpExecuterBase : public TActorBootstrapped { ui64 seqNo = ev->Get()->Record.GetSeqNo(); i64 freeSpace = ev->Get()->Record.GetFreeSpace(); - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ", send ack to channelId: " << channelId << ", seqNo: " << seqNo << ", enough: " << ev->Get()->Record.GetEnough() @@ -798,6 +797,9 @@ class TKqpExecuterBase : public TActorBootstrapped { auto ru = NRuCalc::CalcRequestUnit(consumption); + YQL_ENSURE(consumption.ReadIOStat.Rows < PotentialUnsigned64OverflowLimit); + YQL_ENSURE(ru < PotentialUnsigned64OverflowLimit); + // Some heuristic to reduce overprice due to round part stats if (ru <= 100 && !force) return; @@ -1661,6 +1663,10 @@ class TKqpExecuterBase : public TActorBootstrapped { } void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) { + if (AlreadyReplied) { + return; + } + LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message); if (ExecuterSpan) { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); @@ -1674,6 +1680,7 @@ class TKqpExecuterBase : public TActorBootstrapped { this->Send(Target, abortEv.Release()); } + AlreadyReplied = true; LOG_E("Sending timeout response to: " << Target); this->Send(Target, ResponseEv.release()); @@ -1685,6 +1692,10 @@ class TKqpExecuterBase : public TActorBootstrapped { virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField* issues) { + if (AlreadyReplied) { + return; + } + if (Planner) { for (auto computeActor : Planner->GetPendingComputeActors()) { LOG_D("terminate compute actor " << computeActor.first); @@ -1694,6 +1705,7 @@ class TKqpExecuterBase : public TActorBootstrapped { } } + AlreadyReplied = true; auto& response = *ResponseEv->Record.MutableResponse(); response.SetStatus(status); @@ -1907,6 +1919,8 @@ class TKqpExecuterBase : public TActorBootstrapped { THashMap ResultChannelToComputeActor; THashMap> SourceScanStageIdToParititions; + bool AlreadyReplied = false; + private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 34633516825a..2c67d66cbe38 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -746,7 +746,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st for (auto& p2 : p.second.Egress) { ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableEgress())[p2.first]); } - } + } } void TQueryExecutionStats::Finish() { @@ -761,7 +761,7 @@ void TQueryExecutionStats::Finish() { Result->SetDurationUs(FinishTs.MicroSeconds() - StartTs.MicroSeconds()); // Result->Result* feilds are (temporary?) commented out in proto due to lack of use - // + // // Result->SetResultBytes(ResultBytes); // Result->SetResultRows(ResultRows); @@ -838,6 +838,11 @@ void TProgressStatEntry::Out(IOutputStream& o) const { } void TProgressStat::Set(const NDqProto::TDqComputeActorStats& stats) { + if (Cur.Defined) { + Cur = TEntry(); + } + + Cur.Defined = true; Cur.ComputeTime += TDuration::MicroSeconds(stats.GetCpuTimeUs()); for (auto& task : stats.GetTasks()) { for (auto& table: task.GetTables()) { @@ -848,7 +853,7 @@ void TProgressStat::Set(const NDqProto::TDqComputeActorStats& stats) { } TProgressStat::TEntry TProgressStat::GetLastUsage() const { - return Cur - Total; + return Cur.Defined ? Cur - Total : Cur; } void TProgressStat::Update() { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index bea43d882dab..b078a6ca7e64 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -191,6 +191,7 @@ struct TTableStat { struct TProgressStatEntry { TDuration ComputeTime; TTableStat ReadIOStat; + bool Defined = false; TProgressStatEntry& operator+=(const TProgressStatEntry& rhs); diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 6c2df7106680..3c50ede68766 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -79,7 +79,9 @@ class TKqpLiteralExecuter { , LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter") , UserRequestContext(userRequestContext) { - ResponseEv = std::make_unique(Request.TxAlloc); + ResponseEv = std::make_unique( + Request.TxAlloc, TEvKqpExecuter::TEvTxResponse::EExecutionType::Literal); + ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique(Request.StatsMode, &TasksGraph, ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp index 3d3d9d00d199..c71efa06082a 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp @@ -56,6 +56,8 @@ class TResultCommonChannelProxy : public NActors::TActor(NYql::NDqProto::StatusIds::INTERNAL_ERROR, msg); + auto evAbort = MakeHolder(code, msg); Send(Executer, evAbort.Release()); Become(&TResultCommonChannelProxy::DeadState); @@ -112,7 +114,7 @@ class TResultCommonChannelProxy : public NActors::TActorGetTypeRewrite()) { hFunc(TEvents::TEvPoison, HandlePoison); } - + } catch(const yexception& ex) { InternalError(ex.what()); } diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 26dc514af23b..9ed5fceef0d3 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -69,7 +69,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped { YQL_ENSURE(PhyTx); YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME); - ResponseEv = std::make_unique(nullptr); + ResponseEv = std::make_unique( + nullptr, + TEvKqpExecuter::TEvTxResponse::EExecutionType::Scheme); } void StartBuildOperation() { diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make index 7a7cee3e675e..9cb7618afb0d 100644 --- a/ydb/core/kqp/executer_actor/ya.make +++ b/ydb/core/kqp/executer_actor/ya.make @@ -42,6 +42,10 @@ PEERDIR( ydb/library/yql/providers/common/http_gateway ) +GENERATE_ENUM_SERIALIZATION( + kqp_executer.h +) + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 527cc1dce28e..9204f469758d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -887,6 +887,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over columnMeta.DefaultFromSequence = "_serial_column_" + columnMeta.Name; columnMeta.SetDefaultFromSequence(); + columnMeta.NotNull = true; } else if (constraint.Name().Value() == "not_null") { columnMeta.NotNull = true; } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index f19e2e74e82d..8755961afbf8 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -863,7 +863,7 @@ class TKqpProxyService : public TActorBootstrapped { } const TKqpSessionInfo* info = LocalSessions->FindPtr(proxyRequest->SessionId); - if (info) { + if (info && !info->AttachedRpcId) { LocalSessions->StartIdleCheck(info, GetSessionIdleDuration()); } diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index c0bf8e338ed0..7debbaf9da02 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -77,14 +77,12 @@ Ydb::ResultSet* TKqpExecuterTxResult::GetYdb(google::protobuf::Arena* arena, TMa return ydbResult; } -Ydb::ResultSet* TKqpExecuterTxResult::GetTrailingYdb(google::protobuf::Arena* arena) { +Ydb::ResultSet* TKqpExecuterTxResult::ExtractTrailingYdb(google::protobuf::Arena* arena) { if (!HasTrailingResult) return nullptr; Ydb::ResultSet* ydbResult = google::protobuf::Arena::CreateMessage(arena); - if (TrailingResult.rows().size() > 0) { - ydbResult->Swap(&TrailingResult); - } + ydbResult->Swap(&TrailingResult); return ydbResult; } @@ -237,10 +235,10 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes return TxResults[txIndex][resultIndex].GetMkql(arena); } -Ydb::ResultSet* TQueryData::GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) { +Ydb::ResultSet* TQueryData::ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) { auto txIndex = rb.GetTxResultBinding().GetTxIndex(); auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); - return TxResults[txIndex][resultIndex].GetTrailingYdb(arena); + return TxResults[txIndex][resultIndex].ExtractTrailingYdb(arena); } diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index ca8e772d5bda..9c380c090965 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -100,7 +100,7 @@ struct TKqpExecuterTxResult { NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena); NKikimrMiniKQL::TResult GetMkql(); Ydb::ResultSet* GetYdb(google::protobuf::Arena* arena, TMaybe rowsLimitPerWrite); - Ydb::ResultSet* GetTrailingYdb(google::protobuf::Arena* arena); + Ydb::ResultSet* ExtractTrailingYdb(google::protobuf::Arena* arena); void FillMkql(NKikimrMiniKQL::TResult* mkqlResult); void FillYdb(Ydb::ResultSet* ydbResult, TMaybe rowsLimitPerWrite); @@ -253,7 +253,7 @@ class TQueryData : NMiniKQL::ITerminator { TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex); NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena); Ydb::ResultSet* GetYdbTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena, TMaybe rowsLimitPerWrite); - Ydb::ResultSet* GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena); + Ydb::ResultSet* ExtractTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena); std::pair GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding); TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 22f1bf9bcfc9..1b69097b7b48 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -1136,7 +1136,7 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq NMiniKQL::WriteColumnValuesFromArrow(editAccessors, NMiniKQL::TBatchDataAccessor(result->Get()->GetArrowBatch()), columnIndex, resultColumnIndex, column.TypeInfo) ); if (column.NotNull) { - std::shared_ptr columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex); + std::shared_ptr columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex); bool gotNullValue = false; for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { if (columnSharedPtr->IsNull(rowIndex)) { @@ -1181,9 +1181,14 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq } NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) { - auto& [shardId, result, batch, _, packed] = handle; + auto& [shardId, result, batch, processedRows, packed] = handle; NMiniKQL::TBytesStatistics stats; batch->reserve(batch->size()); + CA_LOG_D(TStringBuilder() << "enter pack cells method " + << " shardId: " << shardId + << " processedRows: " << processedRows + << " packed rows: " << packed + << " freeSpace: " << freeSpace); for (size_t rowIndex = packed; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { const auto& row = result->Get()->GetCells(rowIndex); @@ -1225,6 +1230,12 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq break; } } + + CA_LOG_D(TStringBuilder() << "exit pack cells method " + << " shardId: " << shardId + << " processedRows: " << processedRows + << " packed rows: " << packed + << " freeSpace: " << freeSpace); return stats; } @@ -1246,7 +1257,9 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq YQL_ENSURE(!resultBatch.IsWide(), "Wide stream is not supported"); - CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size()); + CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size() + << ", freeSpace " << freeSpace); + ui64 bytes = 0; while (!Results.empty()) { auto& result = Results.front(); @@ -1255,14 +1268,15 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq auto& msg = *result.ReadResult->Get(); if (!batch.Defined()) { batch.ConstructInPlace(); - switch (msg.Record.GetResultFormat()) { - case NKikimrDataEvents::FORMAT_ARROW: - BytesStats.AddStatistics(PackArrow(result, freeSpace)); - break; - case NKikimrDataEvents::FORMAT_UNSPECIFIED: - case NKikimrDataEvents::FORMAT_CELLVEC: - BytesStats.AddStatistics(PackCells(result, freeSpace)); - } + } + + switch (msg.Record.GetResultFormat()) { + case NKikimrDataEvents::FORMAT_ARROW: + BytesStats.AddStatistics(PackArrow(result, freeSpace)); + break; + case NKikimrDataEvents::FORMAT_UNSPECIFIED: + case NKikimrDataEvents::FORMAT_CELLVEC: + BytesStats.AddStatistics(PackCells(result, freeSpace)); } auto id = result.ReadResult->Get()->Record.GetReadId(); @@ -1334,6 +1348,7 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq CA_LOG_D(TStringBuilder() << "returned async data" << " processed rows " << ProcessedRowCount + << " left freeSpace " << freeSpace << " received rows " << ReceivedRowCount << " running reads " << RunningReads() << " pending shards " << PendingShards.Size() diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1b285ba738e6..2d43845429eb 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1268,7 +1268,10 @@ class TKqpSessionActor : public TActorBootstrapped { ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { - LOG_D("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx); + const auto executionType = ev->ExecutionType; + + LOG_D("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx + << " ExecutionType: " << executionType); auto status = response->GetStatus(); TIssues issues; @@ -1280,11 +1283,14 @@ class TKqpSessionActor : public TActorBootstrapped { case Ydb::StatusIds::INTERNAL_ERROR: InvalidateQuery(); issues.AddIssue(YqlIssue(TPosition(), TIssuesIds::KIKIMR_QUERY_INVALIDATED, - TStringBuilder() << "Query invalidated on scheme/internal error.")); + TStringBuilder() << "Query invalidated on scheme/internal error during " + << executionType << " execution")); // SCHEME_ERROR during execution is a soft (retriable) error, we abort query execution, // invalidate query cache, and return ABORTED as retriable status. - if (status == Ydb::StatusIds::SCHEME_ERROR) { + if (status == Ydb::StatusIds::SCHEME_ERROR && + executionType != TEvKqpExecuter::TEvTxResponse::EExecutionType::Scheme) + { status = Ydb::StatusIds::ABORTED; } @@ -1581,7 +1587,7 @@ class TKqpSessionActor : public TActorBootstrapped { size_t trailingResultsCount = 0; for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { if (QueryState->IsStreamResult()) { - auto ydbResult = QueryState->QueryData->GetTrailingTxResult( + auto ydbResult = QueryState->QueryData->ExtractTrailingTxResult( phyQuery.GetResultBindings(i), response->GetArena()); if (ydbResult) { diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index 2b0218e43e7b..bca5187d1607 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -17,6 +17,35 @@ namespace { } Y_UNIT_TEST_SUITE(KqpLimits) { + Y_UNIT_TEST(QSReplySizeEnsureMemoryLimits) { + TKikimrRunner kikimr; + CreateLargeTable(kikimr, 1'000, 100, 1'000, 1'000); + + auto db = kikimr.GetQueryClient(); + + TControlWrapper mkqlInitialMemoryLimit; + TControlWrapper mkqlMaxMemoryLimit; + + mkqlInitialMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl( + mkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit"); + mkqlMaxMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl( + mkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit"); + + mkqlInitialMemoryLimit = 1_KB; + mkqlMaxMemoryLimit = 1_KB; + + auto result = db.ExecuteQuery(R"( + UPSERT INTO KeyValue2 + SELECT + KeyText AS Key, + DataText AS Value + FROM `/Root/LargeTable`; + )", NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + UNIT_ASSERT(!to_lower(result.GetIssues().ToString()).Contains("query result")); + } + Y_UNIT_TEST(KqpMkqlMemoryLimitException) { TKikimrRunner kikimr; CreateLargeTable(kikimr, 10, 10, 1'000'000, 1); @@ -1033,6 +1062,24 @@ Y_UNIT_TEST_SUITE(KqpLimits) { last = current; } } + + Y_UNIT_TEST(QSReplySize) { + TKikimrRunner kikimr; + CreateLargeTable(kikimr, 10'000, 100, 1'000, 1'000); + + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + UPSERT INTO KeyValue2 + SELECT + KeyText AS Key, + DataText AS Value + FROM `/Root/LargeTable`; + )", NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + UNIT_ASSERT(!to_lower(result.GetIssues().ToString()).Contains("query result")); + } } } // namespace NKqp diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 71ff6bc7da91..6fbb7319009d 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -256,7 +256,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { result.GetIssues().ToString()); CompareYson(R"( [ - [[1];["New"]] + [1;["New"]] ] )", NYdb::FormatResultSetYson(result.GetResultSet(0))); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index dbcc2dcf8b09..aa8009bcacab 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -140,26 +140,55 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); - auto it = db.StreamExecuteQuery(R"( - SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0; - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + { + auto it = db.StreamExecuteQuery(R"( + SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 count = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } - ui64 count = 0; - for (;;) { - auto streamPart = it.ReadNext().GetValueSync(); - if (!streamPart.IsSuccess()) { - UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); - break; + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + count += resultSet.RowsCount(); + } } - if (streamPart.HasResultSet()) { - auto resultSet = streamPart.ExtractResultSet(); - count += resultSet.RowsCount(); - } + UNIT_ASSERT_VALUES_EQUAL(count, 2); } - UNIT_ASSERT_VALUES_EQUAL(count, 2); + { + auto it = db.StreamExecuteQuery(R"( + SELECT Key, Value2 FROM TwoShard WHERE false ORDER BY Key > 0; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui32 rsCount = 0; + ui32 columns = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + columns = resultSet.ColumnsCount(); + CompareYson(R"([])", FormatResultSetYson(resultSet)); + rsCount++; + } + } + + UNIT_ASSERT_VALUES_EQUAL(rsCount, 1); + UNIT_ASSERT_VALUES_EQUAL(columns, 2); + } } void CheckQueryResult(TExecuteQueryResult result) { @@ -242,27 +271,60 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL_C(sessionResult.GetStatus(), EStatus::SUCCESS, sessionResult.GetIssues().ToString()); auto session = sessionResult.GetSession(); - const TString query = "UPDATE TwoShard SET Value2 = 0"; - auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - auto transaction = result.GetTransaction(); - UNIT_ASSERT(transaction->IsActive()); + { + const TString query = "UPDATE TwoShard SET Value2 = 0"; + auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto transaction = result.GetTransaction(); + UNIT_ASSERT(transaction->IsActive()); - auto checkResult = [&](TString expected) { - auto selectRes = db.ExecuteQuery( - "SELECT * FROM TwoShard ORDER BY Key", - TTxControl::BeginTx().CommitTx() - ).ExtractValueSync(); + auto checkResult = [&](TString expected) { + auto selectRes = db.ExecuteQuery( + "SELECT * FROM TwoShard ORDER BY Key", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); - UNIT_ASSERT_C(selectRes.IsSuccess(), selectRes.GetIssues().ToString()); - CompareYson(expected, FormatResultSetYson(selectRes.GetResultSet(0))); - }; - checkResult(R"([[[1u];["One"];[-1]];[[2u];["Two"];[0]];[[3u];["Three"];[1]];[[4000000001u];["BigOne"];[-1]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[1]]])"); + UNIT_ASSERT_C(selectRes.IsSuccess(), selectRes.GetIssues().ToString()); + CompareYson(expected, FormatResultSetYson(selectRes.GetResultSet(0))); + }; + checkResult(R"([[[1u];["One"];[-1]];[[2u];["Two"];[0]];[[3u];["Three"];[1]];[[4000000001u];["BigOne"];[-1]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[1]]])"); - auto txRes = transaction->Commit().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(txRes.GetStatus(), EStatus::SUCCESS, txRes.GetIssues().ToString()); + auto txRes = transaction->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(txRes.GetStatus(), EStatus::SUCCESS, txRes.GetIssues().ToString()); - checkResult(R"([[[1u];["One"];[0]];[[2u];["Two"];[0]];[[3u];["Three"];[0]];[[4000000001u];["BigOne"];[0]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[0]]])"); + checkResult(R"([[[1u];["One"];[0]];[[2u];["Two"];[0]];[[3u];["Three"];[0]];[[4000000001u];["BigOne"];[0]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[0]]])"); + } + + { + const TString query = "UPDATE TwoShard SET Value2 = 1"; + auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto transaction = result.GetTransaction(); + UNIT_ASSERT(transaction->IsActive()); + + const TString query2 = "UPDATE KeyValue SET Value = 'Vic'"; + auto result2 = session.ExecuteQuery(query2, TTxControl::Tx(transaction->GetId())).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result2.GetStatus(), EStatus::SUCCESS, result2.GetIssues().ToString()); + auto transaction2 = result2.GetTransaction(); + UNIT_ASSERT(transaction2->IsActive()); + + auto checkResult = [&](TString table, TString expected) { + auto selectRes = db.ExecuteQuery( + Sprintf("SELECT * FROM %s ORDER BY Key", table.data()), + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + + UNIT_ASSERT_C(selectRes.IsSuccess(), selectRes.GetIssues().ToString()); + CompareYson(expected, FormatResultSetYson(selectRes.GetResultSet(0))); + }; + checkResult("TwoShard", R"([[[1u];["One"];[0]];[[2u];["Two"];[0]];[[3u];["Three"];[0]];[[4000000001u];["BigOne"];[0]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[0]]])"); + checkResult("KeyValue", R"([[[1u];["One"]];[[2u];["Two"]]])"); + auto txRes = transaction->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(txRes.GetStatus(), EStatus::SUCCESS, txRes.GetIssues().ToString()); + + checkResult("KeyValue", R"([[[1u];["Vic"]];[[2u];["Vic"]]])"); + checkResult("TwoShard", R"([[[1u];["One"];[1]];[[2u];["Two"];[1]];[[3u];["Three"];[1]];[[4000000001u];["BigOne"];[1]];[[4000000002u];["BigTwo"];[1]];[[4000000003u];["BigThree"];[1]]])"); + } } Y_UNIT_TEST(ExecuteQueryInteractiveTxCommitWithQuery) { @@ -367,6 +429,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(ExecuteDDLStatusCodeSchemeError) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + { + auto db = kikimr.GetQueryClient(); + auto result = db.ExecuteQuery(R"( + CREATE TABLE unsupported_TzTimestamp (key Int32, payload TzTimestamp, primary key(key)))", + TTxControl::NoTx() + ).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + } + } + Y_UNIT_TEST(ExecuteQueryScalar) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient();