@@ -320,4 +320,161 @@ Y_UNIT_TEST_SUITE(Donor) {
320320 }
321321 // env.Sim(TDuration::Seconds(10));
322322 }
323+
324+ void CheckHasDonor (TEnvironmentSetup& env, const TActorId& vdiskActorId, const TVDiskID& vdiskId) {
325+ auto baseConfig = env.FetchBaseConfig ();
326+ bool found = false ;
327+ for (const auto & slot : baseConfig.GetVSlot ()) {
328+ if (slot.DonorsSize ()) {
329+ UNIT_ASSERT (!found);
330+ UNIT_ASSERT_VALUES_EQUAL (slot.DonorsSize (), 1 );
331+ const auto & donor = slot.GetDonors (0 );
332+ const auto & id = donor.GetVSlotId ();
333+ UNIT_ASSERT_VALUES_EQUAL (vdiskActorId, MakeBlobStorageVDiskID (id.GetNodeId (), id.GetPDiskId (), id.GetVSlotId ()));
334+ UNIT_ASSERT_VALUES_EQUAL (VDiskIDFromVDiskID (donor.GetVDiskId ()), vdiskId);
335+ found = true ;
336+ }
337+ }
338+ UNIT_ASSERT (found);
339+ }
340+
341+ TVector<NKikimrBlobStorage::TBaseConfig_TVSlot_TDonorDisk> GetDonors (TEnvironmentSetup& env, const TVDiskID& vdiskId) {
342+ TVector<NKikimrBlobStorage::TBaseConfig_TVSlot_TDonorDisk> result;
343+ const auto & baseConfig = env.FetchBaseConfig ();
344+ for (const auto & slot : baseConfig.GetVSlot ()) {
345+ for (size_t donorId = 0 ; donorId < slot.DonorsSize (); ++donorId) {
346+ const auto & donor = slot.GetDonors (donorId);
347+ if (VDiskIDFromVDiskID (donor.GetVDiskId ()) == vdiskId) {
348+ result.push_back (donor);
349+ }
350+ }
351+ }
352+ return result;
353+ }
354+
355+ Y_UNIT_TEST (CheckOnlineReadRequestToDonor) {
356+ TEnvironmentSetup env{TEnvironmentSetup::TSettings{
357+ .NodeCount = 8 ,
358+ .VDiskReplPausedAtStart = true ,
359+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
360+ }};
361+ auto & runtime = env.Runtime ;
362+
363+ env.EnableDonorMode ();
364+ env.CreateBoxAndPool (2 , 1 );
365+ env.CommenceReplication ();
366+ env.Sim (TDuration::Seconds (30 ));
367+
368+ const ui32 groupId = env.GetGroups ().front ();
369+
370+ const TActorId edge = runtime->AllocateEdgeActor (1 , __FILE__, __LINE__);
371+ const TString buffer = TString (2_MB, ' b' );
372+ TLogoBlobID logoBlobId (1 , 1 , 0 , 0 , buffer.size (), 0 );
373+ TVDiskID vdiskId;
374+ bool vdiskIdWithBlobSet = false ;
375+ TLogoBlobID vdiskLogoBlobId;
376+
377+ // Put blob and find vdisk with it and partId = 1
378+ {
379+ env.Runtime ->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
380+ if (ev->GetTypeRewrite () == TEvBlobStorage::EvVPut) {
381+ Y_UNUSED (nodeId);
382+ auto * msg = ev->Get <TEvBlobStorage::TEvVPut>();
383+ const auto & blobId = LogoBlobIDFromLogoBlobID (msg->Record .GetBlobID ());
384+ if (blobId.IsSameBlob (logoBlobId) && blobId.PartId () == 1 && !vdiskIdWithBlobSet) {
385+ vdiskId = VDiskIDFromVDiskID (msg->Record .GetVDiskID ());
386+ vdiskLogoBlobId = blobId;
387+ vdiskIdWithBlobSet = true ;
388+ } else {
389+ }
390+ }
391+ return true ;
392+ };
393+
394+ runtime->WrapInActorContext (edge, [&] {
395+ SendToBSProxy (edge, groupId, new TEvBlobStorage::TEvPut (logoBlobId, buffer, TInstant::Max ()));
396+ });
397+ auto res = env.WaitForEdgeActorEvent <TEvBlobStorage::TEvPutResult>(edge, false );
398+ UNIT_ASSERT_VALUES_EQUAL (res->Get ()->Status , NKikimrProto::OK);
399+ UNIT_ASSERT (vdiskIdWithBlobSet);
400+ }
401+
402+ auto info = env.GetGroupInfo (groupId);
403+ const TActorId& vdiskActorId = info->GetActorId (vdiskId);
404+
405+ // Move slot out from disk and finf donor
406+ env.SettlePDisk (vdiskActorId);
407+ CheckHasDonor (env, vdiskActorId, vdiskId);
408+ const auto & donors = GetDonors (env, vdiskId);
409+ UNIT_ASSERT_VALUES_EQUAL (donors.size (), 1 );
410+ const auto & donor = donors.front ();
411+
412+ bool requestVdiskNotYet = false ;
413+ bool fastRequestToDonor = false ;
414+ bool asyncRequestToDonor = false ;
415+
416+ const auto & checkRequestToDonor = [&](std::unique_ptr<IEventHandle>& ev, const NKikimrBlobStorage::EGetHandleClass& handleClass, bool & requestExist) {
417+ auto * msg = ev->Get <TEvBlobStorage::TEvVGet>();
418+ if (msg->Record .ExtremeQueriesSize () != 1 ) {
419+ return ;
420+ }
421+ const auto & query = msg->Record .GetExtremeQueries (0 );
422+ const auto & blobId = LogoBlobIDFromLogoBlobID (query.GetId ());
423+ const auto & slotId = donor.GetVSlotId ();
424+ const auto & donorActorId = MakeBlobStorageVDiskID (slotId.GetNodeId (), slotId.GetPDiskId (), slotId.GetVSlotId ());
425+
426+ if (blobId == vdiskLogoBlobId &&
427+ ev->Recipient == donorActorId &&
428+ msg->Record .GetHandleClass () == handleClass) {
429+ UNIT_ASSERT (!requestExist);
430+ requestExist = true ;
431+ }
432+ return ;
433+ };
434+
435+ // Check disk answer TEvEnrichNotYet and request FastRead from donor for online read
436+ env.Runtime ->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
437+ Y_UNUSED (nodeId);
438+ if (ev->GetTypeRewrite () == TEvBlobStorage::EvEnrichNotYet) {
439+ UNIT_ASSERT (!requestVdiskNotYet);
440+ auto msg = ev->Get <TEvBlobStorage::TEvEnrichNotYet>()->Query .Get ()->Get ();
441+ UNIT_ASSERT_VALUES_EQUAL (msg->Record .ExtremeQueriesSize (), 1 );
442+ const auto & query = msg->Record .GetExtremeQueries (0 );
443+ const auto & vdid = VDiskIDFromVDiskID (msg->Record .GetVDiskID ());
444+ const auto & blobId = LogoBlobIDFromLogoBlobID (query.GetId ());
445+ UNIT_ASSERT (vdid.SameExceptGeneration (vdiskId));
446+ UNIT_ASSERT_VALUES_EQUAL (vdid.GroupGeneration , 2 );
447+ UNIT_ASSERT_VALUES_EQUAL (blobId, vdiskLogoBlobId);
448+ requestVdiskNotYet = true ;
449+ }
450+
451+ if (ev->GetTypeRewrite () == TEvBlobStorage::EvVGet) {
452+ checkRequestToDonor (ev, NKikimrBlobStorage::EGetHandleClass::FastRead, fastRequestToDonor);
453+ }
454+ return true ;
455+ };
456+
457+ // Get blob
458+ {
459+ auto ev = new TEvBlobStorage::TEvGet (logoBlobId, 0 , 0 , TInstant::Max (), NKikimrBlobStorage::EGetHandleClass::FastRead);
460+ runtime->WrapInActorContext (edge, [&] {SendToBSProxy (edge, groupId, ev);});
461+ auto res = env.WaitForEdgeActorEvent <TEvBlobStorage::TEvGetResult>(edge, false );
462+ UNIT_ASSERT_VALUES_EQUAL (res->Get ()->Status , NKikimrProto::OK);
463+ UNIT_ASSERT (requestVdiskNotYet);
464+ UNIT_ASSERT (fastRequestToDonor);
465+ }
466+
467+ // Check disk request AsyncRead from donor for replication
468+ env.Runtime ->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
469+ Y_UNUSED (nodeId);
470+ if (ev->GetTypeRewrite () == TEvBlobStorage::EvVGet) {
471+ checkRequestToDonor (ev, NKikimrBlobStorage::EGetHandleClass::AsyncRead, asyncRequestToDonor);
472+ }
473+ return true ;
474+ };
475+
476+ // Start replication
477+ env.CommenceReplication ();
478+ UNIT_ASSERT (asyncRequestToDonor);
479+ }
323480}
0 commit comments