Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <ydb/core/base/row_version.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/persqueue/blob.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/metering_sink.h>
#include <ydb/core/tablet/tablet_counters.h>
Expand Down Expand Up @@ -65,6 +66,18 @@ namespace NPQ {
{}
};

struct TDataKey {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Change caching proxy logic to process move commands too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it to the task of transferring data

TKey Key;
ui32 Size;
TInstant Timestamp;
ui64 CumulativeSize;
};

struct TSeqNoRange {
ui64 Min;
ui64 Max;
};

struct TErrorInfo {
NPersQueue::NErrorCode::EErrorCode ErrorCode;
TString ErrorStr;
Expand Down Expand Up @@ -171,6 +184,9 @@ struct TEvPQ {
EvProvideDirectReadInfo,
EvCheckPartitionStatusRequest,
EvCheckPartitionStatusResponse,
EvGetWriteInfoRequest,
EvGetWriteInfoResponse,
EvGetWriteInfoError,
EvEnd
};

Expand Down Expand Up @@ -1004,7 +1020,43 @@ struct TEvPQ {
struct TEvCheckPartitionStatusResponse : public TEventPB<TEvCheckPartitionStatusResponse, NKikimrPQ::TEvCheckPartitionStatusResponse, EvCheckPartitionStatusResponse> {
};

struct TEvGetWriteInfoRequest : public TEventLocal<TEvGetWriteInfoRequest, EvGetWriteInfoRequest> {
explicit TEvGetWriteInfoRequest(ui32 cookie) :
Cookie(cookie)
{
}

ui32 Cookie; // ShadowPartitionId
};

struct TEvGetWriteInfoResponse : public TEventLocal<TEvGetWriteInfoResponse, EvGetWriteInfoResponse> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split in 2 messages - writeinforesponse and writeinfoerror

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done
7f74ed7

TEvGetWriteInfoResponse(ui32 cookie,
THashMap<TString, NPQ::TSeqNoRange>&& seqNo,
std::deque<NPQ::TDataKey>&& bodyKeys,
TVector<NPQ::TClientBlob>&& head) :
Cookie(cookie),
SeqNo(std::move(seqNo)),
BodyKeys(std::move(bodyKeys)),
Head(std::move(head))
{
}

ui32 Cookie; // ShadowPartitionId
THashMap<TString, NPQ::TSeqNoRange> SeqNo; // SourceId -> (MinSeqNo, MaxSeqNo)
std::deque<NPQ::TDataKey> BodyKeys;
TVector<NPQ::TClientBlob> Head;
};

struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
ui32 Cookie; // ShadowPartitionId
TString Message;

TEvGetWriteInfoError(ui32 cookie, TString message) :
Cookie(cookie),
Message(std::move(message))
{
}
};
};

} //NKikimr
7 changes: 0 additions & 7 deletions ydb/core/persqueue/partition_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ struct TMessage {
#undef DEFINE_CHECKER_GETTER
};

struct TDataKey {
TKey Key;
ui32 Size;
TInstant Timestamp;
ui64 CumulativeSize;
};


} // namespace NKikimr::NPQ