Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 5 additions & 101 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@

#include <ydb/core/change_exchange/change_sender.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/scheme/protos/type_info.pb.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -351,45 +349,6 @@ class TCdcChangeSenderMain
, public NChangeExchange::IChangeSenderFactory
, private NSchemeCache::TSchemeCacheHelpers
{
struct TPQPartitionInfo {
ui32 PartitionId;
ui64 ShardId;
TPartitionKeyRange KeyRange;

struct TLess {
TConstArrayRef<NScheme::TTypeInfo> Schema;

TLess(const TVector<NScheme::TTypeInfo>& schema)
: Schema(schema)
{
}

bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const {
Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound);

if (!lhs.KeyRange.ToBound) {
return false;
}

if (!rhs.KeyRange.ToBound) {
return true;
}

Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound);

const int compares = CompareTypedCellVectors(
lhs.KeyRange.ToBound->GetCells().data(),
rhs.KeyRange.ToBound->GetCells().data(),
Schema.data(), Schema.size()
);

return (compares < 0);
}

}; // TLess

}; // TPQPartitionInfo

TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
Expand Down Expand Up @@ -615,74 +574,19 @@ class TCdcChangeSenderMain
const auto& pqDesc = entry.PQGroupInfo->Description;
const auto& pqConfig = pqDesc.GetPQTabletConfig();

TVector<NScheme::TTypeInfo> schema;
PartitionToShard.clear();

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
schema.push_back(NScheme::TTypeInfo(
NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
} else {
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
}

TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);

for (const auto& partition : pqDesc.GetPartitions()) {
const auto partitionId = partition.GetPartitionId();
const auto shardId = partition.GetTabletId();

PartitionToShard.emplace(partitionId, shardId);

auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange());
Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size());
Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size());

partitions.insert({partitionId, shardId, std::move(keyRange)});
}

// used to validate
bool isFirst = true;
const TPQPartitionInfo* prev = nullptr;

TVector<NKikimr::TKeyDesc::TPartitionInfo> partitioning;
partitioning.reserve(partitions.size());
for (const auto& cur : partitions) {
if (isFirst) {
isFirst = false;
Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined());
} else {
Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined());
Y_ABORT_UNLESS(prev);
Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined());
// TODO: compare cells
}

auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning

if (cur.KeyRange.ToBound) {
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{
.EndKeyPrefix = *cur.KeyRange.ToBound,
};
} else {
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{};
}

prev = &cur;
}

if (prev) {
Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined());
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
TopicVersion = topicVersion;

KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(std::move(partitioning));
Y_ABORT_UNLESS(entry.PQGroupInfo->Schema);
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning);

if (::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy().GetPartitionStrategyType()) {
SetPartitionResolver(new TBoundaryPartitionResolver(pqDesc));
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/datashard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ PEERDIR(
ydb/core/formats
ydb/core/io_formats/ydb_dump
ydb/core/kqp/runtime
ydb/core/persqueue/partition_key_range
ydb/core/persqueue/writer
ydb/core/protos
ydb/core/tablet
Expand Down
64 changes: 63 additions & 1 deletion ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/tabletid.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/sys_view/common/schema.h>
Expand All @@ -26,6 +26,8 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>

#include <library/cpp/json/writer/json.h>

#include <util/generic/algorithm.h>
Expand Down Expand Up @@ -979,6 +981,65 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
return partitions;
}

static void FillTopicPartitioning(
const NKikimrSchemeOp::TPersQueueGroupDescription& pqDesc,
TVector<NScheme::TTypeInfo>& schema,
TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitioning)
{
const auto& pqConfig = pqDesc.GetPQTabletConfig();
if (pqConfig.GetPartitionKeySchema().empty()) {
return;
}

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
schema.push_back(NScheme::TTypeInfo(NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
} else {
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
}

partitioning.reserve(pqDesc.PartitionsSize());
for (const auto& partition : pqDesc.GetPartitions()) {
auto keyRange = NPQ::TPartitionKeyRange::Parse(partition.GetKeyRange());
Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size());
Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size());

auto& info = partitioning.emplace_back(partition.GetPartitionId());
if (keyRange.ToBound) {
info.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{
.EndKeyPrefix = *keyRange.ToBound,
};
} else {
info.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{};
}
}

Sort(partitioning.begin(), partitioning.end(), [&schema](const auto& lhs, const auto& rhs) {
Y_ABORT_UNLESS(lhs.Range && rhs.Range);
Y_ABORT_UNLESS(lhs.Range->EndKeyPrefix || rhs.Range->EndKeyPrefix);

if (!lhs.Range->EndKeyPrefix) {
return false;
}

if (!rhs.Range->EndKeyPrefix) {
return true;
}

Y_ABORT_UNLESS(lhs.Range->EndKeyPrefix && rhs.Range->EndKeyPrefix);

const int compares = CompareTypedCellVectors(
lhs.Range->EndKeyPrefix.GetCells().data(),
rhs.Range->EndKeyPrefix.GetCells().data(),
schema.data(), schema.size()
);

return (compares < 0);
});
}

bool IsSysTable() const {
return Kind == TNavigate::KindTable && PathId.OwnerId == TSysTables::SysSchemeShard;
}
Expand Down Expand Up @@ -1487,6 +1548,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
if (Created) {
NPQ::Migrate(*pathDesc.MutablePersQueueGroup()->MutablePQTabletConfig());
FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup()));
FillTopicPartitioning(PQGroupInfo->Description, PQGroupInfo->Schema, PQGroupInfo->Partitioning);
}
break;
case NKikimrSchemeOp::EPathTypeCdcStream:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/scheme_board/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PEERDIR(
ydb/library/actors/core
ydb/core/base
ydb/core/mon
ydb/core/persqueue/partition_key_range
ydb/core/protos
ydb/core/sys_view/common
ydb/core/tx/scheme_cache
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/scheme_cache/scheme_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ struct TSchemeCacheNavigate {
struct TPQGroupInfo : public TAtomicRefCount<TPQGroupInfo> {
EKind Kind = KindUnknown;
NKikimrSchemeOp::TPersQueueGroupDescription Description;
TVector<NScheme::TTypeInfo> Schema;
TVector<NKikimr::TKeyDesc::TPartitionInfo> Partitioning;
};

struct TRtmrVolumeInfo : public TAtomicRefCount<TRtmrVolumeInfo> {
Expand Down