Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,5 @@ message TFeatureFlags {
optional bool EnableChangefeedsImport = 170 [default = false];
optional bool EnablePermissionsExport = 171 [default = false];
optional bool EnableDataErasure = 172 [default = false];
optional bool EnableChangefeedsExport = 174 [default = false];
}
46 changes: 24 additions & 22 deletions ydb/core/tx/datashard/export_s3_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,29 +918,31 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
? GenYdbScheme(Columns, Task.GetTable())
: Nothing();

const auto& persQueues = Task.GetChangefeedUnderlyingTopics();
const auto& cdcStreams = Task.GetTable().GetTable().GetCdcStreams();
Y_ASSERT(persQueues.size() == cdcStreams.size());

const int changefeedsCount = cdcStreams.size();
TVector <TChangefeedExportDescriptions> changefeeds;
changefeeds.reserve(changefeedsCount);

for (int i = 0; i < changefeedsCount; ++i) {
Ydb::Table::ChangefeedDescription changefeed;
const auto& cdcStream = cdcStreams.at(i);
FillChangefeedDescription(changefeed, cdcStream);

Ydb::Topic::DescribeTopicResult topic;
const auto& pq = persQueues.at(i);
Ydb::StatusIds::StatusCode status;
TString error;
FillTopicDescription(topic, pq.GetPersQueueGroup(), pq.GetSelf(), cdcStream.GetName(), status, error);
// Unnecessary fields
topic.clear_self();
topic.clear_topic_stats();

changefeeds.emplace_back(changefeed, topic);
if (AppData()->FeatureFlags.GetEnableChangefeedsExport()) {
const auto& persQueues = Task.GetChangefeedUnderlyingTopics();
const auto& cdcStreams = Task.GetTable().GetTable().GetCdcStreams();
Y_ASSERT(persQueues.size() == cdcStreams.size());

const int changefeedsCount = cdcStreams.size();
changefeeds.reserve(changefeedsCount);

for (int i = 0; i < changefeedsCount; ++i) {
Ydb::Table::ChangefeedDescription changefeed;
const auto& cdcStream = cdcStreams.at(i);
FillChangefeedDescription(changefeed, cdcStream);

Ydb::Topic::DescribeTopicResult topic;
const auto& pq = persQueues.at(i);
Ydb::StatusIds::StatusCode status;
TString error;
FillTopicDescription(topic, pq.GetPersQueueGroup(), pq.GetSelf(), cdcStream.GetName(), status, error);
// Unnecessary fields
topic.clear_self();
topic.clear_topic_stats();

changefeeds.emplace_back(changefeed, topic);
}
}

auto permissions = (Task.GetEnablePermissions() && Task.GetShardNum() == 0)
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2632,6 +2632,8 @@ attributes {
)", port);

TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true));
runtime.GetAppData().FeatureFlags.SetEnableChangefeedsExport(true);

Run(runtime, env, TVector<TString>{
R"(
Name: "Table"
Expand Down
1 change: 1 addition & 0 deletions ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,7 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) {
NTopic::TTopicClient topicClient(testEnv.GetDriver());

constexpr const char* table = "/Root/table";
testEnv.GetServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableChangefeedsExport(true);
testEnv.GetServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableChangefeedsImport(true);

TestChangefeedAndTopicDescriptionsIsPreserved(
Expand Down
Loading