diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index c14920af8763..b2f9d7097285 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -207,6 +207,8 @@ message TPQConfig { optional uint64 BalancerWakeupIntervalSec = 54 [default = 30]; optional uint64 BalancerStatsWakeupIntervalSec = 55 [default = 5]; + + optional uint64 MaxWriteSessionBytesInflight = 57 [default = 1000000]; } message TChannelProfile { diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index c1301a94af4b..4ab18d87f692 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -159,7 +159,6 @@ namespace NGRpcProxy::V1 { using namespace Ydb::PersQueue::V1; -static const ui32 MAX_BYTES_INFLIGHT = 1_MB; static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); // metering @@ -942,7 +941,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::T if (BytesInflight) { BytesInflight.Dec(diff); } - if (!NextRequestInited && BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended + if (!NextRequestInited && BytesInflight_ < AppData(ctx)->PQConfig.GetMaxWriteSessionBytesInflight()) { //allow only one big request to be readed but not sended NextRequestInited = true; if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed"); @@ -1498,7 +1497,7 @@ void TWriteSessionActor::Handle(typename TEvWrite::TPtr& e BytesInflightTotal.Inc(diff); } - if (BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended + if (BytesInflight_ < AppData(ctx)->PQConfig.GetMaxWriteSessionBytesInflight()) { //allow only one big request to be readed but not sended Y_ABORT_UNLESS(NextRequestInited); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");