Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/donor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,4 +483,146 @@ Y_UNIT_TEST_SUITE(Donor) {
}
// env.Sim(TDuration::Seconds(10));
}

TVector<NKikimrBlobStorage::TBaseConfig_TVSlot_TDonorDisk> GetDonors(TEnvironmentSetup& env, const TVDiskID& vdiskId) {
TVector<NKikimrBlobStorage::TBaseConfig_TVSlot_TDonorDisk> result;
const auto& baseConfig = env.FetchBaseConfig();
for (const auto& slot : baseConfig.GetVSlot()) {
for (size_t donorId = 0; donorId < slot.DonorsSize(); ++donorId) {
const auto& donor = slot.GetDonors(donorId);
if (VDiskIDFromVDiskID(donor.GetVDiskId()) == vdiskId) {
result.push_back(donor);
}
}
}
return result;
}

Y_UNIT_TEST(CheckOnlineReadRequestToDonor) {
TEnvironmentSetup env{{
.NodeCount = 8,
.VDiskReplPausedAtStart = true,
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
.ReplMaxQuantumBytes = 1 << 20,
.ReplMaxDonorNotReadyCount = 2
}};
auto& runtime = env.Runtime;

env.EnableDonorMode();
env.CreateBoxAndPool(2, 1);
env.CommenceReplication();
env.Sim(TDuration::Seconds(30));

const ui32 groupId = env.GetGroups().front();

const TActorId edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
const TString buffer = TString(2_MB, 'b');
TLogoBlobID logoBlobId(1, 1, 0, 0, buffer.size(), 0);
TVDiskID vdiskId;
bool vdiskIdWithBlobSet = false;
TLogoBlobID vdiskLogoBlobId;

// Put blob and find vdisk with it and partId = 1
{
env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvBlobStorage::EvVPut) {
Y_UNUSED(nodeId);
auto* msg = ev->Get<TEvBlobStorage::TEvVPut>();
const auto& blobId = LogoBlobIDFromLogoBlobID(msg->Record.GetBlobID());
if (blobId.IsSameBlob(logoBlobId) && blobId.PartId() == 1 && !vdiskIdWithBlobSet) {
vdiskId = VDiskIDFromVDiskID(msg->Record.GetVDiskID());
vdiskLogoBlobId = blobId;
vdiskIdWithBlobSet = true;
} else {
}
}
return true;
};

runtime->WrapInActorContext(edge, [&] {
SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(logoBlobId, buffer, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT(vdiskIdWithBlobSet);
}

auto info = env.GetGroupInfo(groupId);
const TActorId& vdiskActorId = info->GetActorId(vdiskId);

// Move slot out from disk and finf donor
env.SettlePDisk(vdiskActorId);
CheckHasDonor(env, vdiskActorId, vdiskId);
const auto& donors = GetDonors(env, vdiskId);
UNIT_ASSERT_VALUES_EQUAL(donors.size(), 1);
const auto& donor = donors.front();

bool requestVdiskNotYet = false;
bool fastRequestToDonor = false;
bool asyncRequestToDonor = false;

const auto& checkRequestToDonor = [&](std::unique_ptr<IEventHandle>& ev, const NKikimrBlobStorage::EGetHandleClass& handleClass, bool& requestExist) {
auto* msg = ev->Get<TEvBlobStorage::TEvVGet>();
if (msg->Record.ExtremeQueriesSize() != 1) {
return;
}
const auto& query = msg->Record.GetExtremeQueries(0);
const auto& blobId = LogoBlobIDFromLogoBlobID(query.GetId());
const auto& slotId = donor.GetVSlotId();
const auto& donorActorId = MakeBlobStorageVDiskID(slotId.GetNodeId(), slotId.GetPDiskId(), slotId.GetVSlotId());

if (blobId == vdiskLogoBlobId &&
ev->Recipient == donorActorId &&
msg->Record.GetHandleClass() == handleClass) {
UNIT_ASSERT(!requestExist);
requestExist = true;
}
return;
};

// Check disk answer TEvEnrichNotYet and request FastRead from donor for online read
env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
Y_UNUSED(nodeId);
if (ev->GetTypeRewrite() == TEvBlobStorage::EvEnrichNotYet) {
UNIT_ASSERT(!requestVdiskNotYet);
auto msg = ev->Get<TEvBlobStorage::TEvEnrichNotYet>()->Query.Get()->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Record.ExtremeQueriesSize(), 1);
const auto& query = msg->Record.GetExtremeQueries(0);
const auto& vdid = VDiskIDFromVDiskID(msg->Record.GetVDiskID());
const auto& blobId = LogoBlobIDFromLogoBlobID(query.GetId());
UNIT_ASSERT(vdid.SameExceptGeneration(vdiskId));
UNIT_ASSERT_VALUES_EQUAL(vdid.GroupGeneration, 2);
UNIT_ASSERT_VALUES_EQUAL(blobId, vdiskLogoBlobId);
requestVdiskNotYet = true;
}

if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGet) {
checkRequestToDonor(ev, NKikimrBlobStorage::EGetHandleClass::FastRead, fastRequestToDonor);
}
return true;
};

