Skip to content

Commit

Permalink
Import changefeed's configuration from s3 (ydb-platform#13943)
Browse files Browse the repository at this point in the history
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
  • Loading branch information
stanislav-shchetinin and CyberROFL authored Feb 19, 2025
1 parent bc40145 commit 9dda4cb
Show file tree
Hide file tree
Showing 21 changed files with 806 additions and 17 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,5 @@ message TFeatureFlags {
// deny non-administrators the privilege of administering local users and groups
optional bool EnableStrictUserManagement = 168 [default = false];
optional bool EnableDatabaseAdmin = 169 [default = false];
optional bool EnableChangefeedsImport = 170 [default = false];
}
9 changes: 9 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import "ydb/library/mkql_proto/protos/minikql.proto";
import "ydb/public/api/protos/ydb_coordination.proto";
import "ydb/public/api/protos/ydb_export.proto";
import "ydb/public/api/protos/ydb_table.proto";
import "ydb/public/api/protos/ydb_topic.proto";
import "ydb/public/api/protos/ydb_value.proto";

import "google/protobuf/empty.proto";
Expand Down Expand Up @@ -2124,3 +2125,11 @@ message TBackupBackupCollection {
optional NKikimrProto.TPathID PathId = 2;
optional string TargetDir = 3; // must be set on Rewrite
}

message TImportTableChangefeeds {
message TImportChangefeedTopic {
optional Ydb.Table.ChangefeedDescription Changefeed = 1;
optional Ydb.Topic.DescribeTopicResult Topic = 2;
}
repeated TImportChangefeedTopic Changefeeds = 1;
}
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4467,9 +4467,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
}

if (rowset.HaveValue<Schema::ImportItems::Changefeeds>()) {
item.Changefeeds = rowset.GetValue<Schema::ImportItems::Changefeeds>();
}

item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
item.NextChangefeedIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextChangefeedIdx>(0);
item.Issue = rowset.GetValueOrDefault<Schema::ImportItems::Issue>(TString());

if (item.WaitTxId != InvalidTxId) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI
case TImportInfo::EState::BuildIndexes:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_BUILD_INDEXES);
break;
case TImportInfo::EState::CreateChangefeed:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_CREATE_CHANGEFEEDS);
break;
case TImportInfo::EState::Done:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_DONE);
break;
Expand Down Expand Up @@ -163,6 +166,7 @@ void TSchemeShard::PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo
NIceDb::TUpdate<Schema::ImportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ImportItems::WaitTxId>(item.WaitTxId),
NIceDb::TUpdate<Schema::ImportItems::NextIndexIdx>(item.NextIndexIdx),
NIceDb::TUpdate<Schema::ImportItems::NextChangefeedIdx>(item.NextChangefeedIdx),
NIceDb::TUpdate<Schema::ImportItems::Issue>(item.Issue)
);
}
Expand All @@ -189,6 +193,10 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
);

db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(item.Changefeeds)
);
}

void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Expand Down
62 changes: 62 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return true;
}

void CreateChangefeed(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
item.SubState = ESubState::Proposed;

LOG_I("TImport::TTxProgress: CreateChangefeed propose"
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);

Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
}

void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -588,6 +603,25 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return TTxId(ui64((*infoPtr)->Id));
}

TTxId GetActiveCreateChangefeedTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);

Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
Y_ABORT_UNLESS(item.DstPathId);

if (!Self->PathsById.contains(item.DstPathId)) {
return InvalidTxId;
}

auto path = Self->PathsById.at(item.DstPathId);
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
return InvalidTxId;
}

return path->LastTxId;
}

static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -756,6 +790,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
case EState::CreateSchemeObject:
case EState::Transferring:
case EState::BuildIndexes:
case EState::CreateChangefeed:
if (item.WaitTxId == InvalidTxId) {
if (!IsCreatedByQuery(item) || item.PreparedCreationQuery) {
AllocateTxId(importInfo, itemIdx);
Expand All @@ -781,6 +816,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TTxId txId = InvalidTxId;

switch (item.State) {
case EState::CreateChangefeed:
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
break;

case EState::Transferring:
if (!CancelTransferring(importInfo, itemIdx)) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
Expand Down Expand Up @@ -1004,6 +1043,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
BuildIndex(importInfo, i, txId);
itemIdx = i;
break;

case EState::CreateChangefeed:
CreateChangefeed(importInfo, i, txId);
itemIdx = i;
break;

default:
break;
Expand Down Expand Up @@ -1064,6 +1108,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
txId = TTxId(record.GetPathCreateTxId());
} else if (item.State == EState::Transferring) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
} else if (item.State == EState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
}
}

Expand Down Expand Up @@ -1216,6 +1262,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
if (item.NextIndexIdx < item.Scheme.indexes_size()) {
item.State = EState::BuildIndexes;
AllocateTxId(importInfo, itemIdx);
} else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() &&
AppData()->FeatureFlags.GetEnableChangefeedsImport()) {
item.State = EState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
Expand All @@ -1229,11 +1279,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else {
if (++item.NextIndexIdx < item.Scheme.indexes_size()) {
AllocateTxId(importInfo, itemIdx);
} else if (item.NextChangefeedIdx < item.Changefeeds.changefeeds_size() &&
AppData()->FeatureFlags.GetEnableChangefeedsImport()) {
item.State = EState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
}
break;

case EState::CreateChangefeed:
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
break;

default:
return SendNotificationsIfFinished(importInfo);
Expand Down
60 changes: 60 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,65 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
return MakeHolder<TEvIndexBuilder::TEvCancelRequest>(ui64(indexBuildId), domainPath.PathString(), ui64(indexBuildId));
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());

const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
const auto& changefeed = importChangefeedTopic.GetChangefeed();
const auto& topic = importChangefeedTopic.GetTopic();

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream);
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();

const TPath dstPath = TPath::Init(item.DstPathId, ss);
modifyScheme.SetWorkingDir(dstPath.Parent().PathString());
cdcStream.SetTableName(dstPath.LeafName());

TString error;
Ydb::StatusIds::StatusCode status;

auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
return nullptr;
}

if (topic.has_retention_period()) {
cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds());
}

if (topic.has_partitioning_settings()) {
i64 minActivePartitions =
topic.partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
return nullptr;
} else if (minActivePartitions == 0) {
minActivePartitions = 1;
}
cdcStream.SetTopicPartitions(minActivePartitions);

if (topic.partitioning_settings().has_auto_partitioning_settings()) {
auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings();
cdcStream.SetTopicAutoPartitioning(partitioningSettings.strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);

i64 maxActivePartitions =
topic.partitioning_settings().max_active_partitions();
if (maxActivePartitions < 0) {
return nullptr;
} else if (maxActivePartitions == 0) {
maxActivePartitions = 50;
}
cdcStream.SetMaxPartitionCount(maxActivePartitions);
}
}
return propose;
}

} // NSchemeShard
} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,11 @@ THolder<TEvIndexBuilder::TEvCancelRequest> CancelIndexBuildPropose(
TTxId indexBuildId
);

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
TSchemeShard* ss,
TTxId txId,
const TImportInfo::TItem& item
);

} // NSchemeShard
} // NKikimr
Loading

0 comments on commit 9dda4cb

Please sign in to comment.