diff --git a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp index 06d2da56e615..7eb6f4036225 100644 --- a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp +++ b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp @@ -3,6 +3,7 @@ #include #include +#include #include namespace NKikimr { @@ -50,7 +51,12 @@ struct TPDiskMockState::TImpl { NPDisk::EDeviceType DeviceType; std::optional Metadata; - TImpl(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize, bool isDiskReadOnly, NPDisk::EDeviceType deviceType) + ESpaceColorPolicy SpaceColorPolicy; + std::shared_ptr ChunkSharedQuota; + double Occupancy = 0; + + TImpl(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize, bool isDiskReadOnly, NPDisk::EDeviceType deviceType, + ESpaceColorPolicy spaceColorPolicy) : NodeId(nodeId) , PDiskId(pdiskId) , PDiskGuid(pdiskGuid) @@ -62,7 +68,20 @@ struct TPDiskMockState::TImpl { , NextFreeChunk(1) , StatusFlags(NPDisk::TStatusFlags{}) , DeviceType(deviceType) - {} + , SpaceColorPolicy(spaceColorPolicy) + { + switch (SpaceColorPolicy) { + case ESpaceColorPolicy::SharedQuota: { + ChunkSharedQuota = std::make_shared(); + // 13% for CYAN is default value in prod + ChunkSharedQuota->ForceHardLimit(TotalChunks, NPDisk::TColorLimits::MakeChunkLimits(130)); + break; + } + case ESpaceColorPolicy::None: + default: + break; + } + } TImpl(const TImpl&) = default; @@ -76,6 +95,28 @@ struct TPDiskMockState::TImpl { } } + void UpdateStatusFlags() { + switch (SpaceColorPolicy) { + case ESpaceColorPolicy::SharedQuota: { + i64 before = ChunkSharedQuota->GetFree(); + i64 now = GetNumFreeChunks(); + if (before < now) { + ChunkSharedQuota->Release(now - before); + } else if (before > now) { + ChunkSharedQuota->ForceAllocate(before - now); + } + + NKikimrBlobStorage::TPDiskSpaceColor::E newColor = + ChunkSharedQuota->EstimateSpaceColor(0, &Occupancy); + SetStatusFlags(SpaceColorToStatusFlag(newColor)); + break; + } + case ESpaceColorPolicy::None: + default: + break; + } + } + ui32 AllocateChunk(TOwner& to) { ui32 chunkIdx = TotalChunks; @@ -88,7 +129,8 @@ struct TPDiskMockState::TImpl { to.ReservedChunks.insert(FreeChunks.extract(it)); } - Y_VERIFY(chunkIdx != TotalChunks); + Y_ABORT_UNLESS(chunkIdx != TotalChunks); + return chunkIdx; } @@ -172,6 +214,7 @@ struct TPDiskMockState::TImpl { for (const TChunkIdx chunkIdx : owner.ReservedChunks) { owner.ChunkData.erase(chunkIdx); } + FreeChunks.merge(owner.ReservedChunks); AdjustFreeChunks(); } @@ -188,7 +231,8 @@ struct TPDiskMockState::TImpl { Y_VERIFY(num); owner.ChunkData.erase(chunkIdx); const bool inserted = FreeChunks.insert(chunkIdx).second; - Y_VERIFY(inserted); + Y_ABORT_UNLESS(inserted); + AdjustFreeChunks(); } @@ -286,8 +330,9 @@ struct TPDiskMockState::TImpl { }; TPDiskMockState::TPDiskMockState(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize, bool isDiskReadOnly, - NPDisk::EDeviceType deviceType) - : TPDiskMockState(std::make_unique(nodeId, pdiskId, pdiskGuid, size, chunkSize, isDiskReadOnly, deviceType)) + NPDisk::EDeviceType deviceType, ESpaceColorPolicy spaceColorPolicy) + : TPDiskMockState(std::make_unique(nodeId, pdiskId, pdiskGuid, size, chunkSize, isDiskReadOnly, deviceType, + spaceColorPolicy)) {} TPDiskMockState::TPDiskMockState(std::unique_ptr&& impl) @@ -670,12 +715,14 @@ class TPDiskMockActor : public TActorBootstrapped { if (Impl.GetNumFreeChunks() < msg->SizeChunks) { PDISK_MOCK_LOG(NOTICE, PDM09, "received TEvChunkReserve", (Msg, msg->ToString()), (Error, "no free chunks")); res->Status = NKikimrProto::OUT_OF_SPACE; + res->StatusFlags = GetStatusFlags() | ui32(NKikimrBlobStorage::StatusNotEnoughDiskSpaceForOperation); res->ErrorReason = "no free chunks"; } else { PDISK_MOCK_LOG(DEBUG, PDM07, "received TEvChunkReserve", (Msg, msg->ToString()), (VDiskId, owner->VDiskId)); for (ui32 i = 0; i < msg->SizeChunks; ++i) { res->ChunkIds.push_back(Impl.AllocateChunk(*owner)); } + res->StatusFlags = GetStatusFlags(); PDISK_MOCK_LOG(DEBUG, PDM10, "sending TEvChunkReserveResult", (Msg, res->ToString())); } } @@ -742,9 +789,11 @@ class TPDiskMockActor : public TActorBootstrapped { if (!msg->ChunkIdx) { // allocate chunk if (!Impl.GetNumFreeChunks()) { res->Status = NKikimrProto::OUT_OF_SPACE; + res->StatusFlags = GetStatusFlags() | ui32(NKikimrBlobStorage::StatusNotEnoughDiskSpaceForOperation); res->ErrorReason = "no free chunks"; } else { msg->ChunkIdx = res->ChunkIdx = Impl.AllocateChunk(*owner); + res->StatusFlags = GetStatusFlags(); } } if (msg->ChunkIdx) { @@ -848,7 +897,7 @@ class TPDiskMockActor : public TActorBootstrapped { auto res = std::make_unique(NKikimrProto::OK, GetStatusFlags(), Impl.GetNumFreeChunks(), Impl.TotalChunks, Impl.TotalChunks - Impl.GetNumFreeChunks(), Impl.Owners.size(), TString()); - res->Occupancy = (double)res->UsedChunks / res->TotalChunks; + res->Occupancy = GetOccupancy(); Impl.FindOwner(msg, res); // to ensure correct owner/round Send(ev->Sender, res.release()); } @@ -883,9 +932,16 @@ class TPDiskMockActor : public TActorBootstrapped { } NPDisk::TStatusFlags GetStatusFlags() { + Impl.UpdateStatusFlags(); return Impl.StatusFlags; } + double GetOccupancy() { + return (Impl.Occupancy == 0) + ? ((double)(Impl.TotalChunks - Impl.GetNumFreeChunks()) / Impl.TotalChunks) + : Impl.Occupancy; + } + void ErrorHandle(NPDisk::TEvYardInit::TPtr &ev) { Send(ev->Sender, new NPDisk::TEvYardInitResult(NKikimrProto::CORRUPTED, State->GetStateErrorReason())); } diff --git a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h index 2ab9c0e85384..de91065b792f 100644 --- a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h +++ b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h @@ -17,12 +17,19 @@ namespace NKikimr { std::unique_ptr Impl; friend class TPDiskMockActor; + public: + enum class ESpaceColorPolicy { + None = 0, + SharedQuota, + }; + public: using TPtr = TIntrusivePtr; public: TPDiskMockState(ui32 nodeId, ui32 pdiskId, ui64 pdiskGuid, ui64 size, ui32 chunkSize = 128 << 20, - bool isDiskReadOnly = false, NPDisk::EDeviceType deviceType = NPDisk::EDeviceType::DEVICE_TYPE_NVME); + bool isDiskReadOnly = false, NPDisk::EDeviceType deviceType = NPDisk::EDeviceType::DEVICE_TYPE_NVME, + ESpaceColorPolicy spaceColorPolicy = ESpaceColorPolicy::None); TPDiskMockState(std::unique_ptr&& impl); ~TPDiskMockState(); diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index 58aed6c51477..5c631dfdc72d 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -56,6 +56,9 @@ struct TEnvironmentSetup { const ui32 MaxNumOfSlowDisks = 2; const ui32 ReplMaxQuantumBytes = 0; const ui32 ReplMaxDonorNotReadyCount = 0; + const ui64 PDiskSize = 10_TB; + const ui64 PDiskChunkSize = 0; + const bool TrackSharedQuotaInPDiskMock = false; }; const TSettings Settings; @@ -73,8 +76,12 @@ struct TEnvironmentSetup { const auto key = std::make_pair(nodeId, pdiskId); TIntrusivePtr& state = Env.PDiskMockStates[key]; if (!state) { - state.Reset(new TPDiskMockState(nodeId, pdiskId, cfg->PDiskGuid, ui64(10) << 40, cfg->ChunkSize, - cfg->ReadOnly, Env.Settings.DiskType)); + ui64 chunkSize = Env.Settings.PDiskChunkSize ? Env.Settings.PDiskChunkSize : cfg->ChunkSize; + TPDiskMockState::ESpaceColorPolicy spaceColorPolicy = Env.Settings.TrackSharedQuotaInPDiskMock + ? TPDiskMockState::ESpaceColorPolicy::SharedQuota + : TPDiskMockState::ESpaceColorPolicy::None; + state.Reset(new TPDiskMockState(nodeId, pdiskId, cfg->PDiskGuid, Env.Settings.PDiskSize, chunkSize, + cfg->ReadOnly, Env.Settings.DiskType, spaceColorPolicy)); } const TActorId& actorId = ctx.Register(CreatePDiskMockActor(state), TMailboxType::HTSwap, poolId); const TActorId& serviceId = MakeBlobStoragePDiskID(nodeId, pdiskId); diff --git a/ydb/core/blobstorage/ut_blobstorage/replication.cpp b/ydb/core/blobstorage/ut_blobstorage/replication.cpp index e4922d6303b8..c4e54744a276 100644 --- a/ydb/core/blobstorage/ut_blobstorage/replication.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/replication.cpp @@ -1,10 +1,16 @@ #include #include #include + #include +#include + +#include "ut_helpers.h" #define SINGLE_THREAD 1 +#define Ctest Cnull + enum class EState { OK, FORMAT, @@ -350,3 +356,227 @@ Y_UNIT_TEST_SUITE(Replication) { DoTestCase(TBlobStorageGroupType::ErasureMirror3dc, {E::OK, E::FORMAT, E::OK, E::OK, E::OFFLINE, E::OK, E::OK, E::OFFLINE, E::OK}, true); } } + +struct TTestCtx : public TTestCtxBase { +public: + TTestCtx(TBlobStorageGroupType erasure, ui64 pdiskSize, ui32 groupsCount) + : TTestCtxBase(TEnvironmentSetup::TSettings{ + .NodeCount = erasure.BlobSubgroupSize(), + .Erasure = erasure, + .PDiskSize = pdiskSize, + .PDiskChunkSize = 32_MB, + .TrackSharedQuotaInPDiskMock = true, + }) + , PDiskSize(pdiskSize) + , GroupsCount(groupsCount) + {} + + void Initialize() override { + Env->CreateBoxAndPool(GroupsCount, GroupsCount); + Env->Sim(TDuration::Minutes(1)); + + BaseConfig = Env->FetchBaseConfig(); + UNIT_ASSERT_VALUES_EQUAL(BaseConfig.GroupSize(), GroupsCount); + for (const auto& group : BaseConfig.GetGroup()) { + Groups.push_back(group.GetGroupId()); + } + + AllocateEdgeActor(); + for (const ui32 groupId : Groups) { + GetGroupStatus(groupId); + } + } + +public: + ui64 PDiskSize; + ui32 GroupsCount; + std::vector Groups; +}; + +Y_UNIT_TEST_SUITE(ReplicationSpace) { + + struct TVDiskStats { + double Occupancy; + bool IsReplicated; + }; + + TVDiskID VDiskIdFromVSlot(const NKikimrBlobStorage::TBaseConfig::TVSlot& vslot) { + return TVDiskID(vslot.GetGroupId(), vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(), + vslot.GetFailDomainIdx(), vslot.GetVDiskIdx());; + } + + void TestSpace(ui64 diskSize, ui64 blobSize, float usedSpaceFraction, bool donorMode) { + TBlobStorageGroupType erasure = TBlobStorageGroupType::ErasureMirror3dc; + TTestCtx ctx(erasure, diskSize, 2); + ctx.Initialize(); + + // disable self-heal + ctx.Env->UpdateSettings(false, donorMode, false); + + ui64 perDiskDataSize = diskSize * usedSpaceFraction; + ui64 dataSize = perDiskDataSize; + + // assure that all groups are green + for (ui32 groupId : ctx.Groups) { + auto status = ctx.GetGroupStatus(groupId); + UNIT_ASSERT(status->Get()->Status == NKikimrProto::OK); + Ctest << "Group# " << groupId << " Status# " << status->Get()->ToString() << Endl; + UNIT_ASSERT(!status->Get()->StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceCyan)); + } + + // write data + for (ui32 groupId : ctx.Groups) { + ctx.WriteCompressedData(TTestCtxBase::TDataProfile{ + .GroupId = groupId, + .TotalSize = dataSize, + .BlobSize = blobSize, + .DelayBetweenPuts = TDuration::Seconds(1), + .Erasure = erasure, + .CookieStrategy = TTestCtxBase::TDataProfile::ECookieStrategy::WithSamePlacement, + }); + Ctest << "Data written for group " << groupId << Endl; + } + + // wait for compaction to finish + ctx.Env->Sim(TDuration::Minutes(360)); + + // assure that all groups are green + for (ui32 groupId : ctx.Groups) { + auto status = ctx.GetGroupStatus(groupId); + UNIT_ASSERT(status->Get()->Status == NKikimrProto::OK); + Ctest << "Group# " << groupId << " Status# " << status->Get()->ToString() << Endl; + UNIT_ASSERT(!status->Get()->StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceCyan)); + } + + auto getVDiskStats = [&](const TVDiskID& vdiskId) -> TVDiskStats { + double occupancy; + bool isReplicated; + ctx.Env->WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) { + ctx.Env->Runtime->Send(new IEventHandle(queueId, ctx.Edge, new TEvBlobStorage::TEvVStatus()), queueId.NodeId()); + auto res = ctx.Env->WaitForEdgeActorEvent(ctx.Edge, false, TInstant::Max()); + occupancy = 1 - res->Get()->Record.GetApproximateFreeSpaceShare(); + isReplicated = res->Get()->Record.GetReplicated(); + }); + + return { occupancy, isReplicated }; + }; + + TVDiskID chosenVDiskId; + ui32 chosenPDiskId = 0; + ui32 chosenNodeId = 0; + + // reassign vdisk + { + // choose pdisk with low space + ctx.FetchBaseConfig(); + for (const auto& vslot : ctx.BaseConfig.GetVSlot()) { + if (vslot.GetGroupId() == ctx.Groups[0]) { + TVDiskStats stats = getVDiskStats(VDiskIdFromVSlot(vslot)); + Ctest << "VDisk# " << VDiskIdFromVSlot(vslot).ToString() << " " << stats.Occupancy << " " << stats.IsReplicated << Endl; + if (stats.Occupancy > 1 - usedSpaceFraction) { + chosenNodeId = vslot.GetVSlotId().GetNodeId(); + chosenPDiskId = vslot.GetVSlotId().GetPDiskId(); + break; + } + } + } + UNIT_ASSERT(chosenNodeId != 0); + + NKikimrBlobStorage::TConfigRequest request; + for (const auto& vslot : ctx.BaseConfig.GetVSlot()) { + if (vslot.GetGroupId() == ctx.Groups[1]) { + TVDiskID vdiskId = VDiskIdFromVSlot(vslot); + TVDiskStats stats = getVDiskStats(vdiskId); + if (stats.Occupancy > 1 - usedSpaceFraction) { + chosenVDiskId = vdiskId; + NKikimrBlobStorage::TReassignGroupDisk* cmd = request.AddCommand()->MutableReassignGroupDisk(); + cmd->SetGroupId(vslot.GetGroupId()); + cmd->SetGroupGeneration(vslot.GetGroupGeneration()); + cmd->SetFailRealmIdx(vslot.GetFailRealmIdx()); + cmd->SetFailDomainIdx(vslot.GetFailDomainIdx()); + cmd->SetVDiskIdx(vslot.GetVDiskIdx()); + auto* target = cmd->MutableTargetPDiskId(); + target->SetNodeId(chosenNodeId); + target->SetPDiskId(chosenPDiskId); + break; + } + } + } + auto res = ctx.Env->Invoke(request); + UNIT_ASSERT_C(res.GetSuccess(), res.GetErrorDescription()); + UNIT_ASSERT_C(res.GetStatus(0).GetSuccess(), res.GetStatus(0).GetErrorDescription()); + } + + Ctest << "Chosen PDisk# [" << chosenNodeId << ":" << chosenPDiskId << + "] chosen VDiskId# " << chosenVDiskId.ToString() << Endl; + + // wait for replication to stuck + ctx.Env->Sim(TDuration::Minutes(360)); + + // check that all groups are YELLOW at worst + for (ui32 groupId : ctx.Groups) { + auto status = ctx.GetGroupStatus(groupId); + UNIT_ASSERT(status->Get()->Status == NKikimrProto::OK); + Ctest << "Group# " << groupId << " Status# " << status->Get()->ToString() << Endl; + UNIT_ASSERT(!status->Get()->StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpacePreOrange)); + } + + // disable donor mode to free space immediately + ctx.Env->UpdateSettings(false, false, false); + + // reassign second vdisk from chosen pdisk + { + NKikimrBlobStorage::TConfigRequest request; + for (const auto& vslot : ctx.BaseConfig.GetVSlot()) { + if (vslot.GetGroupId() == ctx.Groups[0]) { + if (vslot.GetVSlotId().GetNodeId() == chosenNodeId && vslot.GetVSlotId().GetPDiskId() == chosenPDiskId) { + NKikimrBlobStorage::TReassignGroupDisk* cmd = request.AddCommand()->MutableReassignGroupDisk(); + cmd->SetGroupId(vslot.GetGroupId()); + cmd->SetGroupGeneration(vslot.GetGroupGeneration()); + cmd->SetFailRealmIdx(vslot.GetFailRealmIdx()); + cmd->SetFailDomainIdx(vslot.GetFailDomainIdx()); + cmd->SetVDiskIdx(vslot.GetVDiskIdx()); + break; + } + } + } + auto res = ctx.Env->Invoke(request); + UNIT_ASSERT_C(res.GetSuccess(), res.GetErrorDescription()); + UNIT_ASSERT_C(res.GetStatus(0).GetSuccess(), res.GetStatus(0).GetErrorDescription()); + } + + Ctest << "Evicting second VDisk" << Endl; + + // wait for replication + ctx.Env->Sim(TDuration::Hours(12)); + + // check that chosen VDisk finished replication + { + ctx.FetchBaseConfig(); + for (const auto& vslot : ctx.BaseConfig.GetVSlot()) { + if (vslot.GetGroupId() == ctx.Groups[1]) { + TVDiskID vdiskId = VDiskIdFromVSlot(vslot); + TVDiskStats stats = getVDiskStats(vdiskId); + UNIT_ASSERT_C(stats.IsReplicated, "Unreplicated VDiskId# " << vdiskId.ToString() + << " Occupancy# " << stats.Occupancy); + } + } + } + } + + Y_UNIT_TEST(HugeBlobsWithDonor) { + TestSpace(4_GB, 8_MB, 0.5, true); + } + + Y_UNIT_TEST(SmallBlobsWithDonor) { + TestSpace(4_GB, 100_KB, 0.5, true); + } + + Y_UNIT_TEST(HugeBlobsNoDonor) { + TestSpace(4_GB, 8_MB, 0.5, false); + } + + Y_UNIT_TEST(SmallBlobsNoDonor) { + TestSpace(4_GB, 100_KB, 0.5, false); + } +} diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h b/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h index b93c702c0ada..e0866b061045 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h +++ b/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h @@ -260,4 +260,147 @@ class TInflightActorPatch : public TInflightActor { ui32 DataSize; }; + +struct TTestCtxBase { +public: + TTestCtxBase(TEnvironmentSetup::TSettings settings) + : NodeCount(settings.NodeCount) + , Erasure(settings.Erasure) + , Env(new TEnvironmentSetup(std::move(settings))) + {} + + virtual ~TTestCtxBase() = default; + + void CreateOneGroup() { + Env->CreateBoxAndPool(1, 1); + Env->Sim(TDuration::Minutes(1)); + + FetchBaseConfig(); + + UNIT_ASSERT_VALUES_EQUAL(BaseConfig.GroupSize(), 1); + const auto& group = BaseConfig.GetGroup(0); + GroupId = group.GetGroupId(); + } + + void AllocateEdgeActor() { + Edge = Env->Runtime->AllocateEdgeActor(NodeCount); + } + + void FetchBaseConfig() { + BaseConfig = Env->FetchBaseConfig(); + } + + TAutoPtr> GetGroupStatus(ui32 groupId) { + Env->Runtime->WrapInActorContext(Edge, [&] { + SendToBSProxy(Edge, groupId, new TEvBlobStorage::TEvStatus(TInstant::Max())); + }); + return Env->WaitForEdgeActorEvent(Edge, false, TInstant::Max()); + } + + virtual void Initialize() { + CreateOneGroup(); + AllocateEdgeActor(); + GetGroupStatus(GroupId); + } + +public: + struct TDataProfile { + public: + enum class ECookieStrategy { + SimpleIncrement = 0, + WithSamePlacement, + }; + + enum class EContentType { + Zeros = 0, + RepetitivePattern, + }; + + public: + ui32 GroupId; + ui64 TotalSize; + ui64 BlobSize; + EContentType ContentType = EContentType::Zeros; + + TDuration DelayBetweenPuts = TDuration::Zero(); + + // must be specified when using ECookieStrategy::WithSamePlacement + std::optional Erasure = std::nullopt; + + ui64 TabletId = 5000; + ui32 Channel = 1; + ui32 Generation = 1; + ui32 Step = 1; + ECookieStrategy CookieStrategy = ECookieStrategy::SimpleIncrement; + + public: + ui64 NextCookie(ui64 prevCookie) const { + switch (CookieStrategy) { + case TDataProfile::ECookieStrategy::SimpleIncrement: + return ++prevCookie; + case TDataProfile::ECookieStrategy::WithSamePlacement: { + ui64 originalHash = TLogoBlobID(TabletId, Generation, Step, Channel, BlobSize, prevCookie).Hash(); + while (prevCookie < TLogoBlobID::MaxCookie) { + TLogoBlobID next(TabletId, Generation, Step, Channel, BlobSize, ++prevCookie); + if (next.Hash() % Erasure->BlobSubgroupSize() == originalHash % Erasure->BlobSubgroupSize()) { + return prevCookie; + } + } + } + default: + Y_FAIL(); + } + } + }; + + std::vector WriteCompressedData(TDataProfile profile) { + std::vector blobs; + + static ui64 cookie = 0; + + for (ui64 size = 0; size < profile.TotalSize; size += profile.BlobSize) { + cookie = profile.NextCookie(cookie); + blobs.emplace_back(profile.TabletId, profile.Generation, profile.Step, profile.Channel, + profile.BlobSize, cookie); + + Env->Runtime->WrapInActorContext(Edge, [&] { + TString data; + + switch (profile.ContentType) { + case TDataProfile::EContentType::Zeros: + data = TString(profile.BlobSize, '\0'); + break; + case TDataProfile::EContentType::RepetitivePattern: + data = MakeData(profile.BlobSize); + break; + default: + Y_FAIL(); + } + + SendToBSProxy(Edge, profile.GroupId, new TEvBlobStorage::TEvPut(blobs.back(), data, TInstant::Max()), + NKikimrBlobStorage::TabletLog); + }); + + auto res = Env->WaitForEdgeActorEvent( + Edge, false, TInstant::Max()); + // Cerr << "Write data " << size << " " << res->Get()->ToString()<< Endl; + UNIT_ASSERT(res->Get()->Status == NKikimrProto::OK); + + if (profile.DelayBetweenPuts != TDuration::Zero()) { + Env->Sim(profile.DelayBetweenPuts); + } + } + return blobs; + } + +public: + ui32 NodeCount; + TBlobStorageGroupType Erasure; + std::shared_ptr Env; + + NKikimrBlobStorage::TBaseConfig BaseConfig; + ui32 GroupId; + TActorId Edge; +}; + } // namespace NKikimr diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make index 4299deb81122..ff7375f99467 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make @@ -9,6 +9,7 @@ INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc) SRCS( replication.cpp replication_huge.cpp + ut_helpers.cpp ) PEERDIR( diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp index ef30d0f81d2d..0ec586471476 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp @@ -326,6 +326,18 @@ namespace NKikimr { std::optional CurrentItem; TLogoBlobID LastProcessedKey; + // OOS handling + constexpr static TDuration OOSRecoveryRetryDelay = TDuration::Seconds(60); + enum class EStatus { + Working, + WaitingForRetry, + }; + EStatus Status; + + // TODO: postpone replication on CYAN flag with possibilty of manual force continuation + NKikimrBlobStorage::EStatusFlags PostponeReplicationThreshold = NKikimrBlobStorage::StatusDiskSpaceYellowStop; + + private: void Finish() { STLOG(PRI_DEBUG, BS_REPL, BSVR01, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "finished replication job"), (LastKey, LastKey), (Eof, Eof)); @@ -494,8 +506,7 @@ namespace NKikimr { } void Merge() { - while (MergeIteration()) - ; + while (Status == EStatus::Working && MergeIteration()) {} } bool MergeIteration() { @@ -873,6 +884,7 @@ namespace NKikimr { void HandleYard(NPDisk::TEvChunkWriteResult::TPtr& ev) { CHECK_PDISK_RESPONSE(ReplCtx->VCtx, ev, ActorContext()); Writer.Apply(ev->Get()); + CheckSpace(ev); Merge(); } @@ -881,6 +893,7 @@ namespace NKikimr { STLOG(PRI_INFO, BS_REPL, BSVR10, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "reserved chunks"), (ChunkIds, FormatList(ev->Get()->ChunkIds))); Writer.Apply(ev->Get()); + CheckSpace(ev); Merge(); } @@ -917,12 +930,73 @@ namespace NKikimr { Merge(); } - void Handle(TEvBlobStorage::TEvVPutResult::TPtr& /*ev*/) { - // FIXME: Handle NotOK + void Handle(TEvBlobStorage::TEvVPutResult::TPtr& ev) { // this message is received when huge blob is written by Skeleton Y_VERIFY_S(HugeBlobsInFlight != 0, ReplCtx->VCtx->VDiskLogPrefix); --HugeBlobsInFlight; - Merge(); + const auto& record = ev->Get()->Record; + TStorageStatusFlags flags(record.GetStatusFlags()); + switch (record.GetStatus()) { + case NKikimrProto::OUT_OF_SPACE: + STLOG(PRI_ERROR, BS_REPL, BSVR43, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, + "PDisk out of space, delaying replication"), + (StatusFlags, flags.ToString())); + SwitchToRetriableErrorState(); + break; + case NKikimrProto::OK: + default: { // TODO: Handle other reply statuses + if (flags.Check(PostponeReplicationThreshold)) { + STLOG(PRI_ERROR, BS_REPL, BSVR41, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, + "Available space is running low, delaying replication"), + (StatusFlags, flags.ToString())); + SwitchToRetriableErrorState(); + } + Merge(); + break; + } + } + } + + void SwitchToRetriableErrorState() { + if (std::exchange(Status, EStatus::WaitingForRetry) == EStatus::Working) { + ScheduleSpaceCheck(); + TimeAccount.SetState(ETimeState::OUT_OF_SPACE_DELAY); + // we don't have to relinquish replication token here, because other VDisks + // on this PDisk also lack space + } + } + + void ScheduleSpaceCheck() { + ui8 owner = ReplCtx->PDiskCtx->Dsk->Owner; + ui64 ownerRound = ReplCtx->PDiskCtx->Dsk->OwnerRound; + std::unique_ptr ev = std::make_unique( + ReplCtx->PDiskCtx->PDiskId, SelfId(), new NPDisk::TEvCheckSpace(owner, ownerRound)); + + TActivationContext::Schedule(OOSRecoveryRetryDelay, ev.release()); + } + + template + void CheckSpace(const TYardEventPtr& ev) { + TStorageStatusFlags flags(ev->Get()->StatusFlags); + if (flags.Check(PostponeReplicationThreshold)) { + STLOG(PRI_ERROR, BS_REPL, BSVR40, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, + "Available space is running low, delaying replication"), + (StatusFlags, flags.ToString())); + SwitchToRetriableErrorState(); + } + } + + void Handle(NPDisk::TEvCheckSpaceResult::TPtr& ev) { + TStorageStatusFlags flags(ev->Get()->StatusFlags); + if (!flags.Check(PostponeReplicationThreshold)) { + STLOG(PRI_NOTICE, BS_REPL, BSVR42, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, + "Enough space on PDisk, continuing replication")); + EStatus prevStatus = std::exchange(Status, EStatus::Working); + Y_DEBUG_ABORT_UNLESS(prevStatus == EStatus::WaitingForRetry); + Merge(); + } else { + ScheduleSpaceCheck(); + } } void PassAway() override { @@ -946,12 +1020,14 @@ namespace NKikimr { // yard messages coming to Writer hFunc(NPDisk::TEvChunkWriteResult, HandleYard) hFunc(NPDisk::TEvChunkReserveResult, HandleYard) + hFunc(TEvBlobStorage::TEvGetResult, Handle) hFunc(TEvAddBulkSstResult, Handle) hFunc(TEvBlobStorage::TEvVPutResult, Handle) cFunc(TEvBlobStorage::EvDetectedPhantomBlobCommitted, HandleDetectedPhantomBlobCommitted) cFunc(TEvents::TSystem::Poison, PassAway) hFunc(TEvReplInvoke, Handle) + hFunc(NPDisk::TEvCheckSpaceResult, Handle) ) STRICT_STFUNC(StateInit, @@ -1005,6 +1081,7 @@ namespace NKikimr { , UnreplicatedBlobRecords(std::move(ubr)) , MilestoneQueue(std::move(milestoneQueue)) , Donor(donor) + , Status(EStatus::Working) { if (Donor) { ReplInfo->DonorVDiskId = Donor->first; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h index 187d84c7ede8..b02aa80b8cd6 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h @@ -38,6 +38,27 @@ namespace NKikimr { ERROR, // something gone wrong, further operation impossible }; + static TString StateToString(EState state) { + switch (state) { + case EState::INVALID: + return "INVALID"; + case EState::STOPPED: + return "STOPPED"; + case EState::PDISK_MESSAGE_PENDING: + return "PDISK_MESSAGE_PENDING"; + case EState::NOT_READY: + return "NOT_READY"; + case EState::COLLECT: + return "COLLECT"; + case EState::COMMIT_PENDING: + return "COMMIT_PENDING"; + case EState::WAITING_FOR_COMMIT: + return "WAITING_FOR_COMMIT"; + case EState::ERROR: + return "ERROR"; + } + } + enum class EOutputState { INVALID, INTERMEDIATE_CHUNK, diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index a92996f5a084..c25b33bf7471 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -104,6 +104,7 @@ namespace NKikimr { PARAM_V(CommitDuration); PARAM_V(OtherDuration); PARAM_V(PhantomDuration); + PARAM_V(OutOfSpaceDelayDuration); } GROUP("VDisk Stats") { PARAM_V(ProxyStat->VDiskReqs); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h index 1f15622965ee..5b5a7f425504 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h @@ -162,6 +162,7 @@ namespace NKikimr { TDuration CommitDuration; TDuration OtherDuration; TDuration PhantomDuration; + TDuration OutOfSpaceDelayDuration; std::unique_ptr ProxyStat; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h index e379480c3d97..b071f3d60c6f 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h @@ -19,6 +19,7 @@ namespace NKikimr { COMMIT, OTHER, PHANTOM, + OUT_OF_SPACE_DELAY, COUNT }; @@ -46,6 +47,7 @@ namespace NKikimr { replInfo.CommitDuration = Durations[static_cast(ETimeState::COMMIT)]; replInfo.OtherDuration = Durations[static_cast(ETimeState::OTHER)]; replInfo.PhantomDuration = Durations[static_cast(ETimeState::PHANTOM)]; + replInfo.OutOfSpaceDelayDuration = Durations[static_cast(ETimeState::OUT_OF_SPACE_DELAY)]; } private: