From 6e70d57e082efe406f4eac3c3358c9055f29e271 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 17 Jan 2024 13:00:00 +0300 Subject: [PATCH 1/7] =?UTF-8?q?[+]=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=B4=D0=BB=D1=8F=20=D1=81=D0=BB=D1=83?= =?UTF-8?q?=D0=B6=D0=B5=D0=B1=D0=BD=D0=BE=D0=B9=20=D0=BF=D0=B0=D1=80=D1=82?= =?UTF-8?q?=D0=B8=D1=86=D0=B8=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 49 ++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 0bc98a1d5753..d40843233dcf 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -171,6 +172,8 @@ struct TEvPQ { EvProvideDirectReadInfo, EvCheckPartitionStatusRequest, EvCheckPartitionStatusResponse, + EvGetWriteInfoRequest, + EvGetWriteInfoResponse, EvEnd }; @@ -1004,7 +1007,53 @@ struct TEvPQ { struct TEvCheckPartitionStatusResponse : public TEventPB { }; + struct TEvGetWriteInfoRequest : public TEventLocal { + explicit TEvGetWriteInfoRequest(ui32 cookie) : + Cookie(cookie) + { + } + + ui32 Cookie; // ShadowPartitionId + }; + + struct TEvGetWriteInfoResponse : public TEventLocal { + struct TError { + }; + + struct TSuccess { + TString SourceId; + ui64 MinSeqNo, MaxSeqNo; + NKikimrPQ::TPartitionKeyRange KeyRange; + NPQ::THead Head; + }; + + explicit TEvGetWriteInfoResponse(ui32 cookie) : + Cookie(cookie), + Result(TError{}) + { + } + + TEvGetWriteInfoResponse(ui32 cookie, + TString sourceId, + ui64 minSeqNo, ui64 maxSeqNo, + NKikimrPQ::TPartitionKeyRange keyRange, + NPQ::THead head) : + Cookie(cookie), + Result(TSuccess{std::move(sourceId), minSeqNo, maxSeqNo, std::move(keyRange), std::move(head)}) + { + } + bool IsSuccess() const { + return Result.index() == 1; + } + + const TSuccess& GetSuccess() const { + return get<1>(Result); + } + + ui32 Cookie; // ShadowPartitionId + std::variant Result; + }; }; } //NKikimr From c7ceff7878b911b2adddd880c812726a6dfda4f0 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 17 Jan 2024 13:56:44 +0300 Subject: [PATCH 2/7] =?UTF-8?q?[+]=20=D0=BA=D0=BE=D0=B4=20=D0=BE=D1=88?= =?UTF-8?q?=D0=B8=D0=B1=D0=BA=D0=B8=20=D0=B8=20=D1=81=D0=BE=D0=BE=D0=B1?= =?UTF-8?q?=D1=89=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BE=D0=B1=20=D0=BE=D1=88?= =?UTF-8?q?=D0=B8=D0=B1=D0=BA=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index d40843233dcf..067d0ee187b9 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1018,6 +1018,12 @@ struct TEvPQ { struct TEvGetWriteInfoResponse : public TEventLocal { struct TError { + enum ECode { + UnprocessedRequestsError, + }; + + ECode Code; + TString Message; }; struct TSuccess { @@ -1027,9 +1033,9 @@ struct TEvPQ { NPQ::THead Head; }; - explicit TEvGetWriteInfoResponse(ui32 cookie) : + TEvGetWriteInfoResponse(ui32 cookie, TError::ECode code, TString message) : Cookie(cookie), - Result(TError{}) + Result(TError{code, std::move(message)}) { } @@ -1047,6 +1053,10 @@ struct TEvPQ { return Result.index() == 1; } + const TError& GetError() const { + return get<0>(Result); + } + const TSuccess& GetSuccess() const { return get<1>(Result); } From 57e69c37014edc905242d2c2a9638735b59483ee Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 17 Jan 2024 14:52:31 +0300 Subject: [PATCH 3/7] =?UTF-8?q?[/]=20=D0=BA=D0=BE=D0=B4=20=D0=BE=D1=88?= =?UTF-8?q?=D0=B8=D0=B1=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 067d0ee187b9..1c007708f07a 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1018,11 +1018,6 @@ struct TEvPQ { struct TEvGetWriteInfoResponse : public TEventLocal { struct TError { - enum ECode { - UnprocessedRequestsError, - }; - - ECode Code; TString Message; }; @@ -1033,9 +1028,9 @@ struct TEvPQ { NPQ::THead Head; }; - TEvGetWriteInfoResponse(ui32 cookie, TError::ECode code, TString message) : + TEvGetWriteInfoResponse(ui32 cookie, TString message) : Cookie(cookie), - Result(TError{code, std::move(message)}) + Result(TError{std::move(message)}) { } From deb4d3d798298142e1a5ce2c518b2c90ff90ec7a Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Wed, 17 Jan 2024 14:50:39 +0300 Subject: [PATCH 4/7] =?UTF-8?q?[*]=20SeqNo,=20=D0=BA=D0=BB=D1=8E=D1=87?= =?UTF-8?q?=D0=B8=20=D1=82=D0=B5=D0=BB=D0=B0=20=D0=B8=20=D0=B3=D0=BE=D0=BB?= =?UTF-8?q?=D0=BE=D0=B2=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 28 +++++++++++++++++++--------- ydb/core/persqueue/partition_types.h | 7 ------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 1c007708f07a..8b27ec45e42e 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -66,6 +66,18 @@ namespace NPQ { {} }; + struct TDataKey { + TKey Key; + ui32 Size; + TInstant Timestamp; + ui64 CumulativeSize; + }; + + struct TSeqNoRange { + ui64 Min; + ui64 Max; + }; + struct TErrorInfo { NPersQueue::NErrorCode::EErrorCode ErrorCode; TString ErrorStr; @@ -1022,10 +1034,9 @@ struct TEvPQ { }; struct TSuccess { - TString SourceId; - ui64 MinSeqNo, MaxSeqNo; - NKikimrPQ::TPartitionKeyRange KeyRange; - NPQ::THead Head; + THashMap SeqNo; // SourceId -> (MinSeqNo, MaxSeqNo) + std::deque BodyKeys; + std::deque Head; }; TEvGetWriteInfoResponse(ui32 cookie, TString message) : @@ -1035,12 +1046,11 @@ struct TEvPQ { } TEvGetWriteInfoResponse(ui32 cookie, - TString sourceId, - ui64 minSeqNo, ui64 maxSeqNo, - NKikimrPQ::TPartitionKeyRange keyRange, - NPQ::THead head) : + THashMap seqNo, + std::deque bodyKeys, + std::deque head) : Cookie(cookie), - Result(TSuccess{std::move(sourceId), minSeqNo, maxSeqNo, std::move(keyRange), std::move(head)}) + Result(TSuccess{std::move(seqNo), std::move(bodyKeys), std::move(head)}) { } diff --git a/ydb/core/persqueue/partition_types.h b/ydb/core/persqueue/partition_types.h index a84cc54e86ec..a2d2a7905389 100644 --- a/ydb/core/persqueue/partition_types.h +++ b/ydb/core/persqueue/partition_types.h @@ -120,13 +120,6 @@ struct TMessage { #undef DEFINE_CHECKER_GETTER }; -struct TDataKey { - TKey Key; - ui32 Size; - TInstant Timestamp; - ui64 CumulativeSize; -}; - } // namespace NKikimr::NPQ From 1ca8116b23cfae989e99f4217025a7004ac4275c Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 25 Jan 2024 18:48:20 +0300 Subject: [PATCH 5/7] =?UTF-8?q?[*]=20=D0=B3=D0=BE=D0=BB=D0=BE=D0=B2=D0=B0?= =?UTF-8?q?=20=D0=BF=D0=B5=D1=80=D0=B5=D0=B4=D0=B0=D1=91=D1=82=D1=81=D1=8F?= =?UTF-8?q?=20=D0=BA=D0=B0=D0=BA=20=D1=81=D0=BF=D0=B8=D1=81=D0=BE=D0=BA=20?= =?UTF-8?q?TClientBlob?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 8b27ec45e42e..7afe3e398bf3 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1036,7 +1036,7 @@ struct TEvPQ { struct TSuccess { THashMap SeqNo; // SourceId -> (MinSeqNo, MaxSeqNo) std::deque BodyKeys; - std::deque Head; + TVector Head; }; TEvGetWriteInfoResponse(ui32 cookie, TString message) : @@ -1048,7 +1048,7 @@ struct TEvPQ { TEvGetWriteInfoResponse(ui32 cookie, THashMap seqNo, std::deque bodyKeys, - std::deque head) : + TVector head) : Cookie(cookie), Result(TSuccess{std::move(seqNo), std::move(bodyKeys), std::move(head)}) { From 32351ec78ec9255aa02b1c83a470f41f823537ef Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 25 Jan 2024 19:21:20 +0300 Subject: [PATCH 6/7] =?UTF-8?q?[*]=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20TEvGetWriteInfoResponse=20=D0=B8=20TEvGetW?= =?UTF-8?q?riteInfoError?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 44 ++++++++++------------------ 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 7afe3e398bf3..aa1ffdfc1473 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -186,6 +186,7 @@ struct TEvPQ { EvCheckPartitionStatusResponse, EvGetWriteInfoRequest, EvGetWriteInfoResponse, + EvGetWriteInfoError, EvEnd }; @@ -1029,45 +1030,32 @@ struct TEvPQ { }; struct TEvGetWriteInfoResponse : public TEventLocal { - struct TError { - TString Message; - }; - - struct TSuccess { - THashMap SeqNo; // SourceId -> (MinSeqNo, MaxSeqNo) - std::deque BodyKeys; - TVector Head; - }; - - TEvGetWriteInfoResponse(ui32 cookie, TString message) : - Cookie(cookie), - Result(TError{std::move(message)}) - { - } - TEvGetWriteInfoResponse(ui32 cookie, THashMap seqNo, std::deque bodyKeys, TVector head) : Cookie(cookie), - Result(TSuccess{std::move(seqNo), std::move(bodyKeys), std::move(head)}) + SeqNo(std::move(seqNo)), + BodyKeys(std::move(bodyKeys)), + Head(std::move(head)) { } - bool IsSuccess() const { - return Result.index() == 1; - } + ui32 Cookie; // ShadowPartitionId + THashMap SeqNo; // SourceId -> (MinSeqNo, MaxSeqNo) + std::deque BodyKeys; + TVector Head; + }; - const TError& GetError() const { - return get<0>(Result); - } + struct TEvGetWriteInfoError : public TEventLocal { + ui32 Cookie; // ShadowPartitionId + TString Message; - const TSuccess& GetSuccess() const { - return get<1>(Result); + TEvGetWriteInfoError(ui32 cookie, TString message) : + Cookie(cookie), + Message(std::move(message)) + { } - - ui32 Cookie; // ShadowPartitionId - std::variant Result; }; }; From 8fc84c463d7ddf8f2ff8caa27ebcd912bf6453f6 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 26 Jan 2024 12:35:11 +0300 Subject: [PATCH 7/7] =?UTF-8?q?[*]=20=D0=BE=D0=BF=D1=82=D0=B8=D0=BC=D0=B8?= =?UTF-8?q?=D0=B7=D0=B0=D1=86=D0=B8=D1=8F=20=D0=BA=D0=BE=D0=BD=D1=81=D1=82?= =?UTF-8?q?=D1=80=D1=83=D0=BA=D1=82=D0=BE=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ydb/core/persqueue/events/internal.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index aa1ffdfc1473..7e584528fdd9 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -1031,9 +1031,9 @@ struct TEvPQ { struct TEvGetWriteInfoResponse : public TEventLocal { TEvGetWriteInfoResponse(ui32 cookie, - THashMap seqNo, - std::deque bodyKeys, - TVector head) : + THashMap&& seqNo, + std::deque&& bodyKeys, + TVector&& head) : Cookie(cookie), SeqNo(std::move(seqNo)), BodyKeys(std::move(bodyKeys)),