Skip to content

Commit 89f0d5e

Browse files
authored
Merge 2334a5f into 5907dba
2 parents 5907dba + 2334a5f commit 89f0d5e

File tree

12 files changed

+174
-58
lines changed

12 files changed

+174
-58
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,18 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
394394
return;
395395
}
396396

397-
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
398-
Request_->SetRuHeader(record.GetConsumedRu());
397+
Ydb::Query::ExecuteQueryResponsePart response;
399398

400-
auto& kqpResponse = record.GetResponse();
399+
if (NeedReportStats(*Request_->GetProtoRequest())) {
400+
hasTrailingMessage = true;
401+
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
402+
if (NeedReportAst(*Request_->GetProtoRequest())) {
403+
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
404+
}
405+
}
401406

402-
Ydb::Query::ExecuteQueryResponsePart response;
407+
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
408+
Request_->SetRuHeader(record.GetConsumedRu());
403409

404410
if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
405411
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
@@ -415,25 +421,15 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
415421
hasTrailingMessage = true;
416422
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
417423
}
418-
419-
if (NeedReportStats(*Request_->GetProtoRequest())) {
420-
hasTrailingMessage = true;
421-
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
422-
if (NeedReportAst(*Request_->GetProtoRequest())) {
423-
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
424-
}
425-
}
426-
427-
if (hasTrailingMessage) {
428-
response.set_status(Ydb::StatusIds::SUCCESS);
429-
response.mutable_issues()->CopyFrom(issueMessage);
430-
TString out;
431-
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
432-
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
433-
}
434424
}
435425

436-
if (!hasTrailingMessage) {
426+
if (hasTrailingMessage) {
427+
response.set_status(record.GetYdbStatus());
428+
response.mutable_issues()->CopyFrom(issueMessage);
429+
TString out;
430+
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
431+
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
432+
} else {
437433
NYql::TIssues issues;
438434
NYql::IssuesFromMessage(issueMessage, issues);
439435
ReplyFinishStream(record.GetYdbStatus(), issueMessage);

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,18 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
372372
PassAway();
373373
}
374374

375+
void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
376+
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
377+
preparingQuery.release(), AppData()->FunctionRegistry);
378+
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
379+
KqpCompileResult->PreparedQuery = preparedQueryHolder;
380+
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
381+
382+
if (AstResult) {
383+
KqpCompileResult->Ast = AstResult->Ast;
384+
}
385+
}
386+
375387
void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
376388
Y_ENSURE(!ev->Get()->QueryId);
377389

@@ -403,17 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
403415

404416
if (status == Ydb::StatusIds::SUCCESS) {
405417
YQL_ENSURE(kqpResult.PreparingQuery);
406-
{
407-
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
408-
kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
409-
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
410-
KqpCompileResult->PreparedQuery = preparedQueryHolder;
411-
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());
412-
413-
if (AstResult) {
414-
KqpCompileResult->Ast = AstResult->Ast;
415-
}
416-
}
418+
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
417419

