diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index 19659e6b7759..fbb545409a89 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -29,10 +29,33 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor FoundParts; @@ -69,12 +94,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor EmptyResponseFlags; TStackVec ErrorResponseFlags; TStackVec ForceStopFlags; + TStackVec SlowFlags; TBlobStorageGroupInfo::TVDiskIds VDisks; bool UseVPatch = false; bool IsGoodPatchedBlobId = false; bool IsAllowedErasure = false; bool IsSecured = false; + bool HasSlowVDisk = false; + bool IsContinuedVPatch = false; + bool IsMovedPatch = false; #define PATCH_LOG(priority, service, marker, msg, ...) \ STLOG(priority, service, marker, msg, \ @@ -97,6 +126,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorActivePatch; } + void ScheduleWakeUp(TInstant startTime, EWakeUpTag tag) { + TDuration duration = TActivationContext::Now() - startTime; + Schedule(duration, new TEvents::TEvWakeup(tag)); + } + + void ScheduleWakeUp(EWakeUpTag tag) { + ScheduleWakeUp(StageStart, tag); + } + static constexpr ERequestType RequestType() { return ERequestType::Patch; } @@ -279,6 +317,12 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorType.ErasureFamily() != TErasureType::ErasureMirror) { + if (ReceivedFoundParts == SentStarts / 2 + SentStarts % 2) { + ScheduleWakeUp(VPatchStartTag); + } + } + NKikimrBlobStorage::TEvVPatchFoundParts &record = ev->Get()->Record; Y_ABORT_UNLESS(record.HasCookie()); @@ -312,6 +356,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor(TStringBuilder() << ReceivedFoundParts << '/' << SentStarts)), (ErrorReason, errorReason)); @@ -341,6 +386,13 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorType.ErasureFamily() != TErasureType::ErasureMirror) { + if (ReceivedResults == SentVPatchDiff / 2 + SentVPatchDiff % 2) { + ScheduleWakeUp(VPatchDiffTag); + } + } + PullOutStatusFlagsAndFressSpace(record); Y_ABORT_UNLESS(record.HasStatus()); NKikimrProto::EReplyStatus status = record.GetStatus(); @@ -352,6 +404,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor(TStringBuilder() << ReceivedResults << '/' << Info->Type.TotalPartCount())), (ErrorReason, errorReason)); @@ -499,6 +552,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor= dataParts, "vdiskIdx# " << vdiskIdx << " partIdx# " << partIdx); placements.push_back(TPartPlacement{static_cast(vdiskIdx), static_cast(partIdx + 1)}); + SentVPatchDiff++; } SendDiffs(placements); } @@ -537,15 +592,38 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor subgroupIdx = 0; - ui32 subgroupIdx = 0; if (OkVDisksWithParts) { ui32 okVDiskIdx = RandomNumber(OkVDisksWithParts.size()); subgroupIdx = OkVDisksWithParts[okVDiskIdx]; } else { + ui64 worstNs = 0; + ui64 nextToWorstNs = 0; + i32 worstSubGroubIdx = -1; + GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::PutAsyncBlob, &worstNs, &nextToWorstNs, &worstSubGroubIdx); + if (worstNs > nextToWorstNs * 2) { + SlowFlags[worstSubGroubIdx] = true; + HasSlowVDisk = true; + } + if (HasSlowVDisk) { + TStackVec goodDisks; + for (ui32 idx = 0; idx < VDisks.size(); ++idx) { + if (!SlowFlags[idx] && !ErrorResponseFlags[idx]) { + goodDisks.push_back(idx); + } + } + if (goodDisks.size()) { + ui32 okVDiskIdx = RandomNumber(goodDisks.size()); + subgroupIdx = goodDisks[okVDiskIdx]; + } + } + } + if (!subgroupIdx) { subgroupIdx = RandomNumber(Info->Type.TotalPartCount()); } - TVDiskID vDisk = Info->GetVDiskInSubgroup(subgroupIdx, OriginalId.Hash()); + TVDiskID vDisk = Info->GetVDiskInSubgroup(*subgroupIdx, OriginalId.Hash()); TDeque> events; ui64 cookie = ((ui64)OriginalId.Hash() << 32) | PatchedId.Hash(); @@ -574,7 +652,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorPatchesWithFallback->Inc(); - if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured) { + if (WithMovingPatchRequestToStaticNode && UseVPatch && !IsSecured && !IsMovedPatch) { PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA05, "Start Moved strategy from fallback"); StartMovedPatch(); } else { @@ -587,20 +665,31 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorPickSubgroup(OriginalId.Hash(), &VDisks, nullptr); ReceivedResponseFlags.assign(VDisks.size(), false); ErrorResponseFlags.assign(VDisks.size(), false); EmptyResponseFlags.assign(VDisks.size(), false); ForceStopFlags.assign(VDisks.size(), false); + SlowFlags.assign(VDisks.size(), false); + + ui64 worstNs = 0; + ui64 nextToWorstNs = 0; + i32 worstSubGroubIdx = -1; + GetWorstPredictedDelaysNs(NKikimrBlobStorage::EVDiskQueueId::GetFastRead, &worstNs, &nextToWorstNs, &worstSubGroubIdx); + if (worstNs > nextToWorstNs * 2) { + SlowFlags[worstSubGroubIdx] = true; + HasSlowVDisk = true; + } TDeque> events; - for (ui32 idx = 0; idx < VDisks.size(); ++idx) { - std::unique_ptr ev = std::make_unique( - OriginalId, PatchedId, VDisks[idx], Deadline, idx, true); - events.emplace_back(std::move(ev)); - SentStarts++; + if (!SlowFlags[idx]) { + std::unique_ptr ev = std::make_unique( + OriginalId, PatchedId, VDisks[idx], Deadline, idx, true); + events.emplace_back(std::move(ev)); + SentStarts++; + } } PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA08, "Start VPatch strategy", @@ -701,6 +790,17 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor nextToWorstNs * 2) { + SlowFlags[worstSubGroubIdx] = true; + HasSlowVDisk = true; + } if (Info->Type.GetErasure() == TErasureType::ErasureMirror3dc) { return ContinueVPatchForMirror3dc(); @@ -713,6 +813,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorPickSubgroup(OriginalId.Hash(), &VDisks, nullptr); IsSecured = (Info->GetEncryptionMode() != TBlobStorageGroupInfo::EEM_NONE); IsGoodPatchedBlobId = result; IsAllowedErasure = Info->Type.ErasureFamily() == TErasureType::ErasureParityBlock || Info->Type.GetErasure() == TErasureType::ErasureNone || Info->Type.GetErasure() == TErasureType::ErasureMirror3dc; - if (IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID && !IsSecured) { + if (false && IsGoodPatchedBlobId && IsAllowedErasure && UseVPatch && OriginalGroupId == Info->GroupID && !IsSecured) { PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA03, "Start VPatch strategy from bootstrap"); StartVPatch(); } else { @@ -825,6 +930,75 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetPredictedDelayNsByOrderNumber(diskIdx, queueId);; + if (predictedNs > *outWorstNs) { + *outNextToWorstNs = *outWorstNs; + *outWorstNs = predictedNs; + *outWorstSubgroupIdx = diskIdx; + } else if (predictedNs > *outNextToWorstNs) { + *outNextToWorstNs = predictedNs; + } + } + } + + void SetSlowDisks() { + for (ui32 idx = 0; idx < SlowFlags.size(); ++idx) { + SlowFlags[idx] = !ReceivedResponseFlags[idx] && !EmptyResponseFlags[idx] && !ErrorResponseFlags[idx]; + if (SlowFlags[idx]) { + HasSlowVDisk = true; + } + } + } + + template + void HandleWakeUp(TEvents::TEvWakeup::TPtr &ev) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA36, "HandleWakeUp", + (ExpectedTag, ToString(ExpectedTag)), + (ReceivedTag, ToString(ev->Get()->Tag))); + if (ev->Get()->Tag == ExpectedTag) { + SetSlowDisks(); + StartFallback(); + } + if (ev->Get()->Tag == NeverTag) { + SetSlowDisks(); + StartFallback(); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA40, "Found NeverTag wake up", (ExpectedTag, ToString(ExpectedTag))); + } + } + + void HandleVPatchWakeUp(TEvents::TEvWakeup::TPtr &ev) { + ui64 expectedTag = (IsContinuedVPatch ? VPatchDiffTag : VPatchStartTag); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA37, "HandleWakeUp", + (ExpectedTag, ToString(expectedTag)), + (ReceivedTag, ToString(ev->Get()->Tag))); + if (ev->Get()->Tag == expectedTag) { + SetSlowDisks(); + StartFallback(); + } + if (ev->Get()->Tag == NeverTag) { + SetSlowDisks(); + StartFallback(); + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA41, "Found NeverTag wake up", (ExpectedTag, ToString(expectedTag))); + } + } + + void HandleNeverTagWakeUp(TEvents::TEvWakeup::TPtr &ev) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA42, "HandleWakeUp", + (ExpectedTag, ToString(NeverTag)), + (ReceivedTag, ToString(ev->Get()->Tag))); + if (ev->Get()->Tag == NeverTag) { + PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA43, "Found NeverTag wake up in naive state"); + ReplyAndDie(NKikimrProto::DEADLINE); + } + } + STATEFN(NaiveState) { if (ProcessEvent(ev)) { return; @@ -832,9 +1006,14 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetTypeRewrite()) { hFunc(TEvBlobStorage::TEvGetResult, Handle); hFunc(TEvBlobStorage::TEvPutResult, Handle); + + IgnoreFunc(TEvents::TEvWakeup); + //hFunc(TEvents::TEvWakeup, HandleWakeUp); IgnoreFunc(TEvBlobStorage::TEvVPatchResult); + IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts); + IgnoreFunc(TEvBlobStorage::TEvVMovedPatchResult); default: - Y_ABORT("Received unknown event"); + Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase())); }; } @@ -844,9 +1023,11 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetTypeRewrite()) { hFunc(TEvBlobStorage::TEvVMovedPatchResult, Handle); + hFunc(TEvents::TEvWakeup, HandleWakeUp); IgnoreFunc(TEvBlobStorage::TEvVPatchResult); + IgnoreFunc(TEvBlobStorage::TEvVPatchFoundParts); default: - Y_ABORT("Received unknown event"); + Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase())); }; } @@ -857,8 +1038,9 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGetTypeRewrite()) { hFunc(TEvBlobStorage::TEvVPatchFoundParts, Handle); hFunc(TEvBlobStorage::TEvVPatchResult, Handle); + hFunc(TEvents::TEvWakeup, HandleVPatchWakeUp); default: - Y_ABORT("Received unknown event"); + Y_FAIL_S("Received unknown event " << TypeName(*ev->GetBase())); }; } }; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp index d6b70cc6f9c2..741d5ce3b875 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp @@ -110,6 +110,18 @@ enum class ENaivePatchCase { ErrorOnPut, }; +#define CASE_TO_RETURN_STRING(cs) \ + case cs: return #cs \ +// end CASE_TO_RETURN_STRING +TString ToString(ENaivePatchCase cs) { + switch (cs) { + CASE_TO_RETURN_STRING(ENaivePatchCase::Ok); + CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGetItem); + CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGet); + CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnPut); + } +} + NKikimrProto::EReplyStatus GetPatchResultStatus(ENaivePatchCase naiveCase) { switch (naiveCase) { case ENaivePatchCase::Ok: @@ -156,6 +168,17 @@ enum class EVPatchCase { Custom, }; +TString ToString(EVPatchCase cs) { + switch (cs) { + CASE_TO_RETURN_STRING(EVPatchCase::Ok); + CASE_TO_RETURN_STRING(EVPatchCase::OneErrorAndAllPartExistInStart); + CASE_TO_RETURN_STRING(EVPatchCase::OnePartLostInStart); + CASE_TO_RETURN_STRING(EVPatchCase::DeadGroupInStart); + CASE_TO_RETURN_STRING(EVPatchCase::ErrorDuringVPatchDiff); + CASE_TO_RETURN_STRING(EVPatchCase::Custom); + } +} + NKikimrProto::EReplyStatus GetPatchResultStatus(EVPatchCase vpatchCase) { switch (vpatchCase) { case EVPatchCase::Ok: @@ -249,6 +272,15 @@ enum class EMovedPatchCase { Error }; +TString ToString(EMovedPatchCase cs) { + switch (cs) { + CASE_TO_RETURN_STRING(EMovedPatchCase::Ok); + CASE_TO_RETURN_STRING(EMovedPatchCase::Error); + } +} + +#undef CASE_TO_RETURN_STRING + NKikimrProto::EReplyStatus GetPatchResultStatus(EMovedPatchCase movedCase) { switch (movedCase) { case EMovedPatchCase::Ok: @@ -289,7 +321,7 @@ void ReceivePatchResult(TTestBasicRuntime &runtime, const TTestArgs &args, NKiki } void ConductGet(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) { - CTEST << "ConductGet: Start\n"; + CTEST << "ConductGet: Start NaiveCase: " << ToString(naiveCase) << "\n"; NKikimrProto::EReplyStatus resultStatus = GetGetResultStatus(naiveCase); TAutoPtr handle; TEvBlobStorage::TEvGet *get = runtime.GrabEdgeEventRethrow(handle); @@ -328,10 +360,10 @@ TString MakePatchedBuffer(const TTestArgs &args) { void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) { NKikimrProto::EReplyStatus resultStatus = GetPutResultStatus(naiveCase); if (resultStatus == NKikimrProto::UNKNOWN) { - CTEST << "ConductPut: Skip\n"; + CTEST << "ConductPut: Skip NaiveCase: " << ToString(naiveCase) << "\n"; return; } - CTEST << "ConductPut: Start\n"; + CTEST << "ConductPut: Start NaiveCase: " << ToString(naiveCase) << "\n"; TAutoPtr handle; TEvBlobStorage::TEvPut *put = runtime.GrabEdgeEventRethrow(handle); UNIT_ASSERT_VALUES_EQUAL(put->Id, args.PatchedId); @@ -346,7 +378,7 @@ void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa } void ConductNaivePatch(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) { - CTEST << "ConductNaivePatch: Start\n"; + CTEST << "ConductNaivePatch: Start NaiveCase: " << ToString(naiveCase) << Endl; ConductGet(runtime, args, naiveCase); ConductPut(runtime, args, naiveCase); NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(naiveCase); @@ -354,14 +386,27 @@ void ConductNaivePatch(TTestBasicRuntime &runtime, const TTestArgs &args, ENaive CTEST << "ConductNaivePatch: Finish\n"; } +template +TString ToString(const TVector &lst) { + TStringBuilder bld; + bld << '['; + for (ui32 idx = 0; idx < lst.size(); ++idx) { + if (idx) { + bld << ", "; + } + bld << lst[idx]; + } + bld << ']'; + return bld; +} void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args, - EVPatchCase naiveCase, TVDiskPointer vdiskPointer) + EVPatchCase vpatchCase, TVDiskPointer vdiskPointer) { auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.OriginalId.Hash()); - CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n"; + CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n"; TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.OriginalId.Hash()); - auto [status, parts] = GetVPatchFoundPartsStatus(env, args, naiveCase, vdiskPointer); + auto [status, parts] = GetVPatchFoundPartsStatus(env, args, vpatchCase, vdiskPointer); auto start = runtime.GrabEdgeEventRethrow({env.VDisks[vdiskIdx]}); auto &startRecord = start->Get()->Record; @@ -376,21 +421,22 @@ void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, cons for (auto partId : parts) { foundParts->AddPart(partId); } + CTEST << "ConductVPatchStart: Send FoundParts vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "parts# " << ToString(parts) << "\n"; SendByHandle(runtime, start, std::move(foundParts)); CTEST << "ConductVPatchStart: Finish vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n"; } void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args, - EVPatchCase naiveCase, TVDiskPointer vdiskPointer) + EVPatchCase vpatchCase, TVDiskPointer vdiskPointer) { auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.PatchedId.Hash()); TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.PatchedId.Hash()); - NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, naiveCase, vdiskPointer); + NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, vpatchCase, vdiskPointer); if (resultStatus == NKikimrProto::UNKNOWN) { - CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n"; + CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n"; return; } - CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n"; + CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n"; auto diffEv = runtime.GrabEdgeEventRethrow({env.VDisks[vdiskIdx]}); auto &diffRecord = diffEv->Get()->Record; @@ -415,6 +461,7 @@ void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const } void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args) { + return; // disabled vpatch CTEST << "ConductFailedVPatch: Start\n"; for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) { TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup); @@ -429,7 +476,7 @@ void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMovedPatchCase movedCase) { - CTEST << "ConductVMovedPatch: Start\n"; + CTEST << "ConductVMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl; NKikimrProto::EReplyStatus resultStatus = GetVMovedPatchResultStatus(movedCase); TAutoPtr handle; TEvBlobStorage::TEvVMovedPatch *vPatch = runtime.GrabEdgeEventRethrow(handle); @@ -459,7 +506,7 @@ void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMove void ConductMovedPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args, EMovedPatchCase movedCase) { - CTEST << "ConductMovedPatch: Start\n"; + CTEST << "ConductMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl; ConductFailedVPatch(runtime, env, args); ConductVMovedPatch(runtime, args, movedCase); NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(movedCase); @@ -481,7 +528,8 @@ void ConductFallbackPatch(TTestBasicRuntime &runtime, const TTestArgs &args) { void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args, EVPatchCase vpatchCase) { - CTEST << "ConductVPatchEvents: Start\n"; + return; // disabled vpatch + CTEST << "ConductVPatchEvents: Start VPatchCase: " << ToString(vpatchCase) << Endl; for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) { TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup); ConductVPatchStart(runtime, env, args, vpatchCase, vdisk); @@ -496,7 +544,7 @@ void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con void ConductVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args, EVPatchCase vpatchCase) { - CTEST << "ConductFallbackPatch: Start\n"; + CTEST << "ConductFallbackPatch: Start VPatchCase: " << ToString(vpatchCase) << Endl; ConductVPatchEvents(runtime, env, args, vpatchCase); NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(vpatchCase); if (resultStatus == NKikimrProto::UNKNOWN) { @@ -620,17 +668,18 @@ void RunGeneralTest(void(*runner)(TTestBasicRuntime &runtime, const TTestArgs &a Y_UNIT_TEST_NAIVE(ErrorOnPut, erasure) \ Y_UNIT_TEST_MOVED(Ok, erasure) \ Y_UNIT_TEST_MOVED(Error, erasure) \ - Y_UNIT_TEST_VPATCH(Ok, erasure) \ - Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure) \ - Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure) \ - Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure) \ - Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure) \ Y_UNIT_TEST_SECURED(Ok, erasure) \ Y_UNIT_TEST_SECURED(ErrorOnGetItem, erasure) \ Y_UNIT_TEST_SECURED(ErrorOnGet, erasure) \ Y_UNIT_TEST_SECURED(ErrorOnPut, erasure) \ // end Y_UNIT_TEST_PATCH_PACK +// Y_UNIT_TEST_VPATCH(Ok, erasure) +// Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure) +// Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure) +// Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure) +// Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure) + Y_UNIT_TEST_PATCH_PACK(ErasureNone) Y_UNIT_TEST_PATCH_PACK(Erasure4Plus2Block) Y_UNIT_TEST_PATCH_PACK(ErasureMirror3dc) @@ -712,6 +761,7 @@ EFaultToleranceCase GetFaultToleranceCaseForBlock4Plus2(const TDSProxyEnv &env, } } } + return EFaultToleranceCase::Fallback; // disabled vpatch if (layout.CountEffectiveReplicas(env.Info->Type) == env.Info->Type.TotalPartCount()) { return EFaultToleranceCase::Ok; } else { @@ -736,6 +786,7 @@ EFaultToleranceCase GetFaultToleranceCaseForMirror3dc(const TDSProxyEnv &env, co for (ui32 dcIdx = 0; dcIdx < dcCnt; ++dcIdx) { x2cnt += (replInDc[dcIdx] >= 2); } + return EFaultToleranceCase::Fallback; // disabled vpatch if ((replInDc[0] && replInDc[1] && replInDc[2]) || x2cnt >= 2) { return EFaultToleranceCase::Ok; } else { diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index f31c02db3bcf..98473e08648b 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -1585,7 +1585,7 @@ namespace NKikimr { if (deadline != TInstant::Max()) { this->Record.MutableMsgQoS()->SetDeadlineSeconds((ui32)deadline.Seconds()); } - this->Record.MutableMsgQoS()->SetExtQueueId(HandleClassToQueueId(NKikimrBlobStorage::AsyncBlob)); + this->Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::PutAsyncBlob); } bool GetIgnoreBlock() const { @@ -1965,6 +1965,25 @@ namespace NKikimr { } Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::EVDiskQueueId::GetFastRead); } + + TString ToString() const { + return ToString(this->Record); + } + + static TString ToString(const NKikimrBlobStorage::TEvVPatchStart &record) { + TStringStream str; + TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId()); + TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId()); + str << "{TEvVPatchStart"; + str << " OriginalBlobId# " << originalId.ToString(); + str << " PatchedBlobId# " << patchedId.ToString(); + if (record.HasMsgQoS()) { + str << " "; + TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str); + } + str << "}"; + return str.Str(); + } }; struct TEvBlobStorage::TEvVPatchFoundParts @@ -2010,6 +2029,25 @@ namespace NKikimr { Record.SetStatus(status); } + TString ToString() const { + return ToString(this->Record); + } + + static TString ToString(const NKikimrBlobStorage::TEvVPatchFoundParts &record) { + TStringStream str; + TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId()); + TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId()); + str << "{TEvVPatchFoundParts"; + str << " OriginalBlobId# " << originalId.ToString(); + str << " PatchedBlobId# " << patchedId.ToString(); + if (record.HasMsgQoS()) { + str << " "; + TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str); + } + str << "}"; + return str.Str(); + } + void MakeError(NKikimrProto::EReplyStatus status, const TString& errorReason, const NKikimrBlobStorage::TEvVPatchStart &request) { Record.SetErrorReason(errorReason); @@ -2099,6 +2137,25 @@ namespace NKikimr { } return result; } + + TString ToString() const { + return ToString(this->Record); + } + + static TString ToString(const NKikimrBlobStorage::TEvVPatchDiff &record) { + TStringStream str; + TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId()); + TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId()); + str << "{TEvVPatchDiff"; + str << " OriginalBlobId# " << originalId.ToString(); + str << " PatchedBlobId# " << patchedId.ToString(); + if (record.HasMsgQoS()) { + str << " "; + TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str); + } + str << "}"; + return str.Str(); + } }; @@ -2144,6 +2201,25 @@ namespace NKikimr { } return result; } + + TString ToString() const { + return ToString(this->Record); + } + + static TString ToString(const NKikimrBlobStorage::TEvVPatchXorDiff &record) { + TStringStream str; + TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId()); + TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId()); + str << "{TEvVPatchXorDiff"; + str << " OriginalBlobId# " << originalId.ToString(); + str << " PatchedBlobId# " << patchedId.ToString(); + if (record.HasMsgQoS()) { + str << " "; + TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str); + } + str << "}"; + return str.Str(); + } }; struct TEvBlobStorage::TEvVPatchXorDiffResult diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 043b5300e308..5f5e42fc51cc 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -225,12 +225,19 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////// void Handle(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx) { + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVMovedPatch: receive request;" + << " Event# " << ev->Get()->ToString()); if (!CheckIfWriteAllowed(ev, ctx)) { + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVMovedPatch: is not allowed;" + << " Event# " << ev->Get()->ToString()); return; } const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); + } else { + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVMovedPatch: is postponned;" + << " Event# " << ev->Get()->ToString()); } } @@ -270,11 +277,16 @@ namespace NKikimr { void Handle(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx) { if (!CheckIfWriteAllowed(ev, ctx)) { + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatchStart: receive request;" + << " Event# " << ev->Get()->ToString()); return; } const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); + } else { + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatchStart: postponned;" + << " Event# " << ev->Get()->ToString()); } } @@ -307,24 +319,32 @@ namespace NKikimr { template void HandleVPatchDiffResending(TEvDiffPtr &ev, const TActorContext &ctx) { if (!CheckIfWriteAllowed(ev, ctx)) { + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: is not allowed;" + << " Event# " << ev->Get()->ToString()); return; } if constexpr (std::is_same_v) { LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: recieve diff;" << " Event# " << ev->Get()->ToString()); IFaceMonGroup->PatchDiffMsgs()++; - } - if constexpr (std::is_same_v) { + } else if constexpr (std::is_same_v) { LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: recieve xor diff;" << " Event# " << ev->Get()->ToString()); IFaceMonGroup->PatchXorDiffMsgs()++; + } else { + LOG_ERROR_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: UNKNOWN diff;" + << " Event# " << ev->Get()->ToString()); } TLogoBlobID patchedBlobId = LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetPatchedPartBlobId()).FullID(); auto it = VPatchActors.find(patchedBlobId); if (it != VPatchActors.end()) { TActivationContext::Send(ev->Forward(it->second)); + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: diff sent to actor;" + << " Event# " << ev->Get()->ToString()); } else { ReplyError(NKikimrProto::ERROR, "VPatchActor doesn't exist", ev, ctx, TAppData::TimeProvider->Now()); + LOG_DEBUG_S(ctx, BS_VDISK_PATCH, VCtx->VDiskLogPrefix << "TEvVPatch: diff didn't send to actor; actor didn't exist" + << " Event# " << ev->Get()->ToString()); } } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp index db94713be486..a0b22acc05a0 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp @@ -9,8 +9,6 @@ namespace NKikimr { class TVMovedPatchActor : public TActorBootstrapped { friend TActorBootstrapped; - static constexpr ui64 SubRequestDurationMs = 1000; - ui32 OriginalGroupId; ui32 PatchedGroupId; TLogoBlobID OriginalId; @@ -29,6 +27,7 @@ namespace NKikimr { TActorId LeaderId; TOutOfSpaceStatus OOSStatus; + TInstant Deadline = TInstant::Zero(); NLWTrace::TOrbit Orbit; @@ -58,6 +57,10 @@ namespace NKikimr { OriginalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId()); Y_ABORT_UNLESS(record.HasPatchedBlobId()); PatchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId()); + Deadline = TInstant::Seconds(record.GetMsgQoS().HasDeadlineSeconds()); + if (record.HasMsgQoS() && record.GetMsgQoS().HasDeadlineSeconds()) { + Deadline = TInstant::Seconds(record.GetMsgQoS().HasDeadlineSeconds()); + } DiffCount = record.DiffsSize(); Diffs.reset(new TEvBlobStorage::TEvPatch::TDiff[DiffCount]); @@ -96,6 +99,12 @@ namespace NKikimr { << " ErrorReason# " << ErrorReason << " Marker# BSVSP01"); } + LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix + << "Send result TEvVMovedPatch: " << errorSubMsg << ';' + << " OriginalBlobId# " << OriginalId + << " PatchedBlobId# " << PatchedId + << " ErrorReason# " << ErrorReason + << " Marker# BSVSP01"); SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), Event->Cookie, VCtx); PassAway(); } @@ -108,6 +117,10 @@ namespace NKikimr { } void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev, const TActorContext &ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix + << "Receive Get ub TEvVMovedPatch: " + << " OriginalBlobId# " << OriginalId + << " PatchedBlobId# " << PatchedId); TEvBlobStorage::TEvGetResult *result = ev->Get(); Orbit = std::move(result->Orbit); @@ -138,15 +151,18 @@ namespace NKikimr { Buffer = result->Responses[0].Buffer.ConvertToString(); ApplyDiffs(); - TInstant deadline = TActivationContext::Now() + TDuration::MilliSeconds(SubRequestDurationMs); // We have chosen UserData as PutHandleClass on purpose. // If VMovedPatch and Put were AsyncWrite, it would become a deadlock // because the put subrequest may not send and the moved patch request will end by timeout. - std::unique_ptr put = std::make_unique(PatchedId, Buffer, deadline, + std::unique_ptr put = std::make_unique(PatchedId, Buffer, Deadline, NKikimrBlobStorage::UserData, TEvBlobStorage::TEvPut::TacticDefault); put->Orbit = std::move(Orbit); + LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix + << "Send Put ub TEvVMovedPatch: " + << " OriginalBlobId# " << OriginalId + << " PatchedBlobId# " << PatchedId); SendToBSProxy(SelfId(), PatchedGroupId, put.release(), OriginalId.Hash()); } @@ -156,6 +172,11 @@ namespace NKikimr { ui32 originalIdHash = OriginalId.Hash(); + LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix + << "Receive Put ub TEvVMovedPatch: " + << " OriginalBlobId# " << OriginalId + << " PatchedBlobId# " << PatchedId); + constexpr auto errorSubMsg = "failed on VPut"; if (ev->Cookie != originalIdHash) { ErrorReason = "Couldn't put the patched blob; Received TEvPutResult with wrong cookie"; @@ -173,11 +194,20 @@ namespace NKikimr { } void Bootstrap() { - TInstant deadline = TActivationContext::Now() + TDuration::MilliSeconds(SubRequestDurationMs); + if (Deadline && Deadline < TActivationContext::Now()) { + SendResponseAndDie(TActivationContext::AsActorContext(), NKikimrProto::DEADLINE); + return; + } + std::unique_ptr get = std::make_unique(OriginalId, 0, - OriginalId.BlobSize(), deadline, NKikimrBlobStorage::AsyncRead); + OriginalId.BlobSize(), Deadline, NKikimrBlobStorage::AsyncRead); get->Orbit = std::move(Event->Get()->Orbit); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::BS_VDISK_PATCH, VCtx->VDiskLogPrefix + << "Send Get ub TEvVMovedPatch: " + << " OriginalBlobId# " << OriginalId + << " PatchedBlobId# " << PatchedId); + SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash()); Become(&TThis::StateWait); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp index 5c4325e85ced..05f6f035277a 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp @@ -306,7 +306,7 @@ namespace NKikimr::NPrivate { void SendVPatchResult(NKikimrProto::EReplyStatus status, bool forceEnd = false) { STLOG(PRI_INFO, BS_VDISK_PATCH, BSVSP07, - VDiskLogPrefix << " TEvVPatch: send patch result;", + VDiskLogPrefix << " TEvVPatch: " << (forceEnd ? "received force end;" : "send patch result;"), (OriginalBlobId, OriginalBlobId), (PatchedBlobId, PatchedBlobId), (OriginalPartId, (ui32)OriginalPartId), diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 8135c86c2a2b..5ec47557f1fa 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -3092,6 +3092,9 @@ void TKeyValueState::RegisterRequestActor(const TActorContext &ctx, THolderRefCountsIncr.emplace_back(patch.PatchedBlobId, true); + + LOG_INFO_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId + << " PatchedKey# " << patch.PatchedKey << " BlobId# " << patch.PatchedBlobId); }; for (auto& write : intermediate->Writes) {