From f64949582813c2d3813276576357ece9e4507db7 Mon Sep 17 00:00:00 2001 From: Sergey Belyakov Date: Thu, 28 Dec 2023 16:15:20 +0000 Subject: [PATCH 1/4] Introduce InitialAllocation option for storage load actor --- ydb/core/load_test/gen.h | 2 + ydb/core/load_test/group_write.cpp | 348 +++++++++++++++++++++++------ ydb/core/protos/load_test.proto | 12 + 3 files changed, 293 insertions(+), 69 deletions(-) diff --git a/ydb/core/load_test/gen.h b/ydb/core/load_test/gen.h index b5ad5af2f4ba..b8e07e18f5db 100644 --- a/ydb/core/load_test/gen.h +++ b/ydb/core/load_test/gen.h @@ -18,6 +18,8 @@ namespace NKikimr { double AccumWeight = 0; public: + TGenerator() = default; + template TGenerator(const google::protobuf::RepeatedPtrField& setting) { for (const auto& item : setting) { diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index 34ea48e295b2..532459b2ded7 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -86,11 +86,126 @@ class TLogWriterLoadTestActor : public TActorBootstrapped 0 || BlobsToWrite > 0; + } + + bool EnoughBlobsWritten(bool countPending = false) { + if (SizeToWrite > 0) { + return ConfirmedDataSize + PendingWriteSize * countPending >= SizeToWrite; + } else if (BlobsToWrite > 0) { + return ConfirmedBlobs.size() + PendingWrites * countPending >= BlobsToWrite; + } + return true; + } + + void ConfirmBlob(const TLogoBlobID& id, bool success) { + PendingWriteSize -= id.BlobSize(); + PendingWrites -= 1; + if (success) { + ConfirmedBlobs.push_back(id); + ConfirmedDataSize += id.BlobSize(); + } + } + + std::unique_ptr MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel) { + ui32 blobSize = SizeGenerator.Generate(); + const TLogoBlobID id(tabletId, gen, step, channel, blobSize, BlobCookie++); + const TSharedData buffer = GenerateBuffer(id); + auto ev = std::make_unique(id, buffer, TInstant::Max(), PutHandleClass); + PendingWriteSize += blobSize; + PendingWrites += 1; + return std::move(ev); + } + + std::unique_ptr ManageKeepFlags(ui64 tabletId, ui32 gen, ui32 step, + ui32 channel, bool keep) { + auto blobsWritten = std::make_unique>(ConfirmedBlobs); + + if (keep) { + return std::make_unique(tabletId, gen, step, channel, + false, gen, step, blobsWritten.release(), nullptr, TInstant::Max(), false); + } else { + ConfirmedDataSize = 0; + ConfirmedBlobs.clear(); + return std::make_unique(tabletId, gen, step, channel, + true, gen, step, nullptr, blobsWritten.release(), TInstant::Max(), false); + } + } + + TLogoBlobID GetRandomBlobId() { + Y_VERIFY(!IsEmpty()); + auto iter = ConfirmedBlobs.begin(); + std::advance(iter, RandomNumber(ConfirmedBlobs.size())); + return *iter; + } + + TString ToString() { + return TStringBuilder() << "TInitialAllocation# {" + << " PutHandleClass# " << NKikimrBlobStorage::EPutHandleClass_Name(PutHandleClass) + << " SizeToWrite# " << SizeToWrite + << " BlobsToWrite# " << BlobsToWrite + << " WritesInFlight# " << WritesInFlight + << " ConfirmedSize# " << ConfirmedDataSize + << " ConfirmedBlobs.size()# " << ConfirmedBlobs.size() + << " PendingWrites# " << PendingWrites + << " PendingWriteSize# " << PendingWriteSize << " }"; + } + + ui32 GetWritesInFlight() { + return WritesInFlight; + } + + private: + TSizeGenerator SizeGenerator; + NKikimrBlobStorage::EPutHandleClass PutHandleClass = NKikimrBlobStorage::EPutHandleClass::UserData; + + uint64_t SizeToWrite; + uint64_t BlobsToWrite; + ui32 WritesInFlight = 1; + uint64_t ConfirmedDataSize = 0; + TVector ConfirmedBlobs; + + uint64_t PendingWriteSize = 0; + uint64_t PendingWrites = 0; + + ui64 BlobCookie = 0; + }; struct TRequestDelayManager { virtual ~TRequestDelayManager() = default; + virtual void Start(TMonotonic now) = 0; virtual TDuration CalculateDelayForNextRequest(TMonotonic now) = 0; virtual void CountResponse() = 0; @@ -106,6 +221,10 @@ class TLogWriterLoadTestActor : public TActorBootstrapped& duration) + THardRateDelayManager(double requestsPerSecondAtStart, double requestsPerSecondOnFinish, const TMaybe& duration) : EpochDuration(TDuration::MilliSeconds(100)) , RequestRateAtStart((EpochDuration / TDuration::Seconds(1)) * requestsPerSecondAtStart) , RequestRateOnFinish((EpochDuration / TDuration::Seconds(1)) * requestsPerSecondOnFinish) - , CurrentEpochEnd(now) - , LoadStart(now) - , LoadDuration(duration) - , PlannedForCurrentEpoch(std::max(1., CalculateRequestRate(now))) { - CalculateDelayForNextRequest(now); + , LoadDuration(duration) { } const TDuration EpochDuration; @@ -142,12 +257,19 @@ class TLogWriterLoadTestActor : public TActorBootstrapped LoadDuration; double PlannedForCurrentEpoch; ui32 ResponsesAwaiting = 0; + void Start(TMonotonic now) override { + LoadStart = now; + CurrentEpochEnd = now; + PlannedForCurrentEpoch = std::max(1., CalculateRequestRate(now)); + CalculateDelayForNextRequest(now); + } + double CalculateRequestRate(TMonotonic now) { double ratio = LoadDuration ? (now - LoadStart) / *LoadDuration : 0; return ratio * (RequestRateOnFinish - RequestRateAtStart) + RequestRateAtStart; @@ -159,8 +281,6 @@ class TLogWriterLoadTestActor : public TActorBootstrapped SizeGen; std::shared_ptr DelayManager; const ui32 MaxRequestsInFlight; const ui64 MaxBytesInFlight; @@ -222,10 +344,9 @@ class TLogWriterLoadTestActor : public TActorBootstrapped TagCounters; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - TWakeupQueue& WakeupQueue; - TQueryDispatcher& QueryDispatcher; const ui64 TabletId; const ui32 Channel; ui32 Generation; @@ -286,18 +407,23 @@ class TLogWriterLoadTestActor : public TActorBootstrapped ScriptedRequests; + // Initial allocation + TInitialAllocation InitialAllocation; + + bool MainCycleStarted = false; + public: - TTabletWriter(ui64 tag, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - TWakeupQueue& wakeupQueue, TQueryDispatcher& queryDispatcher, ui64 tabletId, ui32 channel, + TTabletWriter(TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + TLogWriterLoadTestActor& self, ui64 tabletId, ui32 channel, TMaybe generation, ui32 groupId, NKikimrBlobStorage::EPutHandleClass putHandleClass, const TRequestDispatchingSettings& writeSettings, NKikimrBlobStorage::EGetHandleClass getHandleClass, const TRequestDispatchingSettings& readSettings, TIntervalGenerator garbageCollectIntervalGen, - TDuration scriptedRoundDuration, TVector&& scriptedRequests) - : TagCounters(counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag))) + TDuration scriptedRoundDuration, TVector&& scriptedRequests, + const TInitialAllocation& initialAllocation) + : Self(self) + , TagCounters(counters->GetSubgroup("tag", Sprintf("%" PRIu64, Self.Tag))) , Counters(TagCounters->GetSubgroup("channel", Sprintf("%" PRIu32, channel))) - , WakeupQueue(wakeupQueue) - , QueryDispatcher(queryDispatcher) , TabletId(tabletId) , Channel(channel) , Generation(generation ? *generation : 0) @@ -331,6 +457,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedGetCounter("tabletId") = tabletId; const auto& percCounters = Counters->GetSubgroup("sensor", "microseconds"); @@ -376,7 +503,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedBlockedGeneration + 1; IssueTEvBlock(ctx); }; - SendToBSProxy(ctx, GroupId, ev.release(), QueryDispatcher.ObtainCookie(std::move(callback))); + SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(callback))); } void IssueTEvBlock(const TActorContext& ctx) { @@ -398,7 +525,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedToString()); - StartWorking(ctx); + MakeInitialAllocation(ctx); }; - SendToBSProxy(ctx, GroupId, ev.Release(), QueryDispatcher.ObtainCookie(std::move(callback))); + SendToBSProxy(ctx, GroupId, ev.Release(), Self.QueryDispatcher.ObtainCookie(std::move(callback))); } void StartWorking(const TActorContext& ctx) { + MainCycleStarted = true; StartTimestamp = TActivationContext::Monotonic(); + if (Self.TestDuration) { + ctx.Schedule(*Self.TestDuration, new TEvents::TEvPoisonPill()); + } InitializeTrackers(StartTimestamp); + WriteSettings.DelayManager->Start(StartTimestamp); IssueWriteIfPossible(ctx); + ReadSettings.DelayManager->Start(StartTimestamp); + IssueReadIfPossible(ctx); ScheduleGarbageCollect(ctx); ExposeCounters(ctx); } + void MakeInitialAllocation(const TActorContext& ctx) { + if (InitialAllocation.EnoughBlobsWritten(false)) { + Self.InitialAllocationCompleted(ctx); + return; + } + LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, PrintMe() << " going to make initial allocation," + << InitialAllocation.ToString()); + for (ui32 i = 0; i < InitialAllocation.GetWritesInFlight() && !InitialAllocation.EnoughBlobsWritten(true); ++i) { + IssueInitialPut(ctx); + } + } + + void IssueInitialPut(const TActorContext& ctx) { + auto ev = InitialAllocation.MakePutMessage(TabletId, Generation, GarbageCollectStep, Channel); + + auto callback = [this](IEventBase *event, const TActorContext& ctx) { + auto *res = dynamic_cast(event); + Y_ABORT_UNLESS(res); + + InitialAllocation.ConfirmBlob(res->Id, CheckStatus(ctx, res, {})); + if (!InitialAllocation.EnoughBlobsWritten(true)) { + IssueInitialPut(ctx); + } else if (InitialAllocation.EnoughBlobsWritten(false)) { + SetKeepFlagsOnInitialAllocation(ctx); + } + }; + SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(callback))); + } + + void SetKeepFlagsOnInitialAllocation(const TActorContext& ctx) { + auto ev = InitialAllocation.ManageKeepFlags(TabletId, Generation, GarbageCollectStep, Channel, true); + + LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, PrintMe() << " going to set keep flags on initally allocated blobs, ev#" + << ev->Print(false)); + auto callback = [this](IEventBase *event, const TActorContext& ctx) { + auto *res = dynamic_cast(event); + Y_ABORT_UNLESS(res); + + if (!MainCycleStarted) { + StartWorking(ctx); + } + }; + + SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(callback))); + } + void StopWorking(const TActorContext& ctx) { auto ev = TEvBlobStorage::TEvCollectGarbage::CreateHardBarrier(TabletId, Generation, GarbageCollectStep, Channel, Generation, Max(), TInstant::Max()); @@ -464,7 +644,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped nums{{9000, 9900, 9990, 9999, 10000}}; @@ -613,20 +794,21 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= NextWriteTimestamp && @@ -648,7 +830,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedGenerate(); putHandleClass = PutHandleClass; } const TLogoBlobID id(TabletId, Generation, WriteStep, Channel, size, Cookie); @@ -696,14 +878,14 @@ class TLogWriterLoadTestActor : public TActorBootstrappedIncrement(response.MicroSeconds()); IssueWriteIfPossible(ctx); - if (ConfirmedBlobIds.size() == 1) { + if (ConfirmedBlobIds.size() == 1 && InitialAllocation.IsEmpty()) { if (NextReadTimestamp == TMonotonic()) { NextReadTimestamp = TActivationContext::Monotonic(); } IssueReadIfPossible(ctx); } }; - SendToBSProxy(ctx, GroupId, ev.release(), QueryDispatcher.ObtainCookie(std::move(writeCallback))); + SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(writeCallback))); const auto nowCycles = GetCycleCountFast(); WritesInFlightTimestamps.emplace_back(writeQueryId, nowCycles); SentTimestamp.emplace(writeQueryId, nowCycles); @@ -757,7 +939,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(event); Y_ABORT_UNLESS(res); }; - SendToBSProxy(ctx, GroupId, ev.release(), QueryDispatcher.ObtainCookie(std::move(callback))); + SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(callback))); // just as we have sent this request, we have to trim all confirmed blobs which are going to be deleted const auto it = std::lower_bound(ConfirmedBlobIds.begin(), ConfirmedBlobIds.end(), @@ -786,10 +968,10 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= NextReadTimestamp && - ConfirmedBlobIds && + ConfirmedBlobIds.size() + InitialAllocation.Size() > 0 && (!ScriptedRequests || ScriptedRequests[ScriptedCounter].EvType == TEvBlobStorage::EvGet)) { IssueReadRequest(ctx); } @@ -801,16 +983,26 @@ class TLogWriterLoadTestActor : public TActorBootstrapped 0); + ui32 blobIdx = RandomNumber(confirmedBlobs + initialBlobs); + + if (blobIdx < confirmedBlobs) { + auto iter = ConfirmedBlobIds.begin(); + std::advance(iter, blobIdx); + id = *iter; + } else { + id = InitialAllocation[blobIdx - confirmedBlobs]; + } - ui32 size; + ui32 size = Max(); if (ScriptedRequests) { const auto& req = ScriptedRequests[ScriptedCounter]; size = req.Size ? req.Size : id.BlobSize(); - } else { - size = ReadSettings.SizeGen.Generate(); + } else if (ReadSettings.SizeGen) { + size = ReadSettings.SizeGen->Generate(); } size = Min(size, id.BlobSize()); @@ -842,7 +1034,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped - static ResultContainer GenerateBuffer(const TLogoBlobID& id) { - return GenDataForLZ4(id.BlobSize()); - } }; TString ConfingString; @@ -888,6 +1075,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped writeDelayManager; if (profile.HasWriteHardRateDispatcher()) { - auto now = TActivationContext::Monotonic(); const auto& dispatcherSettings = profile.GetWriteHardRateDispatcher(); double atStart = dispatcherSettings.GetRequestsPerSecondAtStart(); double onFinish = dispatcherSettings.GetRequestsPerSecondOnFinish(); - writeDelayManager = std::make_shared(atStart, onFinish, now, TestDuration); + writeDelayManager = std::make_shared(atStart, onFinish, TestDuration); } else { writeDelayManager = std::make_shared(TIntervalGenerator(profile.GetWriteIntervals())); } TTabletWriter::TRequestDispatchingSettings writeSettings{ + .LoadEnabled = enableWrites, .SizeGen = TSizeGenerator(profile.GetWriteSizes()), .DelayManager = std::move(writeDelayManager), .MaxRequestsInFlight = profile.GetMaxInFlightWriteRequests(), @@ -944,6 +1131,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped readDelayManager; if (profile.HasReadHardRateDispatcher()) { - auto now = TActivationContext::Monotonic(); const auto& dispatcherSettings = profile.GetReadHardRateDispatcher(); double atStart = dispatcherSettings.GetRequestsPerSecondAtStart(); double onFinish = dispatcherSettings.GetRequestsPerSecondOnFinish(); - readDelayManager = std::make_shared(atStart, onFinish, now, TestDuration); + readDelayManager = std::make_shared(atStart, onFinish, TestDuration); } else { readDelayManager = std::make_shared(TIntervalGenerator(profile.GetReadIntervals())); } + std::optional readSizeGen; + if (profile.ReadSizesSize() > 0) { + readSizeGen.emplace(profile.GetReadSizes()); + } + TTabletWriter::TRequestDispatchingSettings readSettings{ - .SizeGen = TSizeGenerator(profile.GetReadSizes()), + .LoadEnabled = enableReads, + .SizeGen = readSizeGen, .DelayManager = std::move(readDelayManager), .MaxRequestsInFlight = profile.GetMaxInFlightReadRequests(), .MaxBytesInFlight = profile.GetMaxInFlightReadBytes(), @@ -1009,12 +1202,27 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(Tag, counters, WakeupQueue, QueryDispatcher, tabletId, + TabletWriters.emplace_back(std::make_unique(counters, *this, tabletId, tablet.GetChannel(), tablet.HasGeneration() ? TMaybe(tablet.GetGeneration()) : TMaybe(), tablet.GetGroupId(), putHandleClass, writeSettings, getHandleClass, readSettings, garbageCollectIntervalGen, - scriptedRoundDuration, std::move(scriptedRequests))); + scriptedRoundDuration, std::move(scriptedRequests), + initialAllocation)); + + WorkersInInitialState++; + } + } + } + + void InitialAllocationCompleted(const TActorContext& ctx) { + WorkersInInitialState--; + if (!WorkersInInitialState) { + if (TestDuration) { + ctx.Schedule(*TestDuration, new TEvents::TEvPoisonPill()); + } + for (auto& writer : TabletWriters) { + writer->StartWorking(ctx); } } } @@ -1023,9 +1231,6 @@ class TLogWriterLoadTestActor : public TActorBootstrappedBootstrap(ctx); } @@ -1177,6 +1382,11 @@ class TLogWriterLoadTestActor : public TActorBootstrappedGet()->SourceType, ev->Sender.ToString().data()); } + template + static ResultContainer GenerateBuffer(const TLogoBlobID& id) { + return GenDataForLZ4(id.BlobSize()); + } + STRICT_STFUNC(StateFunc, CFunc(EvStopTest, HandleStopTest); CFunc(EvUpdateQuantile, HandleUpdateQuantile); @@ -1187,7 +1397,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped Date: Fri, 29 Dec 2023 12:20:53 +0000 Subject: [PATCH 2/4] Improve write throttling, add delay after initial write --- ydb/core/load_test/group_write.cpp | 194 ++++++++++++++++++----------- ydb/core/protos/load_test.proto | 6 +- 2 files changed, 122 insertions(+), 78 deletions(-) diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index 532459b2ded7..e35208ed2c8a 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -86,6 +86,44 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= size && RequestsInFlight >= 0); + BytesInFlight -= size; + --RequestsInFlight; + } + + TString ToString() const{ + return TStringBuilder() << "{" + << " Requests# " << RequestsInFlight << "/" << MaxRequestsInFlight + << " Bytes# " << BytesInFlight << "/" << MaxBytesInFlight + << " }"; + } + + ui32 MaxRequestsInFlight; + ui64 MaxBytesInFlight; + ui32 RequestsInFlight; + ui64 BytesInFlight; + }; + class TInitialAllocation { public: using TProtoSettings = NKikimr::TEvLoadTestRequest::TStorageLoad::TInitialBlobAllocation; @@ -96,7 +134,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped 0 || BlobsToWrite > 0; } + bool CanSendRequest() { + if (!InFlightTracker) { + return false; + } + return !EnoughBlobsWritten(true); + } + bool EnoughBlobsWritten(bool countPending = false) { if (SizeToWrite > 0) { - return ConfirmedDataSize + PendingWriteSize * countPending >= SizeToWrite; + return ConfirmedDataSize + InFlightTracker.BytesInFlight * countPending >= SizeToWrite; } else if (BlobsToWrite > 0) { - return ConfirmedBlobs.size() + PendingWrites * countPending >= BlobsToWrite; + return ConfirmedBlobs.size() + InFlightTracker.RequestsInFlight * countPending >= BlobsToWrite; } return true; } void ConfirmBlob(const TLogoBlobID& id, bool success) { - PendingWriteSize -= id.BlobSize(); - PendingWrites -= 1; + InFlightTracker.Response(id.BlobSize()); if (success) { ConfirmedBlobs.push_back(id); ConfirmedDataSize += id.BlobSize(); @@ -139,12 +179,12 @@ class TLogWriterLoadTestActor : public TActorBootstrapped MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel) { + Y_DEBUG_ABORT_UNLESS(CanSendRequest()); ui32 blobSize = SizeGenerator.Generate(); const TLogoBlobID id(tabletId, gen, step, channel, blobSize, BlobCookie++); const TSharedData buffer = GenerateBuffer(id); auto ev = std::make_unique(id, buffer, TInstant::Max(), PutHandleClass); - PendingWriteSize += blobSize; - PendingWrites += 1; + InFlightTracker.Request(blobSize); return std::move(ev); } @@ -164,7 +204,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped ConfirmedBlobs; - uint64_t PendingWriteSize = 0; - uint64_t PendingWrites = 0; + TInFlightTracker InFlightTracker; ui64 BlobCookie = 0; }; @@ -330,8 +362,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped SizeGen; std::shared_ptr DelayManager; - const ui32 MaxRequestsInFlight; - const ui64 MaxBytesInFlight; + TInFlightTracker InFlightTracker; const ui64 MaxTotalBytes; }; @@ -360,8 +391,6 @@ class TLogWriterLoadTestActor : public TActorBootstrapped SentTimestamp; ui64 WriteQueryId = 0; @@ -378,8 +407,6 @@ class TLogWriterLoadTestActor : public TActorBootstrapped ReadSentTimestamp; ui64 ReadQueryId = 0; @@ -582,13 +609,13 @@ class TLogWriterLoadTestActor : public TActorBootstrappedId, CheckStatus(ctx, res, {})); - if (!InitialAllocation.EnoughBlobsWritten(true)) { + while (InitialAllocation.CanSendRequest()) { IssueInitialPut(ctx); - } else if (InitialAllocation.EnoughBlobsWritten(false)) { + } + if (InitialAllocation.EnoughBlobsWritten()) { SetKeepFlagsOnInitialAllocation(ctx); } }; @@ -672,10 +700,10 @@ class TLogWriterLoadTestActor : public TActorBootstrapped LastLatencyTrackerUpdate + TDuration::Seconds(1)) { LastLatencyTrackerUpdate = now; ResponseQT->Update(); @@ -736,18 +764,12 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= NextWriteTimestamp && (!ScriptedRequests || ScriptedRequests[ScriptedCounter].EvType == TEvBlobStorage::EvPut)) { IssueWriteRequest(ctx); @@ -858,9 +878,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= 1 && WriteBytesInFlight >= size); - --WritesInFlight; - WriteBytesInFlight -= size; + WriteSettings.InFlightTracker.Response(size); TotalBytesWritten += size; @@ -878,7 +896,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedIncrement(response.MicroSeconds()); IssueWriteIfPossible(ctx); - if (ConfirmedBlobIds.size() == 1 && InitialAllocation.IsEmpty()) { + if (ConfirmedBlobIds.size() == 1 && !InitialAllocation) { if (NextReadTimestamp == TMonotonic()) { NextReadTimestamp = TActivationContext::Monotonic(); } @@ -896,8 +914,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= NextReadTimestamp && ConfirmedBlobIds.size() + InitialAllocation.Size() > 0 && (!ScriptedRequests || ScriptedRequests[ScriptedCounter].EvType == TEvBlobStorage::EvGet)) { @@ -1020,9 +1036,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= 1 && ReadBytesInFlight >= size); - --ReadsInFlight; - ReadBytesInFlight -= size; + ReadSettings.InFlightTracker.Response(size); TotalBytesRead += size; auto it = ReadSentTimestamp.find(readQueryId); @@ -1037,8 +1051,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(), }; @@ -1215,14 +1238,22 @@ class TLogWriterLoadTestActor : public TActorBootstrappedStartWorking(ctx); + } + } + void InitialAllocationCompleted(const TActorContext& ctx) { WorkersInInitialState--; - if (!WorkersInInitialState) { - if (TestDuration) { - ctx.Schedule(*TestDuration, new TEvents::TEvPoisonPill()); - } - for (auto& writer : TabletWriters) { - writer->StartWorking(ctx); + if (!WorkersInInitialState) { + if (DelayAfterInitialWrite) { + ctx.Schedule(TDuration::Seconds(DelayAfterInitialWrite), new TEvents::TEvWakeup); + } else { + StartWorkers(ctx); } } } @@ -1282,9 +1313,20 @@ class TLogWriterLoadTestActor : public TActorBootstrappedGet()->Tag) { + case MAIN_CYCLE: + --WakeupsScheduled; + UpdateWakeupQueue(ctx); + break; + + case DELAY_AFTER_INITIAL_WRITE: + StartWorkers(ctx); + break; + + default: + Y_FAIL_S("Unexpected wakeup tag# " << ev->Get()->Tag); + } } void UpdateWakeupQueue(const TActorContext& ctx) { @@ -1390,7 +1432,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped Date: Fri, 29 Dec 2023 16:32:27 +0000 Subject: [PATCH 3/4] Fix time in UI --- ydb/core/load_test/group_write.cpp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index e35208ed2c8a..a1f931883700 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -648,7 +648,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedStartWorking(ctx); } + TestStartTime = TActivationContext::Monotonic(); + UpdateWakeupQueue(ctx); } void InitialAllocationCompleted(const TActorContext& ctx) { WorkersInInitialState--; - if (!WorkersInInitialState) { + if (!WorkersInInitialState) { if (DelayAfterInitialWrite) { - ctx.Schedule(TDuration::Seconds(DelayAfterInitialWrite), new TEvents::TEvWakeup); + ctx.Schedule(TDuration::Seconds(DelayAfterInitialWrite), + new TEvents::TEvWakeup(DELAY_AFTER_INITIAL_WRITE)); } else { StartWorkers(ctx); } @@ -1261,11 +1264,9 @@ class TLogWriterLoadTestActor : public TActorBootstrappedBootstrap(ctx); } - UpdateWakeupQueue(ctx); HandleUpdateQuantile(ctx); } @@ -1392,7 +1393,12 @@ class TLogWriterLoadTestActor : public TActorBootstrappedSeconds(); } else { From 817ecac990641ad1d71a9235f1f0fcf6ea09e839 Mon Sep 17 00:00:00 2001 From: Sergey Belyakov Date: Wed, 10 Jan 2024 13:30:22 +0000 Subject: [PATCH 4/4] Adress comments --- ydb/core/load_test/group_write.cpp | 35 +++++++++++++++--------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index a1f931883700..a57e1584bb9c 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -95,9 +95,9 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= MaxRequestsInFlight) || + (MaxBytesInFlight && BytesInFlight >= MaxBytesInFlight); } void Request(ui64 size) { @@ -106,7 +106,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= size && RequestsInFlight >= 0); + Y_DEBUG_ABORT_UNLESS(BytesInFlight >= size && RequestsInFlight > 0); BytesInFlight -= size; --RequestsInFlight; } @@ -142,7 +142,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped 0 || BlobsToWrite > 0; + bool IsEmpty() const { + return SizeToWrite == 0 && BlobsToWrite == 0; } bool CanSendRequest() { - if (!InFlightTracker) { + if (InFlightTracker.LimitReached()) { return false; } return !EnoughBlobsWritten(true); @@ -205,9 +205,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= NextWriteTimestamp && (!ScriptedRequests || ScriptedRequests[ScriptedCounter].EvType == TEvBlobStorage::EvPut)) { @@ -896,7 +895,7 @@ class TLogWriterLoadTestActor : public TActorBootstrappedIncrement(response.MicroSeconds()); IssueWriteIfPossible(ctx); - if (ConfirmedBlobIds.size() == 1 && !InitialAllocation) { + if (ConfirmedBlobIds.size() == 1 && !InitialAllocation.IsEmpty()) { if (NextReadTimestamp == TMonotonic()) { NextReadTimestamp = TActivationContext::Monotonic(); } @@ -985,9 +984,9 @@ class TLogWriterLoadTestActor : public TActorBootstrapped= NextReadTimestamp && - ConfirmedBlobIds.size() + InitialAllocation.Size() > 0 && + ConfirmedBlobIds.size() + InitialAllocation.ConfirmedSize() > 0 && (!ScriptedRequests || ScriptedRequests[ScriptedCounter].EvType == TEvBlobStorage::EvGet)) { IssueReadRequest(ctx); } @@ -1001,8 +1000,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped 0); + ui32 initialBlobs = InitialAllocation.ConfirmedSize(); + Y_ABORT_UNLESS(confirmedBlobs + initialBlobs > 0); ui32 blobIdx = RandomNumber(confirmedBlobs + initialBlobs); if (blobIdx < confirmedBlobs) {