diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index a4ea928fc169..674eeca14436 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -132,9 +132,7 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) { topic.Status = OK; topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL; - for(auto& p : info.PQGroupInfo->Description.GetPartitions()) { - topic.partitions[p.GetPartitionId()] = p.GetTabletId(); - } + topic.PartitionChooser = CreatePartitionChooser(info.PQGroupInfo->Description); } else { KAFKA_LOG_W("Produce actor: Unauthorized PRODUCE to topic '" << topicPath << "'"); topic.Status = UNAUTHORIZED; @@ -178,7 +176,7 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TP auto& topicInfo = Topics[path]; topicInfo.Status = NOT_FOUND; topicInfo.ExpirationTime = ctx.Now() + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL; - topicInfo.partitions.clear(); + topicInfo.PartitionChooser.reset(); } void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { @@ -192,10 +190,7 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TP } topic.Status = OK; topic.ExpirationTime = ctx.Now() + TOPIC_OK_EXPIRATION_INTERVAL; - topic.partitions.clear(); - for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) { - topic.partitions[p.GetPartitionId()] = p.GetTabletId(); - } + topic.PartitionChooser = CreatePartitionChooser(e->Result->GetPathDescription().GetPersQueueGroup()); } void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx) { @@ -578,17 +573,17 @@ std::pair TKafkaProduceActor::Partit return { OK, writerInfo.ActorId }; } - auto& partitions = topicInfo.partitions; - auto pit = partitions.find(partitionId); - if (pit == partitions.end()) { + auto* partition = topicInfo.PartitionChooser->GetPartition(partitionId); + if (!partition) { return { NOT_FOUND, TActorId{} }; } - auto tabletId = pit->second; TPartitionWriterOpts opts; opts.WithDeduplication(false) + .WithSourceId(SourceId) + .WithTopicPath(topicPath) .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext); - auto* writerActor = CreatePartitionWriter(SelfId(), topicPath, tabletId, partitionId, {/*expectedGeneration*/}, SourceId, opts); + auto* writerActor = CreatePartitionWriter(SelfId(), partition->TabletId, partitionId, opts); auto& writerInfo = partitionWriters[partitionId]; writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor); diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index 478ce2dc172b..f1eea1c150de 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -181,9 +181,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped TInstant ExpirationTime; NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; - - // partitioId -> tabletId - std::unordered_map partitions; + std::shared_ptr PartitionChooser; }; std::map Topics; diff --git a/ydb/core/persqueue/writer/partition_chooser.h b/ydb/core/persqueue/writer/partition_chooser.h index f85d879702e5..9ab9eee1cbbf 100644 --- a/ydb/core/persqueue/writer/partition_chooser.h +++ b/ydb/core/persqueue/writer/partition_chooser.h @@ -48,6 +48,25 @@ struct TEvPartitionChooser { }; +class IPartitionChooser { +public: + struct TPartitionInfo { + TPartitionInfo(ui32 partitionId, ui64 tabletId) + : PartitionId(partitionId) + , TabletId(tabletId) {} + + ui32 PartitionId; + ui64 TabletId; + }; + + virtual ~IPartitionChooser() = default; + + virtual const TPartitionInfo* GetPartition(const TString& sourceId) const = 0; + virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0; +}; + +std::shared_ptr CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false); + NActors::IActor* CreatePartitionChooserActor(TActorId parentId, const NKikimrSchemeOp::TPersQueueGroupDescription& config, NPersQueue::TTopicConverterPtr& fullConverter, diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.cpp b/ydb/core/persqueue/writer/partition_chooser_impl.cpp index 4012709f578b..d8102d2f32de 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.cpp +++ b/ydb/core/persqueue/writer/partition_chooser_impl.cpp @@ -32,32 +32,453 @@ TString TMd5Converter::operator()(const TString& sourceId) const { return AsKeyBound(Hash(sourceId)); } + +// +// TPartitionChooserActor +// + +TPartitionChooserActor::TPartitionChooserActor(TActorId parent, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + std::shared_ptr& chooser, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional preferedPartition) + : Parent(parent) + , FullConverter(fullConverter) + , SourceId(sourceId) + , PreferedPartition(preferedPartition) + , Chooser(chooser) + , SplitMergeEnabled_(SplitMergeEnabled(config.GetPQTabletConfig())) + , Partition(nullptr) + , BalancerTabletId(config.GetBalancerTabletID()) { +} + +void TPartitionChooserActor::Bootstrap(const TActorContext& ctx) { + const auto& pqConfig = AppData(ctx)->PQConfig; + + NeedUpdateTable = (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()) && !SplitMergeEnabled_ && SourceId; + + if (!SourceId) { + return ChoosePartition(ctx); + } + + TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping + : ESourceIdTableGeneration::SrcIdMeta2; + try { + EncodedSourceId = NSourceIdEncoding::EncodeSrcId( + FullConverter->GetTopicForSrcIdHash(), SourceId, TableGeneration + ); + } catch (yexception& e) { + return ReplyError(ErrorCode::BAD_REQUEST, TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), ctx); + } + + SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + + DEBUG("SelectQuery: " << SelectQuery); + + if (pqConfig.GetTopicsAreFirstClassCitizen()) { + if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { + TThisActor::Become(&TThis::StateInit); + InitTable(ctx); + } else { + ChoosePartition(ctx); + } + } else { + TThisActor::Become(&TThis::StateInit); + StartKqpSession(ctx); + } +} + +void TPartitionChooserActor::Stop(const TActorContext& ctx) { + CloseKqpSession(ctx); + if (PipeToBalancer) { + NTabletPipe::CloseClient(ctx, PipeToBalancer); + } + IActor::Die(ctx); +} + +void TPartitionChooserActor::ScheduleStop() { + TThisActor::Become(&TThis::StateDestroy); +} + +TString TPartitionChooserActor::GetDatabaseName(const NActors::TActorContext& ctx) { + const auto& pqConfig = AppData(ctx)->PQConfig; + switch (TableGeneration) { + case ESourceIdTableGeneration::SrcIdMeta2: + return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig); + case ESourceIdTableGeneration::PartitionMapping: + return AppData(ctx)->TenantName; + } +} + +void TPartitionChooserActor::InitTable(const NActors::TActorContext& ctx) { + ctx.Send( + NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), + new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) + ); +} + +void TPartitionChooserActor::StartKqpSession(const NActors::TActorContext& ctx) { + auto ev = MakeCreateSessionRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); +} + +void TPartitionChooserActor::CloseKqpSession(const TActorContext& ctx) { + if (KqpSessionId) { + auto ev = MakeCloseSessionRequest(); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + KqpSessionId = ""; + } +} + +void TPartitionChooserActor::SendUpdateRequests(const TActorContext& ctx) { + TThisActor::Become(&TThis::StateUpdate); + + auto ev = MakeUpdateQueryRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + UpdatesInflight++; +} + +void TPartitionChooserActor::SendSelectRequest(const NActors::TActorContext& ctx) { + TThisActor::Become(&TThis::StateSelect); + + auto ev = MakeSelectQueryRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + SelectInflight++; +} + +THolder TPartitionChooserActor::MakeCreateSessionRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder(); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + return ev; +} + +THolder TPartitionChooserActor::MakeCloseSessionRequest() { + auto ev = MakeHolder(); + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + return ev; +} + +THolder TPartitionChooserActor::MakeSelectQueryRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder(); + + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(SelectQuery); + + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + // fill tx settings: set commit tx flag& begin new serializable tx. + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + // keep compiled query in cache. + ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(FullConverter->GetClientsideName()) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); + + return ev; +} + +THolder TPartitionChooserActor::MakeUpdateQueryRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder(); + + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(UpdateQuery); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + // fill tx settings: set commit tx flag& begin new serializable tx. + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + if (KqpSessionId) { + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + } + if (TxId) { + ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); + TxId = ""; + } else { + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + } + // keep compiled query in cache. + ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(FullConverter->GetClientsideName()) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build() + .AddParam("$CreateTime") + .Uint64(CreateTime) + .Build() + .AddParam("$AccessTime") + .Uint64(TInstant::Now().MilliSeconds()) + .Build() + .AddParam("$Partition") + .Uint32(Partition->PartitionId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); + + return ev; +} + +void TPartitionChooserActor::RequestPQRB(const NActors::TActorContext& ctx) { + Y_ABORT_UNLESS(BalancerTabletId); + + if (!PipeToBalancer) { + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(100), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; + PipeToBalancer = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig)); + } + + TThisActor::Become(&TThis::StateSelect); + NTabletPipe::SendData(ctx, PipeToBalancer, new TEvPersQueue::TEvGetPartitionIdForWrite()); +} + +void TPartitionChooserActor::ReplyResult(const NActors::TActorContext& ctx) { + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId)); +} + +void TPartitionChooserActor::ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage))); + + Stop(ctx); +} + +void TPartitionChooserActor::HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) { + StartKqpSession(ctx); +} + +void TPartitionChooserActor::HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { + const auto& record = ev->Get()->Record; + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << record, ctx); + return; + } + + KqpSessionId = record.GetResponse().GetSessionId(); + Y_ABORT_UNLESS(!KqpSessionId.empty()); + + SendSelectRequest(ctx); +} + +void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { + const auto& record = ev->Get()->Record; + KqpSessionId = record.GetResponse().GetSessionId(); + + Stop(ctx); +} + +void TPartitionChooserActor::HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record.GetRef(); + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << record, ctx); + } + + auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + + TxId = record.GetResponse().GetTxMeta().id(); + Y_ABORT_UNLESS(!TxId.empty()); + + if (t.ListSize() != 0) { + auto& tt = t.GetList(0).GetStruct(0); + if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition + auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64(); + if (accessTime > AccessTime) { // AccessTime + PartitionId = tt.GetOptional().GetUint32(); + DEBUG("Received partition " << PartitionId << " from table for SourceId=" << SourceId); + Partition = Chooser->GetPartition(PartitionId.value()); + CreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); + AccessTime = accessTime; + } + } + } + + if (CreateTime == 0) { + CreateTime = TInstant::Now().MilliSeconds(); + } + + if (!Partition) { + ChoosePartition(ctx); + } else { + OnPartitionChosen(ctx); + } +} + +void TPartitionChooserActor::HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { + PartitionId = ev->Get()->Record.GetPartitionId(); + DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << SourceId); + Partition = Chooser->GetPartition(PartitionId.value()); + + OnPartitionChosen(ctx); +} + +void TPartitionChooserActor::HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ev); + + ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx); +} + +void TPartitionChooserActor::HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record.GetRef(); + + if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { + if (!PartitionPersisted) { + CloseKqpSession(ctx); + StartKqpSession(ctx); + } + return; + } + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + if (!PartitionPersisted) { + ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx); + } + return; + } + + if (!PartitionPersisted) { + ReplyResult(ctx); + PartitionPersisted = true; + // Use tx only for query after select. Updating AccessTime without transaction. + CloseKqpSession(ctx); + } + + TThisActor::Become(&TThis::StateIdle); +} + +void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr&, const TActorContext& ctx) { + Stop(ctx); +} + +void TPartitionChooserActor::HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) { + if (PartitionPersisted) { + // we do not update AccessTime for Split/Merge partitions because don't use table. + SendUpdateRequests(ctx); + } +} + +void TPartitionChooserActor::ChoosePartition(const TActorContext& ctx) { + auto [roundRobin, p] = ChoosePartitionSync(ctx); + if (roundRobin) { + RequestPQRB(ctx); + } else { + Partition = p; + OnPartitionChosen(ctx); + } +} + +void TPartitionChooserActor::OnPartitionChosen(const TActorContext& ctx) { + if (!Partition && PreferedPartition) { + return ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "Prefered partition " << (PreferedPartition.value() + 1) << " is not exists or inactive.", + ctx); + } + + if (!Partition) { + return ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); + } + + if (PreferedPartition && Partition->PartitionId != PreferedPartition.value()) { + return ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " + << (Partition->PartitionId + 1) << ", but client provided " << (PreferedPartition.value() + 1) + << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " + "another MessageGroupId, specify PartitionGroupId " << (Partition->PartitionId + 1) + << ", or do not specify PartitionGroupId at all.", + ctx); + } + + if (SplitMergeEnabled_ && SourceId && PartitionId) { + if (Partition != Chooser->GetPartition(SourceId)) { + return ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "Message group " << SourceId << " not in a partition boundary", ctx); + } + } + + if (NeedUpdateTable) { + SendUpdateRequests(ctx); + } else { + TThisActor::Become(&TThis::StateIdle); + + ReplyResult(ctx); + } +} + +std::pair TPartitionChooserActor::ChoosePartitionSync(const TActorContext& ctx) const { + const auto& pqConfig = AppData(ctx)->PQConfig; + if (SourceId && SplitMergeEnabled_) { + return {false, Chooser->GetPartition(SourceId)}; + } else if (PreferedPartition) { + return {false, Chooser->GetPartition(PreferedPartition.value())}; + } else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) { + return {false, Chooser->GetPartition(SourceId)}; + } else { + return {true, nullptr}; + } +} + } // namespace NPartitionChooser -IActor* CreatePartitionChooserActor(TActorId parentId, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional preferedPartition, - bool withoutHash) { + +std::shared_ptr CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash) { if (SplitMergeEnabled(config.GetPQTabletConfig())) { if (withoutHash) { - return new NPartitionChooser::TPartitionChooserActor> - (parentId, config, fullConverter, sourceId, preferedPartition); + return std::make_shared>(config); } else { - return new NPartitionChooser::TPartitionChooserActor> - (parentId, config, fullConverter, sourceId, preferedPartition); + return std::make_shared>(config); } } else { if (withoutHash) { - return new NPartitionChooser::TPartitionChooserActor> - (parentId, config, fullConverter, sourceId, preferedPartition); + return std::make_shared>(config); } else { - return new NPartitionChooser::TPartitionChooserActor> - (parentId, config, fullConverter, sourceId, preferedPartition); + return std::make_shared>(config); } } } +IActor* CreatePartitionChooserActor(TActorId parentId, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional preferedPartition, + bool withoutHash) { + auto chooser = CreatePartitionChooser(config, withoutHash); + return new NPartitionChooser::TPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); +} + + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.h b/ydb/core/persqueue/writer/partition_chooser_impl.h index 1f9fe561591d..2fede5ae1226 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl.h @@ -45,19 +45,21 @@ struct TMd5Converter { // Chooses the partition to produce messages using the boundaries of the partition for the SourceID distribution. // It used for split/merge distribution and guarantee stable distribution for changing partition set. template -class TBoundaryChooser { +class TBoundaryChooser: public IPartitionChooser { public: - struct TPartitionInfo { - ui32 PartitionId; - ui64 TabletId; + struct TPartitionInfo: public IPartitionChooser::TPartitionInfo { + TPartitionInfo(ui32 partitionId, ui64 tabletId, std::optional toBound) + : IPartitionChooser::TPartitionInfo(partitionId, tabletId) + , ToBound(toBound) {} + std::optional ToBound; }; TBoundaryChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config); TBoundaryChooser(TBoundaryChooser&&) = default; - const TPartitionInfo* GetPartition(const TString& sourceId) const; - const TPartitionInfo* GetPartition(ui32 partitionId) const; + const TPartitionInfo* GetPartition(const TString& sourceId) const override; + const TPartitionInfo* GetPartition(ui32 partitionId) const override; private: const TString TopicName; @@ -67,17 +69,12 @@ class TBoundaryChooser { // It is old alghoritm of choosing partition by SourceId template -class THashChooser { +class THashChooser: public IPartitionChooser { public: - struct TPartitionInfo { - ui32 PartitionId; - ui64 TabletId; - }; - THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config); - const TPartitionInfo* GetPartition(const TString& sourceId) const; - const TPartitionInfo* GetPartition(ui32 partitionId) const; + const TPartitionInfo* GetPartition(const TString& sourceId) const override; + const TPartitionInfo* GetPartition(ui32 partitionId) const override; private: std::vector Partitions; @@ -85,17 +82,17 @@ class THashChooser { }; -template -class TPartitionChooserActor: public TActorBootstrapped> { - using TThis = TPartitionChooserActor; +class TPartitionChooserActor: public TActorBootstrapped { + using TThis = TPartitionChooserActor; using TThisActor = TActor; friend class TActorBootstrapped; public: - using TPartitionInfo = typename TChooser::TPartitionInfo; + using TPartitionInfo = typename IPartitionChooser::TPartitionInfo; TPartitionChooserActor(TActorId parentId, const NKikimrSchemeOp::TPersQueueGroupDescription& config, + std::shared_ptr& chooser, NPersQueue::TTopicConverterPtr& fullConverter, const TString& sourceId, std::optional preferedPartition); @@ -191,7 +188,7 @@ class TPartitionChooserActor: public TActorBootstrapped PreferedPartition; - const TChooser Chooser; + const std::shared_ptr Chooser; const bool SplitMergeEnabled_; std::optional PartitionId; @@ -288,451 +285,5 @@ const typename THashChooser::TPartitionInfo* THashChooser::Get return it->PartitionId == partitionId ? it : nullptr; } - -// -// TPartitionChooserActor -// - -template -TPartitionChooserActor::TPartitionChooserActor(TActorId parent, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional preferedPartition) - : Parent(parent) - , FullConverter(fullConverter) - , SourceId(sourceId) - , PreferedPartition(preferedPartition) - , Chooser(config) - , SplitMergeEnabled_(SplitMergeEnabled(config.GetPQTabletConfig())) - , Partition(nullptr) - , BalancerTabletId(config.GetBalancerTabletID()) { -} - -template -void TPartitionChooserActor::Bootstrap(const TActorContext& ctx) { - const auto& pqConfig = AppData(ctx)->PQConfig; - - NeedUpdateTable = (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()) && !SplitMergeEnabled_ && SourceId; - - if (!SourceId) { - return ChoosePartition(ctx); - } - - TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping - : ESourceIdTableGeneration::SrcIdMeta2; - try { - EncodedSourceId = NSourceIdEncoding::EncodeSrcId( - FullConverter->GetTopicForSrcIdHash(), SourceId, TableGeneration - ); - } catch (yexception& e) { - return ReplyError(ErrorCode::BAD_REQUEST, TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), ctx); - } - - SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - - DEBUG("SelectQuery: " << SelectQuery); - - if (pqConfig.GetTopicsAreFirstClassCitizen()) { - if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { - TThisActor::Become(&TThis::StateInit); - InitTable(ctx); - } else { - ChoosePartition(ctx); - } - } else { - TThisActor::Become(&TThis::StateInit); - StartKqpSession(ctx); - } -} - -template -void TPartitionChooserActor::Stop(const TActorContext& ctx) { - CloseKqpSession(ctx); - if (PipeToBalancer) { - NTabletPipe::CloseClient(ctx, PipeToBalancer); - } - IActor::Die(ctx); -} - -template -void TPartitionChooserActor::ScheduleStop() { - TThisActor::Become(&TThis::StateDestroy); -} - -template -TString TPartitionChooserActor::GetDatabaseName(const NActors::TActorContext& ctx) { - const auto& pqConfig = AppData(ctx)->PQConfig; - switch (TableGeneration) { - case ESourceIdTableGeneration::SrcIdMeta2: - return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig); - case ESourceIdTableGeneration::PartitionMapping: - return AppData(ctx)->TenantName; - } -} - -template -void TPartitionChooserActor::InitTable(const NActors::TActorContext& ctx) { - ctx.Send( - NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), - new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) - ); -} - -template -void TPartitionChooserActor::StartKqpSession(const NActors::TActorContext& ctx) { - auto ev = MakeCreateSessionRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); -} - -template -void TPartitionChooserActor::CloseKqpSession(const TActorContext& ctx) { - if (KqpSessionId) { - auto ev = MakeCloseSessionRequest(); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - KqpSessionId = ""; - } -} - -template -void TPartitionChooserActor::SendUpdateRequests(const TActorContext& ctx) { - TThisActor::Become(&TThis::StateUpdate); - - auto ev = MakeUpdateQueryRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - UpdatesInflight++; -} - -template -void TPartitionChooserActor::SendSelectRequest(const NActors::TActorContext& ctx) { - TThisActor::Become(&TThis::StateSelect); - - auto ev = MakeSelectQueryRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - SelectInflight++; -} - -template -THolder TPartitionChooserActor::MakeCreateSessionRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder(); - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - return ev; -} - -template -THolder TPartitionChooserActor::MakeCloseSessionRequest() { - auto ev = MakeHolder(); - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - return ev; -} - -template -THolder TPartitionChooserActor::MakeSelectQueryRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder(); - - ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetQuery(SelectQuery); - - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - // fill tx settings: set commit tx flag& begin new serializable tx. - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - // keep compiled query in cache. - ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - - NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); - - SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); - - paramsBuilder - .AddParam("$Topic") - .Utf8(FullConverter->GetClientsideName()) - .Build() - .AddParam("$SourceId") - .Utf8(EncodedSourceId.EscapedSourceId) - .Build(); - - NYdb::TParams params = paramsBuilder.Build(); - - ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - - return ev; -} - -template -THolder TPartitionChooserActor::MakeUpdateQueryRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder(); - - ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetQuery(UpdateQuery); - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - // fill tx settings: set commit tx flag& begin new serializable tx. - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - if (KqpSessionId) { - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - } - if (TxId) { - ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); - TxId = ""; - } else { - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - } - // keep compiled query in cache. - ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - - NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); - - SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); - - paramsBuilder - .AddParam("$Topic") - .Utf8(FullConverter->GetClientsideName()) - .Build() - .AddParam("$SourceId") - .Utf8(EncodedSourceId.EscapedSourceId) - .Build() - .AddParam("$CreateTime") - .Uint64(CreateTime) - .Build() - .AddParam("$AccessTime") - .Uint64(TInstant::Now().MilliSeconds()) - .Build() - .AddParam("$Partition") - .Uint32(Partition->PartitionId) - .Build(); - - NYdb::TParams params = paramsBuilder.Build(); - - ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - - return ev; -} - -template -void TPartitionChooserActor::RequestPQRB(const NActors::TActorContext& ctx) { - Y_ABORT_UNLESS(BalancerTabletId); - - if (!PipeToBalancer) { - NTabletPipe::TClientConfig clientConfig; - clientConfig.RetryPolicy = { - .RetryLimitCount = 6, - .MinRetryTime = TDuration::MilliSeconds(10), - .MaxRetryTime = TDuration::MilliSeconds(100), - .BackoffMultiplier = 2, - .DoFirstRetryInstantly = true - }; - PipeToBalancer = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig)); - } - - TThisActor::Become(&TThis::StateSelect); - NTabletPipe::SendData(ctx, PipeToBalancer, new TEvPersQueue::TEvGetPartitionIdForWrite()); -} - -template -void TPartitionChooserActor::ReplyResult(const NActors::TActorContext& ctx) { - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId)); -} - -template -void TPartitionChooserActor::ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage))); - - Stop(ctx); -} - -template -void TPartitionChooserActor::HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) { - StartKqpSession(ctx); -} - -template -void TPartitionChooserActor::HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { - const auto& record = ev->Get()->Record; - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << record, ctx); - return; - } - - KqpSessionId = record.GetResponse().GetSessionId(); - Y_ABORT_UNLESS(!KqpSessionId.empty()); - - SendSelectRequest(ctx); -} - -template -void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { - const auto& record = ev->Get()->Record; - KqpSessionId = record.GetResponse().GetSessionId(); - - Stop(ctx); -} - -template -void TPartitionChooserActor::HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record.GetRef(); - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << record, ctx); - } - - auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - - TxId = record.GetResponse().GetTxMeta().id(); - Y_ABORT_UNLESS(!TxId.empty()); - - if (t.ListSize() != 0) { - auto& tt = t.GetList(0).GetStruct(0); - if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition - auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64(); - if (accessTime > AccessTime) { // AccessTime - PartitionId = tt.GetOptional().GetUint32(); - DEBUG("Received partition " << PartitionId << " from table for SourceId=" << SourceId); - Partition = Chooser.GetPartition(PartitionId.value()); - CreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); - AccessTime = accessTime; - } - } - } - - if (CreateTime == 0) { - CreateTime = TInstant::Now().MilliSeconds(); - } - - if (!Partition) { - ChoosePartition(ctx); - } else { - OnPartitionChosen(ctx); - } -} - -template -void TPartitionChooserActor::HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { - PartitionId = ev->Get()->Record.GetPartitionId(); - DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << SourceId); - Partition = Chooser.GetPartition(PartitionId.value()); - - OnPartitionChosen(ctx); -} - -template -void TPartitionChooserActor::HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); - - ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx); -} - -template -void TPartitionChooserActor::HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record.GetRef(); - - if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { - if (!PartitionPersisted) { - CloseKqpSession(ctx); - StartKqpSession(ctx); - } - return; - } - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - if (!PartitionPersisted) { - ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx); - } - return; - } - - if (!PartitionPersisted) { - ReplyResult(ctx); - PartitionPersisted = true; - // Use tx only for query after select. Updating AccessTime without transaction. - CloseKqpSession(ctx); - } - - TThisActor::Become(&TThis::StateIdle); -} - -template -void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr&, const TActorContext& ctx) { - Stop(ctx); -} - -template -void TPartitionChooserActor::HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) { - if (PartitionPersisted) { - // we do not update AccessTime for Split/Merge partitions because don't use table. - SendUpdateRequests(ctx); - } -} - -template -void TPartitionChooserActor::ChoosePartition(const TActorContext& ctx) { - auto [roundRobin, p] = ChoosePartitionSync(ctx); - if (roundRobin) { - RequestPQRB(ctx); - } else { - Partition = p; - OnPartitionChosen(ctx); - } -} - -template -void TPartitionChooserActor::OnPartitionChosen(const TActorContext& ctx) { - if (!Partition && PreferedPartition) { - return ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "Prefered partition " << (PreferedPartition.value() + 1) << " is not exists or inactive.", - ctx); - } - - if (!Partition) { - return ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); - } - - if (PreferedPartition && Partition->PartitionId != PreferedPartition.value()) { - return ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " - << (Partition->PartitionId + 1) << ", but client provided " << (PreferedPartition.value() + 1) - << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " - "another MessageGroupId, specify PartitionGroupId " << (Partition->PartitionId + 1) - << ", or do not specify PartitionGroupId at all.", - ctx); - } - - if (SplitMergeEnabled_ && SourceId && PartitionId) { - if (Partition != Chooser.GetPartition(SourceId)) { - return ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "Message group " << SourceId << " not in a partition boundary", ctx); - } - } - - if (NeedUpdateTable) { - SendUpdateRequests(ctx); - } else { - TThisActor::Become(&TThis::StateIdle); - - ReplyResult(ctx); - } -} - -template -std::pair::TPartitionInfo*> TPartitionChooserActor::ChoosePartitionSync(const TActorContext& ctx) const { - const auto& pqConfig = AppData(ctx)->PQConfig; - if (SourceId && SplitMergeEnabled_) { - return {false, Chooser.GetPartition(SourceId)}; - } else if (PreferedPartition) { - return {false, Chooser.GetPartition(PreferedPartition.value())}; - } else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) { - return {false, Chooser.GetPartition(SourceId)}; - } else { - return {true, nullptr}; - } -} - } // namespace NPartitionChooser } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 989b7ed6be49..645f1ec47f7a 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -1,4 +1,5 @@ #include "source_id_encoding.h" +#include "util/generic/fwd.h" #include "writer.h" #include @@ -767,18 +768,15 @@ class TPartitionWriter: public TActorBootstrapped, private TRl explicit TPartitionWriter( const TActorId& client, - const std::optional& topicPath, ui64 tabletId, ui32 partitionId, - const std::optional expectedGeneration, - const TString& sourceId, const TPartitionWriterOpts& opts) - : TRlHelpers(topicPath, opts.RlCtx, WRITE_BLOCK_SIZE, !!opts.RlCtx) + : TRlHelpers(opts.TopicPath, opts.RlCtx, WRITE_BLOCK_SIZE, !!opts.RlCtx) , Client(client) , TabletId(tabletId) , PartitionId(partitionId) - , ExpectedGeneration(expectedGeneration) - , SourceId(sourceId) + , ExpectedGeneration(opts.ExpectedGeneration) + , SourceId(opts.SourceId) , Opts(opts) { if (Opts.MeteringMode) { @@ -860,10 +858,13 @@ class TPartitionWriter: public TActorBootstrapped, private TRl ui64 WriteId = INVALID_WRITE_ID; }; // TPartitionWriter -IActor* CreatePartitionWriter(const TActorId& client, const std::optional& topicPath, ui64 tabletId, ui32 partitionId, - const std::optional expectedGeneration, const TString& sourceId, + +IActor* CreatePartitionWriter(const TActorId& client, + // const NKikimrSchemeOp::TPersQueueGroupDescription& config, + ui64 tabletId, + ui32 partitionId, const TPartitionWriterOpts& opts) { - return new TPartitionWriter(client, topicPath, tabletId, partitionId, expectedGeneration, sourceId, opts); + return new TPartitionWriter(client, tabletId, partitionId, opts); } #undef LOG_PREFIX diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index a44c98be5afc..bd3ba4103841 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -9,6 +9,8 @@ #include +#include "partition_chooser.h" + namespace NKikimr::NPQ { constexpr ui64 INVALID_WRITE_ID = Max(); @@ -166,10 +168,15 @@ struct TEvPartitionWriter { }; // TEvPartitionWriter + struct TPartitionWriterOpts { bool CheckState = false; bool AutoRegister = false; bool UseDeduplication = true; + + TString SourceId; + std::optional ExpectedGeneration; + TString Database; TString TopicPath; TString Token; @@ -186,6 +193,9 @@ struct TPartitionWriterOpts { TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; } TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; } TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; } + TPartitionWriterOpts& WithSourceId(const TString& value) { SourceId = value; return *this; } + TPartitionWriterOpts& WithExpectedGeneration(ui32 value) { ExpectedGeneration = value; return *this; } + TPartitionWriterOpts& WithExpectedGeneration(std::optional value) { ExpectedGeneration = value; return *this; } TPartitionWriterOpts& WithCheckRequestUnits(const NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode , const TRlContext& rlCtx) { MeteringMode = meteringMode; RlCtx = rlCtx; return *this; } TPartitionWriterOpts& WithDatabase(const TString& value) { Database = value; return *this; } TPartitionWriterOpts& WithTopicPath(const TString& value) { TopicPath = value; return *this; } @@ -196,6 +206,9 @@ struct TPartitionWriterOpts { TPartitionWriterOpts& WithRequestType(const TString& value) { RequestType = value; return *this; } }; -IActor* CreatePartitionWriter(const TActorId& client, const std::optional& topicPath, ui64 tabletId, ui32 partitionId, const std::optional expectedGeneration, const TString& sourceId, +IActor* CreatePartitionWriter(const TActorId& client, + // const NKikimrSchemeOp::TPersQueueGroupDescription& config, + ui64 tabletId, + ui32 partitionId, const TPartitionWriterOpts& opts = {}); } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index b85d4c137dbe..1eb336002136 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -39,8 +39,9 @@ class TCdcChangeSenderPartition: public TActorBootstrappedPQConfig.GetWriteInitLatencyBigMs(); diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp index 3d7281566786..d1a182401410 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp @@ -6,14 +6,10 @@ namespace NKikimr::NGRpcProxy::V1 { TPartitionWriterCacheActor::TPartitionWriterCacheActor(const TActorId& owner, ui32 partition, ui64 tabletId, - std::optional expectedGeneration, - const TString& sourceId, const NPQ::TPartitionWriterOpts& opts) : Owner(owner), Partition(partition), TabletId(tabletId), - ExpectedGeneration(expectedGeneration), - SourceId(sourceId), Opts(opts) { } @@ -50,7 +46,7 @@ STFUNC(TPartitionWriterCacheActor::StateWork) HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle); HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle); HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle); - HFunc(TEvents::TEvPoisonPill, Handle); + HFunc(TEvents::TEvPoison, Handle); } } @@ -270,8 +266,7 @@ TActorId TPartitionWriterCacheActor::CreatePartitionWriter(const TString& sessio } return ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( - ctx.SelfID, {}, TabletId, Partition, ExpectedGeneration, - SourceId, opts + ctx.SelfID, TabletId, Partition, opts )); } @@ -283,7 +278,7 @@ STFUNC(TPartitionWriterCacheActor::StateBroken) HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle); HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle); HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle); - HFunc(TEvents::TEvPoisonPill, Handle); + HFunc(TEvents::TEvPoison, Handle); } } diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h index 31927909aef0..36f16d9cbf04 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h @@ -12,8 +12,6 @@ class TPartitionWriterCacheActor : public NActors::TActorBootstrapped expectedGeneration, - const TString& sourceId, const NPQ::TPartitionWriterOpts& opts); void Bootstrap(const TActorContext& ctx); @@ -68,8 +66,6 @@ class TPartitionWriterCacheActor : public NActors::TActorBootstrapped ExpectedGeneration; - TString SourceId; NPQ::TPartitionWriterOpts Opts; THashMap, TPartitionWriterPtr> Writers; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 72023496193f..c757b6e6424d 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -679,7 +679,12 @@ void TWriteSessionActor::CreatePartitionWriterCache(const NPQ::TPartitionWriterOpts opts; opts.WithDeduplication(UseDeduplication); - if constexpr (!UseMigrationProtocol) { + opts.WithSourceId(SourceId); + opts.WithExpectedGeneration(ExpectedGeneration); + + if constexpr (UseMigrationProtocol) { + opts.WithTopicPath(InitRequest.topic()); + } else { if (Request->GetDatabaseName()) { opts.WithDatabase(*Request->GetDatabaseName()); } @@ -699,8 +704,6 @@ void TWriteSessionActor::CreatePartitionWriterCache(const std::make_unique(ctx.SelfID, Partition, PartitionTabletId, - ExpectedGeneration, - SourceId, opts); PartitionWriterCache = ctx.RegisterWithSameMailbox(actor.release()); diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp index b4c57e054c17..09925460d0f1 100644 --- a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp +++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp @@ -66,12 +66,12 @@ TActorId TPartitionWriterCacheActorFixture::CreatePartitionWriterCacheActor(cons NPQ::TPartitionWriterOpts options; options.WithDeduplication(params.WithDeduplication); options.WithDatabase(params.Database); + options.WithExpectedGeneration(params.Generation); + options.WithSourceId(params.SourceId); auto actor = std::make_unique(Ctx->Edge, params.Partition, PQTabletId, - params.Generation, - params.SourceId, options); TActorId actorId = Ctx->Runtime->Register(actor.release());