Skip to content

Commit 017f2e1

Browse files
authored
Private Cache remove page collection Lock/Unlock logic and respond on Detach immediately (#15904)
1 parent 43af40f commit 017f2e1

20 files changed

+1679
-341
lines changed

ydb/core/tablet_flat/flat_boot_bundle.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ namespace NBoot {
5858
LeftReads -= 1;
5959

6060
if (msg.Status == NKikimrProto::OK) {
61-
Loader->Save(msg.Cookie, msg.Loaded);
61+
Loader->Save(msg.Cookie, msg.Pages);
6262

6363
TryFinalize();
6464

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -727,8 +727,8 @@ void TExecutor::DropSingleCache(const TLogoBlobID &label)
727727

728728
auto toActivate = PrivatePageCache->ForgetPageCollection(pageCollection);
729729
ActivateWaitingTransactions(toActivate);
730-
if (!PrivatePageCache->Info(label))
731-
Send(MakeSharedPageCacheId(), new NSharedCache::TEvInvalidate(label));
730+
Y_ENSURE(!PrivatePageCache->Info(label));
731+
Send(MakeSharedPageCacheId(), new NSharedCache::TEvDetach(label));
732732

733733
Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
734734
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
@@ -769,9 +769,9 @@ void TExecutor::StickInMemPages(NSharedCache::TEvResult *msg) {
769769
auto partStore = partView.As<NTable::TPartStore>();
770770
for (auto &pageCollection : partStore->PageCollections) {
771771
// Note: page collection search optimization seems useless
772-
if (pageCollection->PageCollection == msg->Origin) {
772+
if (pageCollection->PageCollection == msg->PageCollection) {
773773
ui64 stickySizeBefore = pageCollection->GetStickySize();
774-
for (auto& loaded : msg->Loaded) {
774+
for (auto& loaded : msg->Pages) {
775775
pageCollection->AddSticky(loaded.PageId, loaded.Page);
776776
}
777777
StickyPagesMemory += pageCollection->GetStickySize() - stickySizeBefore;
@@ -2974,7 +2974,7 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
29742974
case EPageCollectionRequest::Cache:
29752975
case EPageCollectionRequest::InMemPages:
29762976
{
2977-
TPrivatePageCache::TInfo *collectionInfo = PrivatePageCache->Info(msg->Origin->Label());
2977+
TPrivatePageCache::TInfo *collectionInfo = PrivatePageCache->Info(msg->PageCollection->Label());
29782978
if (!collectionInfo) // collection could be outdated
29792979
return;
29802980

@@ -2993,7 +2993,7 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
29932993
if (requestType == EPageCollectionRequest::InMemPages) {
29942994
StickInMemPages(msg);
29952995
}
2996-
for (auto& loaded : msg->Loaded) {
2996+
for (auto& loaded : msg->Pages) {
29972997
TPrivatePageCache::TPage::TWaitQueuePtr transactionsToActivate = PrivatePageCache->ProvideBlock(std::move(loaded), collectionInfo);
29982998
ActivateWaitingTransactions(transactionsToActivate);
29992999
}
@@ -3002,7 +3002,7 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
30023002

30033003
case EPageCollectionRequest::PendingInit:
30043004
{
3005-
const auto *pageCollection = msg->Origin.Get();
3005+
const auto *pageCollection = msg->PageCollection.Get();
30063006
TPendingPartSwitch *foundSwitch = nullptr;
30073007
TPendingPartSwitch::TNewBundle *foundBundle = nullptr;
30083008
TPendingPartSwitch::TLoaderStage *foundStage = nullptr;
@@ -3039,7 +3039,7 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
30393039
return Broken();
30403040
}
30413041

3042-
foundStage->Loader.Save(msg->Cookie, msg->Loaded);
3042+
foundStage->Loader.Save(msg->Cookie, msg->Pages);
30433043
foundSwitch->PendingLoads--;
30443044

30453045
if (PrepareExternalPart(*foundSwitch, *foundBundle)) {
@@ -3308,9 +3308,6 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
33083308
subset = Database->ScanSnapshot(table, snapshot);
33093309
}
33103310

3311-
for (auto &partView : subset->Flatten)
3312-
PrivatePageCache->LockPageCollection(partView->Label);
3313-
33143311
GcLogic->HoldBarrier(barrier->Step);
33153312
CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage());
33163313

@@ -3477,11 +3474,9 @@ void TExecutor::UtilizeSubset(const NTable::TSubset &subset,
34773474

34783475
void TExecutor::ReleaseScanLocks(TIntrusivePtr<TBarrier> barrier, const NTable::TSubset &subset)
34793476
{
3480-
CheckCollectionBarrier(barrier);
3477+
Y_UNUSED(subset);
34813478

3482-
for (auto &partView : subset.Flatten)
3483-
if (PrivatePageCache->UnlockPageCollection(partView->Label))
3484-
Send(MakeSharedPageCacheId(), new NSharedCache::TEvInvalidate(partView->Label));
3479+
CheckCollectionBarrier(barrier);
34853480
}
34863481

34873482
void TExecutor::Handle(NOps::TEvScanStat::TPtr &ev, const TActorContext &ctx) {

ydb/core/tablet_flat/flat_executor_bootlogic.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ TExecutorBootLogic::EOpResult TExecutorBootLogic::Receive(::NActors::IEventHandl
271271
if (EPageCollectionRequest(ev.Cookie) != EPageCollectionRequest::BootLogic)
272272
return OpResultUnhandled;
273273

274-
auto it = Loads.find(msg->Origin.Get());
274+
auto it = Loads.find(msg->PageCollection.Get());
275275
if (it == Loads.end()) // could receive outdated results
276276
return OpResultUnhandled;
277277

ydb/core/tablet_flat/flat_executor_ut.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -602,8 +602,11 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_CompactionScan) {
602602
Y_UNIT_TEST(TestCompactionScan) {
603603
TMyEnvBase env;
604604
TRowsModel data;
605+
auto counters = GetSharedPageCounters(env);
605606

606607
env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG);
608+
env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
609+
env->SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NActors::NLog::PRI_TRACE);
607610

608611
env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
609612
return new TTestFlatTablet(env.Edge, tablet, info);
@@ -634,13 +637,21 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_CompactionScan) {
634637
env.WaitFor<NFake::TEvCompacted>(28);
635638
env.WaitForWakeUp();
636639

637-
env.SendSync(new TEvTestFlatTablet::TEvQueueScan(data.Rows(), true));
640+
auto queueScan = new TEvTestFlatTablet::TEvQueueScan(data.Rows(), true);
641+
queueScan->ExpectedPageFaults = 8;
642+
env.SendSync(std::move(queueScan));
638643
env.SendAsync(data.MakeRows(1));
639644
env.WaitFor<NFake::TEvCompacted>(3);
640645
env.WaitForWakeUp();
646+
647+
auto cacheHitsBefore = counters->CacheHitPages->Val();
648+
auto cacheMissBefore = counters->CacheMissPages->Val();
641649
env.SendAsync(new TEvTestFlatTablet::TEvStartQueuedScan());
642650
TAutoPtr<IEventHandle> handle;
643651
env->GrabEdgeEventRethrow<TEvTestFlatTablet::TEvScanFinished>(handle);
652+
UNIT_ASSERT_VALUES_EQUAL(cacheHitsBefore + 8, counters->CacheHitPages->Val());
653+
UNIT_ASSERT_VALUES_EQUAL(cacheMissBefore, counters->CacheMissPages->Val());
654+
644655
env.SendSync(new TEvents::TEvPoison, false, true);
645656
}
646657
}
@@ -6756,7 +6767,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
67566767
env.WaitFor<NFake::TEvCompacted>();
67576768

67586769
// all pages are always kept in shared cache (except flat index)
6759-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 334);
6770+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 334);
67606771

67616772
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
67626773
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
@@ -6794,7 +6805,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
67946805
env.WaitFor<NFake::TEvCompacted>();
67956806

67966807
// all pages are always kept in shared cache (except flat index)
6797-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 334);
6808+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 334);
67986809

67996810
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
68006811
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
@@ -6833,7 +6844,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
68336844
env.WaitFor<NFake::TEvCompacted>();
68346845

68356846
// all pages are always kept in shared cache
6836-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 290);
6847+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 290);
68376848

68386849
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
68396850
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
@@ -6872,7 +6883,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
68726883
env.WaitFor<NFake::TEvCompacted>();
68736884

68746885
// all pages are always kept in shared cache
6875-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 334);
6886+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 334);
68766887

68776888
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
68786889
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
@@ -6911,7 +6922,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
69116922
env.WaitFor<NFake::TEvCompacted>();
69126923

69136924
// all pages are always kept in shared cache
6914-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 290);
6925+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 290);
69156926

69166927
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
69176928
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
@@ -6950,7 +6961,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
69506961
env.WaitFor<NFake::TEvCompacted>();
69516962

69526963
// all pages are always kept in shared cache (except flat index)
6953-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 334);
6964+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 334);
69546965

69556966
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
69566967
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);
@@ -6999,7 +7010,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_BTreeIndex) {
69997010

70007011
// gen 0 data pages are always kept in shared cache
70017012
// b-tree index pages are always kept in shared cache
7002-
UNIT_ASSERT_VALUES_EQUAL(counters->ActivePages->Val(), 48);
7013+
UNIT_ASSERT_GE(counters->ActivePages->Val(), 48);
70037014

70047015
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(readRows, failedAttempts) });
70057016
UNIT_ASSERT_VALUES_EQUAL(readRows, 1000);

ydb/core/tablet_flat/flat_ops_compact.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,11 +375,7 @@ namespace NTabletFlatExecutor {
375375
TStringBuilder error;
376376
error << "Just compacted part needs to load pages";
377377
for (auto collection : fetch) {
378-
error << " " << collection->PageCollection->Label().ToString() << ": [ ";
379-
for (auto pageId : collection->Pages) {
380-
error << pageId << " " << (NTable::NPage::EPage)collection->PageCollection->Page(pageId).Type << " ";
381-
}
382-
error << "]";
378+
error << " " << collection->DebugString(true);
383379
}
384380
Y_TABLET_ERROR(error);
385381
}