// Get blob
{
auto ev = new TEvBlobStorage::TEvGet(logoBlobId, 0, 0, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead);
runtime->WrapInActorContext(edge, [&] {SendToBSProxy(edge, groupId, ev);});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(edge, false);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT(requestVdiskNotYet);
UNIT_ASSERT(fastRequestToDonor);
}

// Check disk request AsyncRead from donor for replication
env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
Y_UNUSED(nodeId);
if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGet) {
checkRequestToDonor(ev, NKikimrBlobStorage::EGetHandleClass::AsyncRead, asyncRequestToDonor);
}
return true;
};

// Start replication
env.CommenceReplication();
UNIT_ASSERT(asyncRequestToDonor);
}
}
32 changes: 22 additions & 10 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ namespace NKikimr {

struct TDonorQueueItem {
TVDiskID VDiskId;
TActorId QueueActorId;
TDonorQueueActors QueueActors;
ui32 NodeId;
ui32 PDiskId;
ui32 VSlotId;
Expand All @@ -176,7 +176,7 @@ namespace NKikimr {
TMilestoneQueue MilestoneQueue;
TActorId ReplJobActorId;
std::list<std::optional<TDonorQueueItem>> DonorQueue;
std::deque<std::pair<TVDiskID, TActorId>> Donors;
std::deque<std::pair<TVDiskID, TDonorQueueActors>> Donors;
std::set<TVDiskID> ConnectedPeerDisks, ConnectedDonorDisks;
TEvResumeForce *ResumeForceToken = nullptr;
TInstant ReplicationEndTime;
Expand Down Expand Up @@ -222,23 +222,32 @@ namespace NKikimr {
for (const auto& [vdiskId, vdiskActorId] : ReplCtx->VDiskCfg->BaseInfo.DonorDiskIds) {
TIntrusivePtr<NBackpressure::TFlowRecord> flowRecord(new NBackpressure::TFlowRecord);
auto info = MakeIntrusive<TBlobStorageGroupInfo>(ReplCtx->GInfo, vdiskId, vdiskActorId);
const TActorId queueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId,
const TActorId asyncReadQueueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId,
NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead, ReplCtx->MonGroup.GetGroup(), ReplCtx->VCtx,
NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "Donor",
NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "ReplicationDonor",
ReplCtx->VDiskCfg->ReplInterconnectChannel, vdiskActorId.NodeId() == SelfId().NodeId(),
TDuration::Minutes(1), flowRecord, NMonitoring::TCountableBase::EVisibility::Private));

const TActorId fastReadQueueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId,
NKikimrBlobStorage::EVDiskQueueId::GetFastRead, ReplCtx->MonGroup.GetGroup(), ReplCtx->VCtx,
NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "OnlineReadDonor",
ReplCtx->VDiskCfg->ReplInterconnectChannel, vdiskActorId.NodeId() == SelfId().NodeId(),
TDuration::Minutes(1), flowRecord, NMonitoring::TCountableBase::EVisibility::Private));
ui32 nodeId, pdiskId, vslotId;
std::tie(nodeId, pdiskId, vslotId) = DecomposeVDiskServiceId(vdiskActorId);
DonorQueue.emplace_back(TDonorQueueItem{
.VDiskId = vdiskId,
.QueueActorId = queueActorId,
.QueueActors = TDonorQueueActors{
.AsyncReadQueueActorId = asyncReadQueueActorId,
.FastReadQueueActorId = fastReadQueueActorId
},
.NodeId = nodeId,
.PDiskId = pdiskId,
.VSlotId = vslotId,
.NotReady = false,
.NotReadyCount = 0
});
Donors.emplace_back(vdiskId, queueActorId);
Donors.emplace_back(vdiskId, TDonorQueueActors(asyncReadQueueActorId, fastReadQueueActorId));
}
DonorQueue.emplace_back(std::nullopt); // disks from group

Expand Down Expand Up @@ -358,8 +367,10 @@ namespace NKikimr {
}

