Skip to content

Commit 16767a7

Browse files
authored
Merge 798475b into edf8f5c
2 parents edf8f5c + 798475b commit 16767a7

File tree

9 files changed

+49
-25
lines changed

9 files changed

+49
-25
lines changed

ydb/core/grpc_services/base/base.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,8 @@ class IRequestCtx
407407
, public virtual IRequestCtxBase
408408
{
409409
friend class TProtoResponseHelper;
410-
411410
public:
411+
using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl;
412412
virtual google::protobuf::Message* GetRequestMut() = 0;
413413

414414
virtual void SetRuHeader(ui64 ru) = 0;
@@ -418,7 +418,7 @@ class IRequestCtx
418418
virtual void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) = 0;
419419
virtual void FinishStream(ui32 status) = 0;
420420

421-
virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0;
421+
virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag = EStreamCtrl::CONT) = 0;
422422

423423
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;
424424

@@ -1193,7 +1193,7 @@ class TGRpcRequestWrapperImpl
11931193
return GetPeerMetaValues(NYdb::YDB_REQUEST_TYPE_HEADER);
11941194
}
11951195

1196-
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
1196+
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, IRequestCtx::EStreamCtrl flag = IRequestCtx::EStreamCtrl::CONT) override {
11971197
// res->data() pointer is used inside grpc code.
11981198
// So this object should be destroyed during grpc_slice destroying routine
11991199
auto res = new TString;
@@ -1208,7 +1208,7 @@ class TGRpcRequestWrapperImpl
12081208
(void*)(res->data()), res->size(), freeResult, res);
12091209
grpc::Slice sl = grpc::Slice(slice, grpc::Slice::STEAL_REF);
12101210
auto data = grpc::ByteBuffer(&sl, 1);
1211-
Ctx_->Reply(&data, status);
1211+
Ctx_->Reply(&data, status, flag);
12121212
}
12131213

12141214
void SetCostInfo(float consumed_units) override {

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
183183
Y_ABORT("Unimplemented for local rpc");
184184
}
185185

186-
virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode) override {
186+
virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode, EStreamCtrl) override {
187187
Y_ABORT("Unimplemented for local rpc");
188188
}
189189

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
359359
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
360360
auto& record = ev->Get()->Record.GetRef();
361361

362-
NYql::TIssues issues;
363362
const auto& issueMessage = record.GetResponse().GetQueryIssues();
364-
NYql::IssuesFromMessage(issueMessage, issues);
365363

364+
bool hasTrailingMessage = false;
365+
366366
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
367367
Request_->SetRuHeader(record.GetConsumedRu());
368368

@@ -372,8 +372,6 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
372372

373373
AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);
374374

375-
bool hasTrailingMessage = false;
376-
377375
if (kqpResponse.HasTxMeta()) {
378376
hasTrailingMessage = true;
379377
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
@@ -386,13 +384,20 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
386384

387385
if (hasTrailingMessage) {
388386
response.set_status(Ydb::StatusIds::SUCCESS);
387+
response.mutable_issues()->CopyFrom(issueMessage);
389388
TString out;
390389
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
391-
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
390+
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
391+
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus(), finishStreamFlag);
392+
this->PassAway();
392393
}
393394
}
394395

395-
ReplyFinishStream(record.GetYdbStatus(), issues);
396+
if (!hasTrailingMessage) {
397+
NYql::TIssues issues;
398+
NYql::IssuesFromMessage(issueMessage, issues);
399+
ReplyFinishStream(record.GetYdbStatus(), issueMessage);
400+
}
396401
}
397402

398403
private:
@@ -437,10 +442,11 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
437442
response.set_status(status);
438443
response.mutable_issues()->CopyFrom(message);
439444
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
440-
Request_->SendSerializedResult(std::move(out), status);
445+
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
446+
Request_->SendSerializedResult(std::move(out), status, finishStreamFlag);
447+
} else {
448+
Request_->FinishStream(status);
441449
}
442-
443-
Request_->FinishStream(status);
444450
this->PassAway();
445451
}
446452

ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
153153
Y_UNUSED(status);
154154
};
155155