418420
auto now = TInstant::Now();
419421
auto duration = now - StartTime;
@@ -423,6 +425,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
423425
<< ", self: " << ctx.SelfID
424426
<< ", duration: " << duration);
425427
} else {
428+
if (kqpResult.PreparingQuery) {
429+
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
430+
}
431+
426432
LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
427433
<< ", self: " << ctx.SelfID
428434
<< ", status: " << Ydb::StatusIds_StatusCode_Name(status)

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResul
182182
, SqlVersion(sqlVersion) {}
183183

184184
void FillResult(TResult& validateResult) const override {
185+
if (!validateResult.Success()) {
186+
return;
187+
}
188+
185189
YQL_ENSURE(SessionCtx->Query().PrepareOnly);
186190
validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release());
187191
validateResult.SqlVersion = SqlVersion;
@@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
211215
, UseDqExplain(useDqExplain) {}
212216

213217
void FillResult(TResult& queryResult) const override {
218+
if (!queryResult.Success()) {
219+
return;
220+
}
221+
214222
if (UseDqExplain) {
215223
TVector<const TString> plans;
216224
for (auto id : SessionCtx->Query().ExecutionOrder) {
@@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
253261
, SqlVersion(sqlVersion) {}
254262

255263
void FillResult(TResult& queryResult) const override {
264+
if (!queryResult.Success()) {
265+
return;
266+
}
267+
256268
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
257269
queryResult.Results.emplace_back(
258270
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
@@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
300312
, ExecuteCtx(executeCtx) {}
301313

302314
void FillResult(TResult& queryResult) const override {
315+
if (!queryResult.Success()) {
316+
return;
317+
}
318+
303319
YQL_ENSURE(ExecuteCtx.QueryResults.size() == 1);
304320
queryResult = std::move(ExecuteCtx.QueryResults[0]);
305321
queryResult.QueryPlan = queryResult.PreparingQuery->GetPhysicalQuery().GetQueryPlan();
@@ -320,13 +336,28 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
320336
using TResult = IKqpHost::TQueryResult;
321337

322338
TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
323-
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion)
339+
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion,
340+
TIntrusivePtr<TKqlTransformContext> transformCtx)
324341
: TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
325342
, QueryCtx(queryCtx)
343+
, ExprCtx(exprCtx)
344+
, TransformCtx(transformCtx)
326345
, QueryText(query.Text)
327346
, SqlVersion(sqlVersion) {}
328347

329348
void FillResult(TResult& prepareResult) const override {
349+
if (!prepareResult.Success()) {
350+
auto exprRoot = GetExprRoot();
351+
if (TransformCtx && TransformCtx->ExplainTransformerInput) {
352+
exprRoot = TransformCtx->ExplainTransformerInput;
353+
}
354+
if (exprRoot) {
355+
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
356+
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*exprRoot, ExprCtx));
357+
}
358+
return;
359+
}
360+
330361
YQL_ENSURE(QueryCtx->PrepareOnly);
331362
YQL_ENSURE(QueryCtx->PreparingQuery);
332363

@@ -344,6 +375,8 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
344375

345376
private:
346377
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
378+
NYql::TExprContext& ExprCtx;
379+
TIntrusivePtr<TKqlTransformContext> TransformCtx;
347380
TString QueryText;
348381
TMaybe<TSqlVersion> SqlVersion;
349382
};
@@ -933,6 +966,7 @@ class TKqpHost : public IKqpHost {
933966
, IsInternalCall(isInternalCall)
934967
, FederatedQuerySetup(federatedQuerySetup)
935968
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
969+
, Config(config)
936970
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
937971
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
938972
, FakeWorld(ExprCtx->NewWorld(TPosition()))
@@ -1265,7 +1299,7 @@ class TKqpHost : public IKqpHost {
12651299
}
12661300

12671301
return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
1268-
query.Text, sqlVersion);
1302+
query.Text, sqlVersion, TransformCtx);
12691303
}
12701304

12711305
IAsyncQueryResultPtr PrepareDataQueryAstInternal(const TKqpQueryRef& queryAst, const TPrepareSettings& settings,
@@ -1327,7 +1361,7 @@ class TKqpHost : public IKqpHost {
13271361
}
13281362

13291363
return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
1330-
query.Text, sqlVersion);
1364+
query.Text, sqlVersion, TransformCtx);
13311365
}
13321366

13331367
IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx,
@@ -1354,7 +1388,7 @@ class TKqpHost : public IKqpHost {
13541388
}
13551389

13561390
return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
1357-
query.Text, sqlVersion);
1391+
query.Text, sqlVersion, TransformCtx);
13581392
}
13591393

13601394
IAsyncQueryResultPtr PrepareScanQueryAstInternal(const TKqpQueryRef& queryAst, TExprContext& ctx) {
@@ -1502,7 +1536,8 @@ class TKqpHost : public IKqpHost {
15021536
}
15031537

15041538
void Init(EKikimrQueryType queryType) {
1505-
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
1539+
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
1540+
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);
15061541

15071542
ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
15081543
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
@@ -1635,6 +1670,7 @@ class TKqpHost : public IKqpHost {
16351670
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
16361671

16371672
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
1673+
TKikimrConfiguration::TPtr Config;
16381674

16391675
TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder;
16401676
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
@@ -1648,6 +1684,7 @@ class TKqpHost : public IKqpHost {
16481684
TExprNode::TPtr FakeWorld;
16491685

16501686
TIntrusivePtr<TExecuteContext> ExecuteCtx;
1687+
TIntrusivePtr<TKqlTransformContext> TransformCtx;
16511688
TIntrusivePtr<IKqpRunner> KqpRunner;
16521689
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};
16531690

ydb/core/kqp/host/kqp_host_impl.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
3434
YQL_ENSURE(HasResult());
3535

3636
if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) {
37-
return NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
37+
TResult result = NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
38+
FillResult(result);
39+
return result;
3840
}
3941

4042
YQL_ENSURE(Status.GetValue() == NYql::IGraphTransformer::TStatus::Ok);
@@ -244,7 +246,7 @@ class IKqpRunner : public TThrRefBase {
244246

245247
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
246248
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
247-
const NMiniKQL::IFunctionRegistry& funcRegistry);
249+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);
248250

