@@ -24,6 +24,8 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
2424struct TProducerState {
2525 TMaybe<ui64> LastSeqNo;
2626 ui64 AckedFreeSpaceBytes = 0 ;
27+ TActorId ActorId;
28+ ui64 QueryResultIndex = 0 ;
2729};
2830
2931class TRpcFlowControlState {
@@ -244,8 +246,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
244246 const auto traceId = Request_->GetTraceId ();
245247
246248 NYql::TIssues issues;
247- NKikimrKqp::EQueryAction queryAction;
248- if (!ParseQueryAction (*req, queryAction, issues)) {
249+ if (!ParseQueryAction (*req, QueryAction, issues)) {
249250 return ReplyFinishStream (Ydb::StatusIds::BAD_REQUEST, std::move (issues));
250251 }
251252
@@ -274,7 +275,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
274275 cachePolicy->set_keep_in_cache (true );
275276
276277 auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
277- queryAction ,
278+ QueryAction ,
278279 queryType,
279280 SelfId (),
280281 Request_,
@@ -288,7 +289,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
288289 nullptr , // operationParams
289290 false , // keepSession
290291 false , // useCancelAfter
291- syntax);
292+ syntax,
293+ true );
292294
293295 if (!ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release ())) {
294296 NYql::TIssues issues;
@@ -322,23 +324,24 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
322324
323325 ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes ();
324326
325- for (auto & pair : StreamProducers_ ) {
326- const auto & producerId = pair.first ;
327- auto & producer = pair.second ;
327+ for (auto & pair : StreamChannels_ ) {
328+ const auto & channelId = pair.first ;
329+ auto & channel = pair.second ;
328330
329- if (freeSpaceBytes > 0 && producer .LastSeqNo && producer .AckedFreeSpaceBytes == 0 ) {
331+ if (freeSpaceBytes > 0 && channel .LastSeqNo && channel .AckedFreeSpaceBytes == 0 ) {
330332 LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Resume execution, "
331- << " , producer : " << producerId
332- << " , seqNo: " << producer .LastSeqNo
333+ << " , channel : " << channelId
334+ << " , seqNo: " << channel .LastSeqNo
333335 << " , freeSpace: " << freeSpaceBytes);
334336
335337 auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
336- resp->Record .SetSeqNo (*producer .LastSeqNo );
338+ resp->Record .SetSeqNo (*channel .LastSeqNo );
337339 resp->Record .SetFreeSpace (freeSpaceBytes);
340+ resp->Record .SetChannelId (channelId);
338341
339- ctx.Send (producerId , resp.Release ());
342+ ctx.Send (channel. ActorId , resp.Release ());
340343
341- producer .AckedFreeSpaceBytes = freeSpaceBytes;
344+ channel .AckedFreeSpaceBytes = freeSpaceBytes;
342345 }
343346 }
344347
@@ -358,9 +361,11 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
358361
359362 Request_->SendSerializedResult (std::move (out), Ydb::StatusIds::SUCCESS);
360363
361- auto & producer = StreamProducers_[ev->Sender ];
362- producer.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
363- producer.AckedFreeSpaceBytes = freeSpaceBytes;
364+ auto & channel = StreamChannels_[ev->Get ()->Record .GetChannelId ()];
365+ channel.ActorId = ev->Sender ;
366+ channel.LastSeqNo = ev->Get ()->Record .GetSeqNo ();
367+ channel.AckedFreeSpaceBytes = freeSpaceBytes;
368+ channel.QueryResultIndex = ev->Get ()->Record .GetQueryResultIndex ();
364369
365370 LOG_DEBUG_S (ctx, NKikimrServices::RPC_REQUEST, this ->SelfId () << " Send stream data ack"
366371 << " , seqNo: " << ev->Get ()->Record .GetSeqNo ()
@@ -371,8 +376,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
371376 auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372377 resp->Record .SetSeqNo (ev->Get ()->Record .GetSeqNo ());
373378 resp->Record .SetFreeSpace (freeSpaceBytes);
379+ resp->Record .SetChannelId (ev->Get ()->Record .GetChannelId ());
374380
375- ctx.Send (ev-> Sender , resp.Release ());
381+ ctx.Send (channel. ActorId , resp.Release ());
376382 }
377383
378384 void Handle (NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
@@ -381,14 +387,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
381387 const auto & issueMessage = record.GetResponse ().GetQueryIssues ();
382388
383389 bool hasTrailingMessage = false ;
384-
390+
391+ auto & kqpResponse = record.GetResponse ();
392+ if (kqpResponse.GetYdbResults ().size () > 1 ) {
393+ auto issue = MakeIssue (NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
394+ " Unexpected trailing message with multiple result sets." );
395+ ReplyFinishStream (Ydb::StatusIds::INTERNAL_ERROR, issue);
396+ return ;
397+ }
398+
385399 if (record.GetYdbStatus () == Ydb::StatusIds::SUCCESS) {
386400 Request_->SetRuHeader (record.GetConsumedRu ());
387401
388402 auto & kqpResponse = record.GetResponse ();
389403
390404 Ydb::Query::ExecuteQueryResponsePart response;
391405
406+ if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
407+ for (int i = 0 ; i < kqpResponse.GetYdbResults ().size (); i++) {
408+ hasTrailingMessage = true ;
409+ response.set_result_set_index (i);
410+ response.mutable_result_set ()->Swap (record.MutableResponse ()->MutableYdbResults (i));
411+ }
412+ }
413+
392414 AuditContextAppend (Request_.get (), *Request_->GetProtoRequest (), response);
393415
394416 if (kqpResponse.HasTxMeta ()) {
@@ -492,8 +514,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
492514private:
493515 std::shared_ptr<TEvExecuteQueryRequest> Request_;
494516
517+ NKikimrKqp::EQueryAction QueryAction;
495518 TRpcFlowControlState FlowControl_;
496- TMap<TActorId , TProducerState> StreamProducers_ ;
519+ TMap<ui64 , TProducerState> StreamChannels_ ;
497520};
498521
499522} // namespace
0 commit comments