ydb/core/tablet_flat/flat_sausage_fetch.h

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
#pragma once
22

33
#include "flat_sausage_gut.h"
4+
#include "flat_part_iface.h"
45

6+
#include <util/generic/xrange.h>
57
#include <ydb/library/actors/util/shared_data.h>
68

79
namespace NKikimr {
810
namespace NPageCollection {
911

1012
struct TFetch {
11-
TFetch(ui64 cookie, TIntrusiveConstPtr<IPageCollection> pageCollection, TVector<ui32> pages, NWilson::TTraceId traceId = {})
13+
TFetch(ui64 cookie, TIntrusiveConstPtr<IPageCollection> pageCollection, TVector<TPageId> pages, NWilson::TTraceId traceId = {})
1214
: Cookie(cookie)
1315
, PageCollection(std::move(pageCollection))
1416
, Pages(std::move(pages))
@@ -17,24 +19,33 @@ namespace NPageCollection {
1719

1820
}
1921

20-
void Describe(IOutputStream &out) const
22+
TString DebugString(bool detailed = false) const
2123
{
22-
out
23-
<< "Fetch{" << Pages.size() << " pages"
24-
<< " " << PageCollection->Label() << "}";
24+
TStringBuilder str;
25+
str << "PageCollection: " << PageCollection->Label();
26+
if (detailed) {
27+
str << " Pages: [";
28+
for (const auto& pageId : Pages) {
29+
str << " " << pageId;
30+
}
31+
str << " ]";
32+
} else {
33+
str << " Pages: " << Pages.size();
34+
}
35+
if (Cookie != Max<ui64>()) str << " Cookie: " << Cookie;
36+
return str;
2537
}
2638

27-
const ui64 Cookie = Max<ui64>();
28-
39+
ui64 Cookie = Max<ui64>();
2940
TIntrusiveConstPtr<IPageCollection> PageCollection;
30-
TVector<ui32> Pages;
41+
TVector<TPageId> Pages;
3142
NWilson::TTraceId TraceId;
3243
};
3344

3445
struct TLoadedPage {
3546
TLoadedPage() = default;
3647

37-
TLoadedPage(ui32 page, TSharedData data)
48+
TLoadedPage(TPageId page, TSharedData data)
3849
: PageId(page)
3950
, Data(std::move(data))
4051
{
@@ -43,10 +54,10 @@ namespace NPageCollection {
4354

4455
explicit operator bool() const noexcept
4556
{
46-
return Data && PageId != Max<ui32>();
57+
return Data && PageId != Max<TPageId>();
4758
}
4859

49-
ui32 PageId = Max<ui32>();
60+
TPageId PageId = Max<TPageId>();
5061
TSharedData Data;
5162
};
5263

ydb/core/tablet_flat/flat_sausage_solid.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
#pragma once
22

33
#include "util_fmt_abort.h"
4+
#include "flat_page_iface.h"
45

56
#include <ydb/core/base/logoblob.h>
67
#include <ydb/library/actors/util/shared_data.h>
78

89
namespace NKikimr {
910
namespace NPageCollection {
1011

12+
using TPageId = NTable::NPage::TPageId;
13+
1114
struct TLargeGlobId {
12-
/* ... is a piece of some data up to 4GiB placed on a continous
15+
/* ... is a piece of some data up to 4GiB placed on a continuous
1316
series of TLogoBlobs whose IDs differ only in cookie and
1417
have the single upper chunk bytes limit. All blobs of span have
1518
the same BS storage group.

ydb/core/tablet_flat/flat_sausagecache.cpp

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@ TPrivatePageCache::TPage::TPage(size_t size, TPageId pageId, TInfo* info)
1515
TPrivatePageCache::TInfo::TInfo(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection)
1616
: Id(pageCollection->Label())
1717
, PageCollection(std::move(pageCollection))
18-
, Users(0)
1918
{
2019
PageMap.resize(PageCollection->Total());
2120
}
2221

2322
TPrivatePageCache::TInfo::TInfo(const TInfo &info)
2423
: Id(info.Id)
2524
, PageCollection(info.PageCollection)
26-
, Users(info.Users)
2725
{
2826
PageMap.resize(info.PageMap.size());
2927
for (const auto& kv : info.PageMap) {
@@ -63,8 +61,6 @@ void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) {
6361
ToTouchShared[page->Info->Id].insert(page->Id);
6462
Y_DEBUG_ABORT_UNLESS(!page->IsUnnecessary());
6563
}
66-
67-
++info->Users;
6864
}
6965

7066
TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ForgetPageCollection(TIntrusivePtr<TInfo> info) {
@@ -95,48 +91,28 @@ TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ForgetPageCollection(
9591
}
9692
}
9793

98-
UnlockPageCollection(info->Id);
99-
100-
return ret;
101-
}
102-
103-
void TPrivatePageCache::LockPageCollection(TLogoBlobID id) {
104-
auto it = PageCollections.find(id);
105-
Y_ENSURE(it != PageCollections.end(), "trying to lock unknown page collection. logic flaw?");
106-
++it->second->Users;
107-
}
108-
109-
bool TPrivatePageCache::UnlockPageCollection(TLogoBlobID id) {
110-
auto it = PageCollections.find(id);
111-
Y_ENSURE(it != PageCollections.end(), "trying to unlock unknown page collection. logic flaw?");
112-
TIntrusivePtr<TInfo> info = it->second;
113-
114-
--info->Users;
115-
116-
// Completely forget page collection if no users remain.
117-
if (!info->Users) {
118-
for (const auto& kv : info->PageMap) {
119-
auto* page = kv.second.Get();
120-
Y_DEBUG_ABORT_UNLESS(page);
94+
// Completely forget page collection
95+
for (const auto& kv : info->PageMap) {
96+
auto* page = kv.second.Get();
97+
Y_DEBUG_ABORT_UNLESS(page);
12198

12299
Y_ENSURE(!page->WaitQueue, "non-empty wait queue in forgotten page.");
123100
Y_ENSURE(!page->PinPad, "non-empty pin pad in forgotten page.");
124101

125-
if (page->SharedBody)
126-
Stats.TotalSharedBody -= page->Size;
127-
if (page->PinnedBody)
128-
Stats.TotalPinnedBody -= page->Size;
129-
if (page->PinnedBody && !page->SharedBody)
130-
Stats.TotalExclusive -= page->Size;
131-
}
132-
133-
info->PageMap.clear();
134-
PageCollections.erase(it);
135-
ToTouchShared.erase(id);
136-
--Stats.TotalCollections;
102+
if (page->SharedBody)
103+
Stats.TotalSharedBody -= page->Size;
104+
if (page->PinnedBody)
105+
Stats.TotalPinnedBody -= page->Size;
106+
if (page->PinnedBody && !page->SharedBody)
107+
Stats.TotalExclusive -= page->Size;
137108
}
138109

139-
return !info->Users;
110+
info->PageMap.clear();
111+
PageCollections.erase(info->Id);
112+
ToTouchShared.erase(info->Id);
113+
--Stats.TotalCollections;
114+
115+
return ret;
140116
}
141117

142118
TPrivatePageCache::TInfo* TPrivatePageCache::Info(TLogoBlobID id) {

0 commit comments

Comments
 (0)