156-
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
156+
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl) override {
157157
Y_UNUSED(in);
158158
Y_UNUSED(status);
159159
};

ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,10 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {
151151
Y_UNUSED(status);
152152
};
153153

154-
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
154+
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag) override {
155155
Y_UNUSED(in);
156156
Y_UNUSED(status);
157+
Y_UNUSED(flag);
157158
};
158159

159160
void Reply(NProtoBuf::Message* resp, ui32 status = 0) override {

ydb/core/public_http/grpc_request_context_wrapper.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ namespace NKikimr::NPublicHttp {
3434
ReplySender(RequestContext, JsonSettings, resp, status);
3535
}
3636

37-
void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status) {
37+
void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) {
3838
Y_UNUSED(resp);
3939
Y_UNUSED(status);
40+
Y_UNUSED(ctrl);
4041
Y_ABORT_UNLESS(false, "TGrpcRequestContextWrapper::Reply");
4142
}
4243

ydb/core/public_http/grpc_request_context_wrapper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
2727
virtual NProtoBuf::Message* GetRequestMut();
2828
virtual NYdbGrpc::TAuthState& GetAuthState();
2929
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0);
30-
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0);
30+
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT);
3131
virtual void ReplyUnauthenticated(const TString& in);
3232
virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details);
3333
virtual TInstant Deadline() const;

ydb/library/grpc/server/grpc_request.h

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ class TGRpcRequestImpl
204204
WriteDataOk(resp, status);
205205
}
206206

207-
void Reply(grpc::ByteBuffer* resp, ui32 status) override {
208-
WriteByteDataOk(resp, status);
207+
void Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) override {
208+
WriteByteDataOk(resp, status, ctrl);
209209
}
210210

211211
void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override {
@@ -314,7 +314,7 @@ class TGRpcRequestImpl
314314
}
315315
}
316316

317-
void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) {
317+
void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) {
318318
auto sz = resp->Length();
319319
if (Writer_) {
320320
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
@@ -332,14 +332,23 @@ class TGRpcRequestImpl
332332
// because of std::function cannot hold move-only captured object
333333
// we allocate shared object on heap to avoid buffer copy
334334
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
335-
auto cb = [this, uResp = std::move(uResp), sz, status]() {
335+
const bool finish = ctrl == EStreamCtrl::FINISH;
336+
auto cb = [this, uResp = std::move(uResp), sz, status, finish]() {
336337
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
337338
this, Name_, this->Context.peer().c_str());
338-
StateFunc_ = &TThis::NextReply;
339+
340+
StateFunc_ = finish ? &TThis::SetFinishDone : &TThis::NextReply;
341+
339342
ResponseSize += sz;
340343
ResponseStatus = status;
341344
OnBeforeCall();
342-
StreamWriter_->Write(*uResp, GetGRpcTag());
345+
if (finish) {
346+
Finished_ = true;
347+
const auto option = grpc::WriteOptions().set_last_message();
348+
StreamWriter_->WriteAndFinish(*uResp, option, grpc::Status::OK, GetGRpcTag());
349+
} else {
350+
StreamWriter_->Write(*uResp, GetGRpcTag());
351+
}
343352
};
344353
StreamAdaptor_->Enqueue(std::move(cb), false);
345354
}

ydb/library/grpc/server/grpc_request_base.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ class IRequestContextBase: public TThrRefBase {
4646
ERROR,
4747
CANCEL
4848
};
49+
enum class EStreamCtrl {
50+
CONT = 0, // Continue stream
51+
FINISH = 1, // Finish stream just after this reply
52+
};
53+
4954
using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;
5055

5156
using TOnNextReply = std::function<void (size_t left)>;
@@ -65,7 +70,9 @@ class IRequestContextBase: public TThrRefBase {
6570

6671
//! Send serialised response (The request shoult be created for bytes response type)
6772
//! Implementation can swap ByteBuffer
68-
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;
73+
74+
//! ctrl - controll stream behaviour. Ignored in case of unary call
75+
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT) = 0;
6976

7077
//! Send grpc UNAUTHENTICATED status
7178
virtual void ReplyUnauthenticated(const TString& in) = 0;

0 commit comments

Comments
 (0)