diff --git a/ydb/core/blobstorage/ut_blobstorage/donor.cpp b/ydb/core/blobstorage/ut_blobstorage/donor.cpp index de62dc03cd15..dce76e1a6e9a 100644 --- a/ydb/core/blobstorage/ut_blobstorage/donor.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/donor.cpp @@ -483,4 +483,146 @@ Y_UNIT_TEST_SUITE(Donor) { } // env.Sim(TDuration::Seconds(10)); } + + TVector GetDonors(TEnvironmentSetup& env, const TVDiskID& vdiskId) { + TVector result; + const auto& baseConfig = env.FetchBaseConfig(); + for (const auto& slot : baseConfig.GetVSlot()) { + for (size_t donorId = 0; donorId < slot.DonorsSize(); ++donorId) { + const auto& donor = slot.GetDonors(donorId); + if (VDiskIDFromVDiskID(donor.GetVDiskId()) == vdiskId) { + result.push_back(donor); + } + } + } + return result; + } + + Y_UNIT_TEST(CheckOnlineReadRequestToDonor) { + TEnvironmentSetup env{{ + .NodeCount = 8, + .VDiskReplPausedAtStart = true, + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + .ReplMaxQuantumBytes = 1 << 20, + .ReplMaxDonorNotReadyCount = 2 + }}; + auto& runtime = env.Runtime; + + env.EnableDonorMode(); + env.CreateBoxAndPool(2, 1); + env.CommenceReplication(); + env.Sim(TDuration::Seconds(30)); + + const ui32 groupId = env.GetGroups().front(); + + const TActorId edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__); + const TString buffer = TString(2_MB, 'b'); + TLogoBlobID logoBlobId(1, 1, 0, 0, buffer.size(), 0); + TVDiskID vdiskId; + bool vdiskIdWithBlobSet = false; + TLogoBlobID vdiskLogoBlobId; + + // Put blob and find vdisk with it and partId = 1 + { + env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr& ev) { + if (ev->GetTypeRewrite() == TEvBlobStorage::EvVPut) { + Y_UNUSED(nodeId); + auto* msg = ev->Get(); + const auto& blobId = LogoBlobIDFromLogoBlobID(msg->Record.GetBlobID()); + if (blobId.IsSameBlob(logoBlobId) && blobId.PartId() == 1 && !vdiskIdWithBlobSet) { + vdiskId = VDiskIDFromVDiskID(msg->Record.GetVDiskID()); + vdiskLogoBlobId = blobId; + vdiskIdWithBlobSet = true; + } 
else { + } + } + return true; + }; + + runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(logoBlobId, buffer, TInstant::Max())); + }); + auto res = env.WaitForEdgeActorEvent(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT(vdiskIdWithBlobSet); + } + + auto info = env.GetGroupInfo(groupId); + const TActorId& vdiskActorId = info->GetActorId(vdiskId); + + // Move slot out from disk and find donor + env.SettlePDisk(vdiskActorId); + CheckHasDonor(env, vdiskActorId, vdiskId); + const auto& donors = GetDonors(env, vdiskId); + UNIT_ASSERT_VALUES_EQUAL(donors.size(), 1); + const auto& donor = donors.front(); + + bool requestVdiskNotYet = false; + bool fastRequestToDonor = false; + bool asyncRequestToDonor = false; + + const auto& checkRequestToDonor = [&](std::unique_ptr& ev, const NKikimrBlobStorage::EGetHandleClass& handleClass, bool& requestExist) { + auto* msg = ev->Get(); + if (msg->Record.ExtremeQueriesSize() != 1) { + return; + } + const auto& query = msg->Record.GetExtremeQueries(0); + const auto& blobId = LogoBlobIDFromLogoBlobID(query.GetId()); + const auto& slotId = donor.GetVSlotId(); + const auto& donorActorId = MakeBlobStorageVDiskID(slotId.GetNodeId(), slotId.GetPDiskId(), slotId.GetVSlotId()); + + if (blobId == vdiskLogoBlobId && + ev->Recipient == donorActorId && + msg->Record.GetHandleClass() == handleClass) { + UNIT_ASSERT(!requestExist); + requestExist = true; + } + return; + }; + + // Check that the disk answers with TEvEnrichNotYet and requests FastRead from the donor for an online read + env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr& ev) { + Y_UNUSED(nodeId); + if (ev->GetTypeRewrite() == TEvBlobStorage::EvEnrichNotYet) { + UNIT_ASSERT(!requestVdiskNotYet); + auto msg = ev->Get()->Query.Get()->Get(); + UNIT_ASSERT_VALUES_EQUAL(msg->Record.ExtremeQueriesSize(), 1); + const auto& query = msg->Record.GetExtremeQueries(0); + const auto& vdid = 
VDiskIDFromVDiskID(msg->Record.GetVDiskID()); + const auto& blobId = LogoBlobIDFromLogoBlobID(query.GetId()); + UNIT_ASSERT(vdid.SameExceptGeneration(vdiskId)); + UNIT_ASSERT_VALUES_EQUAL(vdid.GroupGeneration, 2); + UNIT_ASSERT_VALUES_EQUAL(blobId, vdiskLogoBlobId); + requestVdiskNotYet = true; + } + + if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGet) { + checkRequestToDonor(ev, NKikimrBlobStorage::EGetHandleClass::FastRead, fastRequestToDonor); + } + return true; + }; + + // Get blob + { + auto ev = new TEvBlobStorage::TEvGet(logoBlobId, 0, 0, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead); + runtime->WrapInActorContext(edge, [&] {SendToBSProxy(edge, groupId, ev);}); + auto res = env.WaitForEdgeActorEvent(edge, false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT(requestVdiskNotYet); + UNIT_ASSERT(fastRequestToDonor); + } + + // Check disk request AsyncRead from donor for replication + env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr& ev) { + Y_UNUSED(nodeId); + if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGet) { + checkRequestToDonor(ev, NKikimrBlobStorage::EGetHandleClass::AsyncRead, asyncRequestToDonor); + } + return true; + }; + + // Start replication + env.CommenceReplication(); + UNIT_ASSERT(asyncRequestToDonor); + } } diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index 5e624869cd4b..c617f21aaa70 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -151,7 +151,7 @@ namespace NKikimr { struct TDonorQueueItem { TVDiskID VDiskId; - TActorId QueueActorId; + TDonorQueueActors QueueActors; ui32 NodeId; ui32 PDiskId; ui32 VSlotId; @@ -176,7 +176,7 @@ namespace NKikimr { TMilestoneQueue MilestoneQueue; TActorId ReplJobActorId; std::list> DonorQueue; - std::deque> Donors; + std::deque> Donors; std::set ConnectedPeerDisks, ConnectedDonorDisks; 
TEvResumeForce *ResumeForceToken = nullptr; TInstant ReplicationEndTime; @@ -222,23 +222,32 @@ namespace NKikimr { for (const auto& [vdiskId, vdiskActorId] : ReplCtx->VDiskCfg->BaseInfo.DonorDiskIds) { TIntrusivePtr flowRecord(new NBackpressure::TFlowRecord); auto info = MakeIntrusive(ReplCtx->GInfo, vdiskId, vdiskActorId); - const TActorId queueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId, + const TActorId asyncReadQueueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead, ReplCtx->MonGroup.GetGroup(), ReplCtx->VCtx, - NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "Donor", + NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "ReplicationDonor", + ReplCtx->VDiskCfg->ReplInterconnectChannel, vdiskActorId.NodeId() == SelfId().NodeId(), + TDuration::Minutes(1), flowRecord, NMonitoring::TCountableBase::EVisibility::Private)); + + const TActorId fastReadQueueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId, + NKikimrBlobStorage::EVDiskQueueId::GetFastRead, ReplCtx->MonGroup.GetGroup(), ReplCtx->VCtx, + NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "OnlineReadDonor", ReplCtx->VDiskCfg->ReplInterconnectChannel, vdiskActorId.NodeId() == SelfId().NodeId(), TDuration::Minutes(1), flowRecord, NMonitoring::TCountableBase::EVisibility::Private)); ui32 nodeId, pdiskId, vslotId; std::tie(nodeId, pdiskId, vslotId) = DecomposeVDiskServiceId(vdiskActorId); DonorQueue.emplace_back(TDonorQueueItem{ .VDiskId = vdiskId, - .QueueActorId = queueActorId, + .QueueActors = TDonorQueueActors{ + .AsyncReadQueueActorId = asyncReadQueueActorId, + .FastReadQueueActorId = fastReadQueueActorId + }, .NodeId = nodeId, .PDiskId = pdiskId, .VSlotId = vslotId, .NotReady = false, .NotReadyCount = 0 }); - Donors.emplace_back(vdiskId, queueActorId); + Donors.emplace_back(vdiskId, TDonorQueueActors(asyncReadQueueActorId, 
fastReadQueueActorId)); } DonorQueue.emplace_back(std::nullopt); // disks from group @@ -358,8 +367,10 @@ namespace NKikimr { } void DropDonor(const TDonorQueueItem& donor) { - Donors.erase(std::find(Donors.begin(), Donors.end(), std::make_pair(donor.VDiskId, donor.QueueActorId))); - Send(donor.QueueActorId, new TEvents::TEvPoison); // kill the queue actor + Donors.erase(std::find(Donors.begin(), Donors.end(), std::make_pair(donor.VDiskId, + TDonorQueueActors(donor.QueueActors.AsyncReadQueueActorId, donor.QueueActors.FastReadQueueActorId)))); + Send(donor.QueueActors.AsyncReadQueueActorId, new TEvents::TEvPoison); // kill the queue actor + Send(donor.QueueActors.FastReadQueueActorId, new TEvents::TEvPoison); // kill the queue actor Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvBlobStorage::TEvDropDonor(donor.NodeId, donor.PDiskId, donor.VSlotId, donor.VDiskId)); } @@ -520,7 +531,7 @@ namespace NKikimr { donor->NodeId << ":" << donor->PDiskId << ":" << donor->VSlotId << "}") : "generic")); ReplJobActorId = Register(CreateReplJobActor(ReplCtx, SelfId(), from, QueueActorMapPtr, BlobsToReplicatePtr, UnreplicatedBlobsPtr, donor ? 
std::make_optional(std::make_pair( - donor->VDiskId, donor->QueueActorId)) : std::nullopt, std::move(UnreplicatedBlobRecords), + donor->VDiskId, donor->QueueActors.AsyncReadQueueActorId)) : std::nullopt, std::move(UnreplicatedBlobRecords), std::move(MilestoneQueue))); } @@ -694,7 +705,8 @@ namespace NKikimr { } for (const auto& donor : DonorQueue) { if (donor) { - Send(donor->QueueActorId, new TEvents::TEvPoison); + Send(donor->QueueActors.AsyncReadQueueActorId, new TEvents::TEvPoison); + Send(donor->QueueActors.FastReadQueueActorId, new TEvents::TEvPoison); } } for (const TActorId& actorId : DonorQueryActors) { diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h index 8a7fb888dfb6..1f15622965ee 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h @@ -250,6 +250,15 @@ namespace NKikimr { struct TEvReplCheckProgress : TEventLocal {}; + struct TDonorQueueActors { + TActorId AsyncReadQueueActorId; + TActorId FastReadQueueActorId; + + bool operator==(const TDonorQueueActors &other) const { + return AsyncReadQueueActorId == other.AsyncReadQueueActorId && FastReadQueueActorId == other.FastReadQueueActorId; + } + }; + //////////////////////////////////////////////////////////////////////////// // REPL ACTOR CREATOR //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/repl/query_donor.h b/ydb/core/blobstorage/vdisk/repl/query_donor.h index 41ba61551d46..3c34246b386d 100644 --- a/ydb/core/blobstorage/vdisk/repl/query_donor.h +++ b/ydb/core/blobstorage/vdisk/repl/query_donor.h @@ -10,12 +10,12 @@ namespace NKikimr { const ui64 Cookie; std::unique_ptr Result; TActorId ParentId; - std::deque> Donors; + std::deque> Donors; TDynBitMap UnresolvedItems; TIntrusivePtr VCtx; public: - TDonorQueryActor(TEvBlobStorage::TEvEnrichNotYet& msg, std::deque> donors, const TIntrusivePtr& vCtx) + 
TDonorQueryActor(TEvBlobStorage::TEvEnrichNotYet& msg, std::deque> donors, const TIntrusivePtr& vCtx) : Query(msg.Query->Release().Release()) , Sender(msg.Query->Sender) , Cookie(msg.Query->Cookie) @@ -45,7 +45,7 @@ namespace NKikimr { return PassAway(); } - auto [vdiskId, actorId] = Donors.back(); + auto [vdiskId, actors] = Donors.back(); Donors.pop_back(); // we use AsyncRead priority as we are going to use the replication queue for the VDisk; also this doesn't @@ -57,7 +57,13 @@ namespace NKikimr { const auto flags = record.GetShowInternals() ? TEvBlobStorage::TEvVGet::EFlags::ShowInternals : TEvBlobStorage::TEvVGet::EFlags::None; - auto query = fun(vdiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, flags, {}, {}, std::nullopt); + const auto handleClass = record.GetHandleClass() == NKikimrBlobStorage::EGetHandleClass::FastRead + ? NKikimrBlobStorage::EGetHandleClass::FastRead + : NKikimrBlobStorage::EGetHandleClass::AsyncRead; + const auto queueActorId = record.GetHandleClass() == NKikimrBlobStorage::EGetHandleClass::FastRead + ? actors.FastReadQueueActorId + : actors.AsyncReadQueueActorId; + auto query = fun(vdiskId, TInstant::Max(), handleClass, flags, {}, {}, std::nullopt); bool action = false; Y_FOR_EACH_BIT(i, UnresolvedItems) { @@ -69,8 +75,8 @@ namespace NKikimr { if (action) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_VDISK_GET, SelfId() << " sending " << query->ToString() - << " to " << actorId); - Send(actorId, query.release(), IEventHandle::FlagTrackDelivery); + << " to " << queueActorId); + Send(queueActorId, query.release(), IEventHandle::FlagTrackDelivery); } else { PassAway(); }