249251
TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
250252
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner {
137137
public:
138138
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
139139
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
140-
const NMiniKQL::IFunctionRegistry& funcRegistry)
140+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
141141
: Gateway(gateway)
142142
, Cluster(cluster)
143143
, TypesCtx(*typesCtx)
144144
, SessionCtx(sessionCtx)
145145
, FunctionRegistry(funcRegistry)
146146
, Config(sessionCtx->ConfigPtr())
147-
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
147+
, TransformCtx(transformCtx)
148148
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
149149
sessionCtx->TablesPtr()))
150150
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
@@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner {
377377

378378
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
379379
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
380-
const NMiniKQL::IFunctionRegistry& funcRegistry)
380+
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
381381
{
382-
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry);
382+
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
383383
}
384384

385385
} // namespace NKqp

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,10 +1720,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17201720

17211721
const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
17221722
FillColumnsMeta(phyQuery, response);
1723-
} else if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
1724-
// The compile timeout cause cancelation execution of request.
1725-
// So in case of cancel after we can reply with canceled status
1726-
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
1723+
} else {
1724+
if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
1725+
// The compile timeout cause cancelation execution of request.
1726+
// So in case of cancel after we can reply with canceled status
1727+
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
1728+
}
1729+
1730+
auto& preparedQuery = compileResult->PreparedQuery;
1731+
if (preparedQuery && QueryState->ReportStats() && QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
1732+
response.SetQueryAst(preparedQuery->GetPhysicalQuery().GetQueryAst());
1733+
}
17271734
}
17281735
}
17291736

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,29 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
553553
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);
554554
}
555555

556+
Y_UNIT_TEST(ExecStatsAst) {
557+
auto kikimr = DefaultKikimrRunner();
558+
auto db = kikimr.GetQueryClient();
559+
560+
auto settings = TExecuteQuerySettings()
561+
.StatsMode(EStatsMode::Full);
562+
563+
std::vector<std::pair<TString, EStatus>> cases = {
564+
{ "SELECT 42 AS test_ast_column", EStatus::SUCCESS },
565+
{ "SELECT test_ast_column FROM TwoShard", EStatus::GENERIC_ERROR },
566+
{ "SELECT UNWRAP(42 / 0) AS test_ast_column", EStatus::PRECONDITION_FAILED },
567+
};
568+
569+
for (const auto& [sql, status] : cases) {
570+
auto result = db.ExecuteQuery(sql, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
571+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());
572+
573+
UNIT_ASSERT(result.GetStats().Defined());
574+
UNIT_ASSERT(result.GetStats()->GetAst().Defined());
575+
UNIT_ASSERT_STRING_CONTAINS(*result.GetStats()->GetAst(), "test_ast_column");
576+
}
577+
}
578+
556579
Y_UNIT_TEST(Ddl) {
557580
NKikimrConfig::TAppConfig appConfig;
558581
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);

ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
136136
Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable {
137137
auto part = partFuture.ExtractValue();
138138

139+
if (const auto& st = part.GetStats()) {
140+
self->Stats_ = st;
141+
}
142+
139143
if (!part.IsSuccess()) {
144+
TMaybe<TExecStats> stats;
145+
std::swap(self->Stats_, stats);
146+
140147
if (part.EOS()) {
141148
TVector<NYql::TIssue> issues;
142149
TVector<Ydb::ResultSet> resultProtos;
143-
TMaybe<TExecStats> stats;
144150
TMaybe<TTransaction> tx;
145151

146152
std::swap(self->Issues_, issues);
147153
std::swap(self->ResultSets_, resultProtos);
148-
std::swap(self->Stats_, stats);
149154
std::swap(self->Tx_, tx);
150155

151156
TVector<TResultSet> resultSets;
@@ -160,7 +165,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
160165
std::move(tx)
161166
));
162167
} else {
163-
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {}));
168+
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, std::move(stats), {}));
164169
}
165170

166171
return;
@@ -185,10 +190,6 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
185190
resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end());
186191
}
187192

188-
if (const auto& st = part.GetStats()) {
189-
self->Stats_ = st;
190-
}
191-
192193
if (const auto& tx = part.GetTransaction()) {
193194
self->Tx_ = tx;
194195
}

0 commit comments

Comments
 (0)