void DropDonor(const TDonorQueueItem& donor) {
Donors.erase(std::find(Donors.begin(), Donors.end(), std::make_pair(donor.VDiskId, donor.QueueActorId)));
Send(donor.QueueActorId, new TEvents::TEvPoison); // kill the queue actor
Donors.erase(std::find(Donors.begin(), Donors.end(), std::make_pair(donor.VDiskId,
TDonorQueueActors(donor.QueueActors.AsyncReadQueueActorId, donor.QueueActors.FastReadQueueActorId))));
Send(donor.QueueActors.AsyncReadQueueActorId, new TEvents::TEvPoison); // kill the queue actor
Send(donor.QueueActors.FastReadQueueActorId, new TEvents::TEvPoison); // kill the queue actor
Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvBlobStorage::TEvDropDonor(donor.NodeId,
donor.PDiskId, donor.VSlotId, donor.VDiskId));
}
Expand Down Expand Up @@ -520,7 +531,7 @@ namespace NKikimr {
donor->NodeId << ":" << donor->PDiskId << ":" << donor->VSlotId << "}") : "generic"));
ReplJobActorId = Register(CreateReplJobActor(ReplCtx, SelfId(), from, QueueActorMapPtr,
BlobsToReplicatePtr, UnreplicatedBlobsPtr, donor ? std::make_optional(std::make_pair(
donor->VDiskId, donor->QueueActorId)) : std::nullopt, std::move(UnreplicatedBlobRecords),
donor->VDiskId, donor->QueueActors.AsyncReadQueueActorId)) : std::nullopt, std::move(UnreplicatedBlobRecords),
std::move(MilestoneQueue)));
}

Expand Down Expand Up @@ -694,7 +705,8 @@ namespace NKikimr {
}
for (const auto& donor : DonorQueue) {
if (donor) {
Send(donor->QueueActorId, new TEvents::TEvPoison);
Send(donor->QueueActors.AsyncReadQueueActorId, new TEvents::TEvPoison);
Send(donor->QueueActors.FastReadQueueActorId, new TEvents::TEvPoison);
}
}
for (const TActorId& actorId : DonorQueryActors) {
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ namespace NKikimr {

struct TEvReplCheckProgress : TEventLocal<TEvReplCheckProgress, TEvBlobStorage::EvReplCheckProgress> {};

struct TDonorQueueActors {
TActorId AsyncReadQueueActorId;
TActorId FastReadQueueActorId;

bool operator==(const TDonorQueueActors &other) const {
return AsyncReadQueueActorId == other.AsyncReadQueueActorId && FastReadQueueActorId == other.FastReadQueueActorId;
}
};

////////////////////////////////////////////////////////////////////////////
// REPL ACTOR CREATOR
////////////////////////////////////////////////////////////////////////////
Expand Down
18 changes: 12 additions & 6 deletions ydb/core/blobstorage/vdisk/repl/query_donor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ namespace NKikimr {
const ui64 Cookie;
std::unique_ptr<TEvBlobStorage::TEvVGetResult> Result;
TActorId ParentId;
std::deque<std::pair<TVDiskID, TActorId>> Donors;
std::deque<std::pair<TVDiskID, TDonorQueueActors>> Donors;
TDynBitMap UnresolvedItems;
TIntrusivePtr<TVDiskContext> VCtx;

public:
TDonorQueryActor(TEvBlobStorage::TEvEnrichNotYet& msg, std::deque<std::pair<TVDiskID, TActorId>> donors, const TIntrusivePtr<TVDiskContext>& vCtx)
TDonorQueryActor(TEvBlobStorage::TEvEnrichNotYet& msg, std::deque<std::pair<TVDiskID, TDonorQueueActors>> donors, const TIntrusivePtr<TVDiskContext>& vCtx)
: Query(msg.Query->Release().Release())
, Sender(msg.Query->Sender)
, Cookie(msg.Query->Cookie)
Expand Down Expand Up @@ -45,7 +45,7 @@ namespace NKikimr {
return PassAway();
}

auto [vdiskId, actorId] = Donors.back();
auto [vdiskId, actors] = Donors.back();
Donors.pop_back();

// we use AsyncRead priority as we are going to use the replication queue for the VDisk; also this doesn't
Expand All @@ -57,7 +57,13 @@ namespace NKikimr {
const auto flags = record.GetShowInternals()
? TEvBlobStorage::TEvVGet::EFlags::ShowInternals
: TEvBlobStorage::TEvVGet::EFlags::None;
auto query = fun(vdiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, flags, {}, {}, std::nullopt);
const auto handleClass = record.GetHandleClass() == NKikimrBlobStorage::EGetHandleClass::FastRead
? NKikimrBlobStorage::EGetHandleClass::FastRead
: NKikimrBlobStorage::EGetHandleClass::AsyncRead;
const auto queueActorId = record.GetHandleClass() == NKikimrBlobStorage::EGetHandleClass::FastRead
? actors.FastReadQueueActorId
: actors.AsyncReadQueueActorId;
auto query = fun(vdiskId, TInstant::Max(), handleClass, flags, {}, {}, std::nullopt);

bool action = false;
Y_FOR_EACH_BIT(i, UnresolvedItems) {
Expand All @@ -69,8 +75,8 @@ namespace NKikimr {

if (action) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_VDISK_GET, SelfId() << " sending " << query->ToString()
<< " to " << actorId);
Send(actorId, query.release(), IEventHandle::FlagTrackDelivery);
<< " to " << queueActorId);
Send(queueActorId, query.release(), IEventHandle::FlagTrackDelivery);
} else {
PassAway();
}
Expand Down
Loading