diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index b588ba9c32b8..10ffc773c241 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -576,13 +576,15 @@ struct TEvPQ { }; struct TEvChangePartitionConfig : public TEventLocal { - TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config) + TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config, const NKikimrPQ::TBootstrapConfig& bootstrapConfig) : TopicConverter(topicConverter) , Config(config) + , BootstrapConfig(bootstrapConfig) {} NPersQueue::TTopicConverterPtr TopicConverter; NKikimrPQ::TPQTabletConfig Config; + NKikimrPQ::TBootstrapConfig BootstrapConfig; }; struct TEvPartitionConfigChanged : public TEventLocal { @@ -845,6 +847,7 @@ struct TEvPQ { ui64 TxId; NPersQueue::TTopicConverterPtr TopicConverter; NKikimrPQ::TPQTabletConfig Config; + NKikimrPQ::TBootstrapConfig BootstrapConfig; }; struct TEvProposePartitionConfigResult : public TEventLocal { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 5d8123b0f982..4eb2093bf279 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2037,7 +2037,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, } else if (t->ProposeConfig) { Y_ABORT_UNLESS(ChangingConfig); ChangeConfig = MakeSimpleShared(TopicConverter, - t->ProposeConfig->Config); + t->ProposeConfig->Config, + t->ProposeConfig->BootstrapConfig); PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); SendChangeConfigReply = false; } @@ -2123,7 +2124,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) { ChangeConfig = MakeSimpleShared(TopicConverter, - event.Config); + event.Config, + event.BootstrapConfig); PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); SendChangeConfigReply = false; @@ -2360,6 +2362,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) if (ChangeConfig) { EndChangePartitionConfig(std::move(ChangeConfig->Config), + std::move(ChangeConfig->BootstrapConfig), ChangeConfig->TopicConverter, ctx); } @@ -2426,12 +2429,24 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) } void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config, + NKikimrPQ::TBootstrapConfig&& bootstrapConfig, NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx) { Config = std::move(config); PartitionConfig = GetPartitionConfig(Config); PartitionGraph = MakePartitionGraph(Config); + + for (const auto& mg : bootstrapConfig.GetExplicitMessageGroups()) { + TMaybe keyRange; + if (mg.HasKeyRange()) { + keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange()); + } + + TSourceIdInfo sourceId(0, 0, ctx.Now(), std::move(keyRange), false); + SourceIdStorage.RegisterSourceIdInfo(mg.GetId(), std::move(sourceId), true); + } + TopicConverter = topicConverter; NewPartition = false; @@ -2441,14 +2456,15 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config, InitSplitMergeSlidingWindow(); } - Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); - Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); + Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, bootstrapConfig)); + Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, bootstrapConfig)); TotalPartitionWriteSpeed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(); if (Config.GetPartitionConfig().HasMirrorFrom()) { if (Mirrorer) { ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, - Config)); + Config, + bootstrapConfig)); } else { CreateMirrorerActor(); } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index cfa7563cb22b..17062cd1bd37 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -390,6 +390,7 @@ class TPartition : public TActorBootstrapped { void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx); void EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config, + NKikimrPQ::TBootstrapConfig&& bootstrapConfig, NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx); TString GetKeyConfig() const; diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index f44ab0f3d6e5..5aecbada325b 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -169,7 +169,8 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) { auto event = MakeHolder(Partition()->TopicConverter, - Partition()->TabletConfig); + Partition()->TabletConfig, + NKikimrPQ::TBootstrapConfig()); Partition()->PushFrontDistrTx(event.Release()); } break; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 3c317aa629f5..c10606ecacba 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -691,7 +691,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) ClearNewConfig(); for (auto& p : Partitions) { //change config for already created partitions - ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); + ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig())); } ChangePartitionConfigInflight += Partitions.size(); @@ -1807,7 +1807,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange()); } - sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange)); + sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange), false); } for (const auto& partition : cfg.GetPartitions()) { @@ -4545,6 +4545,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, event->TopicConverter = tx.TopicConverter; event->Config = tx.TabletConfig; + event->BootstrapConfig = tx.BootstrapConfig; ctx.Send(partition.Actor, std::move(event)); } diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index e20a97db6acd..775b5878c8fd 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -51,6 +51,7 @@ class TSourceIdStorage: private THeartbeatProcessor { void RegisterSourceId(const TString& sourceId, Args&&... args) { RegisterSourceIdInfo(sourceId, TSourceIdInfo(std::forward(args)...), false); } + void RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load); void DeregisterSourceId(const TString& sourceId); @@ -65,7 +66,6 @@ class TSourceIdStorage: private THeartbeatProcessor { private: void LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now); void LoadProtoSourceIdInfo(const TString& key, const TString& data); - void RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load); private: TSourceIdMap InMemorySourceIds; diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 8d93159cee4b..8bccd773e953 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -988,7 +988,8 @@ void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config) auto event = MakeHolder(TopicConverter, MakeConfig(config.Version, config.Consumers, 1, - config.MeteringMode)); + config.MeteringMode), + NKikimrPQ::TBootstrapConfig()); Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 2d06845d1759..7664fff85a8d 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2654,12 +2654,13 @@ Y_UNIT_TEST_SUITE(Cdc) { } } - Y_UNIT_TEST(InitialScan) { + void InitialScanTest(bool withTopicSchemeTx) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) .SetUseRealThreads(false) .SetDomainName("Root") .SetEnableChangefeedInitialScan(true) + .SetEnablePQConfigTransactionsAtSchemeShard(withTopicSchemeTx) ); auto& runtime = *server->GetRuntime(); @@ -2702,6 +2703,14 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(InitialScan) { + InitialScanTest(false); + } + + Y_UNIT_TEST(InitialScan_WithTopicSchemeTx) { + InitialScanTest(true); + } + Y_UNIT_TEST(InitialScanDebezium) { TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream")); auto& client = env.GetClient();