|
6 | 6 |
|
7 | 7 | #include <ydb/core/change_exchange/change_sender_common_ops.h> |
8 | 8 | #include <ydb/core/change_exchange/change_sender_monitoring.h> |
9 | | -#include <ydb/core/persqueue/partition_key_range/partition_key_range.h> |
| 9 | +#include <ydb/core/change_exchange/util.h> |
10 | 10 | #include <ydb/core/persqueue/writer/source_id_encoding.h> |
11 | 11 | #include <ydb/core/persqueue/writer/writer.h> |
12 | 12 | #include <ydb/core/tx/scheme_cache/helpers.h> |
@@ -300,45 +300,6 @@ class TCdcChangeSenderMain |
300 | 300 | , public NChangeExchange::ISenderFactory |
301 | 301 | , private NSchemeCache::TSchemeCacheHelpers |
302 | 302 | { |
303 | | - struct TPQPartitionInfo { |
304 | | - ui32 PartitionId; |
305 | | - ui64 ShardId; |
306 | | - TPartitionKeyRange KeyRange; |
307 | | - |
308 | | - struct TLess { |
309 | | - TConstArrayRef<NScheme::TTypeInfo> Schema; |
310 | | - |
311 | | - TLess(const TVector<NScheme::TTypeInfo>& schema) |
312 | | - : Schema(schema) |
313 | | - { |
314 | | - } |
315 | | - |
316 | | - bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const { |
317 | | - Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound); |
318 | | - |
319 | | - if (!lhs.KeyRange.ToBound) { |
320 | | - return false; |
321 | | - } |
322 | | - |
323 | | - if (!rhs.KeyRange.ToBound) { |
324 | | - return true; |
325 | | - } |
326 | | - |
327 | | - Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound); |
328 | | - |
329 | | - const int compares = CompareTypedCellVectors( |
330 | | - lhs.KeyRange.ToBound->GetCells().data(), |
331 | | - rhs.KeyRange.ToBound->GetCells().data(), |
332 | | - Schema.data(), Schema.size() |
333 | | - ); |
334 | | - |
335 | | - return (compares < 0); |
336 | | - } |
337 | | - |
338 | | - }; // TLess |
339 | | - |
340 | | - }; // TPQPartitionInfo |
341 | | - |
342 | 303 | TStringBuf GetLogPrefix() const { |
343 | 304 | if (!LogPrefix) { |
344 | 305 | LogPrefix = TStringBuilder() |
@@ -430,16 +391,6 @@ class TCdcChangeSenderMain |
430 | 391 | return false; |
431 | 392 | } |
432 | 393 |
|
433 | | - static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) { |
434 | | - TVector<ui64> result(Reserve(partitions.size())); |
435 | | - |
436 | | - for (const auto& partition : partitions) { |
437 | | - result.push_back(partition.ShardId); |
438 | | - } |
439 | | - |
440 | | - return result; |
441 | | - } |
442 | | - |
443 | 394 | /// ResolveCdcStream |
444 | 395 |
|
445 | 396 | void ResolveCdcStream() { |
@@ -561,77 +512,27 @@ class TCdcChangeSenderMain |
561 | 512 | return; |
562 | 513 | } |
563 | 514 |
|
564 | | - const auto& pqDesc = entry.PQGroupInfo->Description; |
565 | | - const auto& pqConfig = pqDesc.GetPQTabletConfig(); |
566 | | - |
567 | | - TVector<NScheme::TTypeInfo> schema; |
568 | | - PartitionToShard.clear(); |
569 | | - |
570 | | - schema.reserve(pqConfig.PartitionKeySchemaSize()); |
571 | | - for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) { |
572 | | - // TODO: support pg types |
573 | | - schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); |
| 515 | + const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); |
| 516 | + if (TopicVersion && TopicVersion == topicVersion) { |
| 517 | + CreateSenders(); |
| 518 | + return Become(&TThis::StateMain); |
574 | 519 | } |
575 | 520 |
|
576 | | - TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema); |
577 | | - THashSet<ui64> shards; |
578 | | - |
579 | | - for (const auto& partition : pqDesc.GetPartitions()) { |
580 | | - const auto partitionId = partition.GetPartitionId(); |
581 | | - const auto shardId = partition.GetTabletId(); |
582 | | - |
583 | | - PartitionToShard.emplace(partitionId, shardId); |
584 | | - |
585 | | - auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange()); |
586 | | - Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size()); |
587 | | - Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size()); |
588 | | - |
589 | | - partitions.insert({partitionId, shardId, std::move(keyRange)}); |
590 | | - shards.insert(shardId); |
591 | | - } |
592 | | - |
593 | | - // used to validate |
594 | | - bool isFirst = true; |
595 | | - const TPQPartitionInfo* prev = nullptr; |
596 | | - |
597 | | - TVector<NKikimr::TKeyDesc::TPartitionInfo> partitioning; |
598 | | - partitioning.reserve(partitions.size()); |
599 | | - for (const auto& cur : partitions) { |
600 | | - if (isFirst) { |
601 | | - isFirst = false; |
602 | | - Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined()); |
603 | | - } else { |
604 | | - Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined()); |
605 | | - Y_ABORT_UNLESS(prev); |
606 | | - Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined()); |
607 | | - // TODO: compare cells |
608 | | - } |
609 | | - |
610 | | - auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning |
611 | | - |
612 | | - if (cur.KeyRange.ToBound) { |
613 | | - part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{ |
614 | | - .EndKeyPrefix = *cur.KeyRange.ToBound, |
615 | | - }; |
616 | | - } else { |
617 | | - part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{}; |
618 | | - } |
| 521 | + TopicVersion = topicVersion; |
619 | 522 |
|
620 | | - prev = &cur; |
621 | | - } |
| 523 | + const auto& pqDesc = entry.PQGroupInfo->Description; |
622 | 524 |
|
623 | | - if (prev) { |
624 | | - Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined()); |
| 525 | + PartitionToShard.clear(); |
| 526 | + for (const auto& partition : pqDesc.GetPartitions()) { |
| 527 | + PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId()); |
625 | 528 | } |
626 | 529 |
|
627 | | - const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); |
628 | | - const bool versionChanged = !TopicVersion || TopicVersion != topicVersion; |
629 | | - TopicVersion = topicVersion; |
630 | | - |
631 | | - KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema); |
632 | | - KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(std::move(partitioning)); |
| 530 | + Y_ABORT_UNLESS(entry.PQGroupInfo->Schema); |
| 531 | + KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema); |
| 532 | + Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning); |
| 533 | + KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning); |
633 | 534 |
|
634 | | - CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged); |
| 535 | + CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning)); |
635 | 536 | Become(&TThis::StateMain); |
636 | 537 | } |
637 | 538 |
|
|
0 commit comments