Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ message TPQTabletConfig {
}
optional TPartitionStrategy PartitionStrategy = 35;

// The field is filled in only for the PQ tablet. Contains information about linked partitions for constructing a partial PartitionGraph.
repeated TPartition AllPartitions = 36; // filled by schemeshard

optional TOffloadConfig OffloadConfig = 38;
Expand Down
29 changes: 23 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -928,20 +928,37 @@ class TConfigureParts: public TSubOperationState {
config.SetVersion(pqGroup.AlterData->AlterVersion);
}

THashSet<ui32> linkedPartitions;

for(const auto& pq : pqShard.Partitions) {
config.AddPartitionIds(pq->PqId);

auto& partition = *config.AddPartitions();
FillPartition(partition, pq.Get(), 0);

linkedPartitions.insert(pq->PqId);
linkedPartitions.insert(pq->ParentPartitionIds.begin(), pq->ParentPartitionIds.end());
linkedPartitions.insert(pq->ChildPartitionIds.begin(), pq->ChildPartitionIds.end());
for (auto c : pq->ChildPartitionIds) {
auto it = pqGroup.Partitions.find(c);
if (it == pqGroup.Partitions.end()) {
continue;
}
linkedPartitions.insert(it->second->ParentPartitionIds.begin(), it->second->ParentPartitionIds.end());
}
}

for(const auto& p : pqGroup.Shards) {
const auto& pqShard = p.second;
const auto& tabletId = context.SS->ShardInfos[p.first].TabletID;
for (const auto& pq : pqShard->Partitions) {
auto& partition = *config.AddAllPartitions();
FillPartition(partition, pq.Get(), ui64(tabletId));
for(auto lp : linkedPartitions) {
auto it = pqGroup.Partitions.find(lp);
if (it == pqGroup.Partitions.end()) {
continue;
}

auto* partitionInfo = it->second;
const auto& tabletId = context.SS->ShardInfos[partitionInfo->ShardIdx].TabletID;

auto& partition = *config.AddAllPartitions();
FillPartition(partition, partitionInfo, ui64(tabletId));
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,8 @@ struct TTopicTabletInfo : TSimpleRefCount<TTopicTabletInfo> {
THashSet<ui32> ParentPartitionIds;
THashSet<ui32> ChildPartitionIds;

TShardIdx ShardIdx;

void SetStatus(const TActorContext& ctx, ui32 value) {
if (value >= NKikimrPQ::ETopicPartitionStatus::Active &&
value <= NKikimrPQ::ETopicPartitionStatus::Deleted) {
Expand Down Expand Up @@ -1133,6 +1135,8 @@ struct TTopicInfo : TSimpleRefCount<TTopicInfo> {
TTopicStats Stats;

void AddPartition(TShardIdx shardIdx, TTopicTabletInfo::TTopicPartitionInfo* partition) {
partition->ShardIdx = shardIdx;

TTopicTabletInfo::TPtr& pqShard = Shards[shardIdx];
if (!pqShard) {
pqShard.Reset(new TTopicTabletInfo());
Expand Down