@@ -596,7 +596,8 @@ void CacheAllocator<CacheTrait>::addChainedItem(WriteHandle& parent,
   // Increment refcount since this chained item is now owned by the parent
   // Parent will decrement the refcount upon release. Since this is an
   // internal refcount, we don't include it in active handle tracking.
-  child->incRef();
+  auto ret = child->incRef(true);
+  XDCHECK(ret == RefcountWithFlags::incResult::incOk);
   XDCHECK_EQ(2u, child->getRefCount());
 
   invalidateNvm(*parent);
@@ -842,7 +843,8 @@ CacheAllocator<CacheTrait>::replaceChainedItemLocked(Item& oldItem,
   // Since this is an internal refcount, we don't include it in active handle
   // tracking.
 
-  newItemHdl->incRef();
+  auto ret = newItemHdl->incRef(true);
+  XDCHECK(ret == RefcountWithFlags::incResult::incOk);
   return oldItemHdl;
 }
 
@@ -1008,12 +1010,14 @@ CacheAllocator<CacheTrait>::releaseBackToAllocator(Item& it,
 }
 
 template <typename CacheTrait>
-bool CacheAllocator<CacheTrait>::incRef(Item& it) {
-  if (it.incRef()) {
+RefcountWithFlags::incResult CacheAllocator<CacheTrait>::incRef(Item& it, bool failIfMoving) {
+  auto ret = it.incRef(failIfMoving);
+
+  if (ret == RefcountWithFlags::incResult::incOk) {
     ++handleCount_.tlStats();
-    return true;
   }
-  return false;
+
+  return ret;
 }
 
 template <typename CacheTrait>
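
The refactor above turns incRef() from a boolean into a tri-state result. Below is a minimal sketch of the semantics implied by this diff: the enumerators incOk and incFailedExclusive appear in the patch, while incFailedMoving is inferred from the third branch of acquire() in the next hunk, so treat the exact names and mapping as an assumption rather than the library's actual implementation.

```cpp
// Editor's sketch (not code from this patch): how a refcount increment can
// report the three outcomes the new acquire() distinguishes.
enum class IncResult { incOk, incFailedMoving, incFailedExclusive };

// `failIfMoving` mirrors the new bool parameter: when set, a reader refuses
// to pin an item that is mid-move and instead waits for the move to finish.
IncResult tryIncRef(bool isExclusive, bool isMoving, bool failIfMoving) {
  if (isExclusive) {
    return IncResult::incFailedExclusive; // item is being evicted/freed
  }
  if (failIfMoving && isMoving) {
    return IncResult::incFailedMoving;    // caller should park on the MoveCtx
  }
  return IncResult::incOk;                // refcount successfully taken
}
```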
@@ -1033,11 +1037,19 @@ CacheAllocator<CacheTrait>::acquire(Item* it) {
 
   SCOPE_FAIL { stats_.numRefcountOverflow.inc(); };
 
-  if (LIKELY(incRef(*it))) {
+  auto failIfMoving = true; // TODO: only for multi-tier
+  auto incRes = incRef(*it, failIfMoving);
+  if (LIKELY(incRes == RefcountWithFlags::incResult::incOk)) {
     return WriteHandle{it, *this};
-  } else {
+  } else if (incRes == RefcountWithFlags::incResult::incFailedExclusive) {
     // item is being evicted
     return WriteHandle{};
+  } else {
+    // item is being moved - wait for completion
+    WriteHandle hdl{*this};
+    auto waitContext = hdl.getItemWaitContext();
+    addWaitContextForMovingItem(it->getKey(), waitContext);
+    return hdl;
   }
 }
 
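
In the last branch above, acquire() hands back a handle that is not yet ready; wakeUpWaiters() fulfils it once the move completes. Below is a small sketch of how a caller could consume such a handle without blocking, assuming the CacheLib-style ReadHandle API (isReady(), toSemiFuture()); adapt it to the handle type actually used here.

```cpp
#include <folly/Range.h>
#include <folly/futures/Future.h>

// find() may now return a not-yet-ready handle when the item is mid-move
// between memory tiers; defer the read instead of blocking the caller.
template <typename Cache>
folly::SemiFuture<typename Cache::ReadHandle> lookupAsync(
    Cache& cache, folly::StringPiece key) {
  auto handle = cache.find(key);
  if (handle.isReady()) {
    return folly::makeSemiFuture(std::move(handle));
  }
  // fulfilled on the moving thread via wakeUpWaiters()
  return std::move(handle).toSemiFuture();
}
```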
@@ -1248,38 +1260,53 @@ CacheAllocator<CacheTrait>::insertOrReplace(const WriteHandle& handle) {
  *
  * The thread, which moves Item, allocates new Item in the tier we are moving to
  * and calls moveRegularItemWithSync() method. This method does the following:
- * 1. Create MoveCtx and put it to the movesMap.
- * 2. Update the access container with the new item from the tier we are
- *    moving to. This Item has kIncomplete flag set.
- * 3. Copy data from the old Item to the new one.
- * 4. Unset the kIncomplete flag and Notify MoveCtx
+ * 1. Update the access container with the new item from the tier we are
+ *    moving to. This Item has the moving flag set.
+ * 2. Copy data from the old Item to the new one.
  *
  * Concurrent threads which are getting handle to the same key:
- * 1. When a handle is created it checks if the kIncomplete flag is set
+ * 1. When a handle is created it checks if the moving flag is set
  * 2. If so, Handle implementation creates waitContext and adds it to the
  *    MoveCtx by calling addWaitContextForMovingItem() method.
  * 3. Wait until the moving thread completes its job.
  */
 template <typename CacheTrait>
-bool CacheAllocator<CacheTrait>::addWaitContextForMovingItem(
+void CacheAllocator<CacheTrait>::addWaitContextForMovingItem(
     folly::StringPiece key, std::shared_ptr<WaitContext<ReadHandle>> waiter) {
   auto shard = getShardForKey(key);
   auto& movesMap = getMoveMapForShard(shard);
-  auto lock = getMoveLockForShard(shard);
-  auto it = movesMap.find(key);
-  if (it == movesMap.end()) {
-    return false;
+  {
+    auto lock = getMoveLockForShard(shard);
+    auto ret = movesMap.try_emplace(key, std::make_unique<MoveCtx>());
+    ret.first->second->addWaiter(std::move(waiter));
   }
-  auto ctx = it->second.get();
-  ctx->addWaiter(std::move(waiter));
-  return true;
 }
 
 template <typename CacheTrait>
-template <typename P>
+size_t CacheAllocator<CacheTrait>::wakeUpWaiters(folly::StringPiece key,
+                                                 WriteHandle&& handle) {
+  std::unique_ptr<MoveCtx> ctx;
+  auto shard = getShardForKey(key);
+  auto& movesMap = getMoveMapForShard(shard);
+  {
+    auto lock = getMoveLockForShard(shard);
+    movesMap.eraseInto(key, [&](auto&& key, auto&& value) {
+      ctx = std::move(value);
+    });
+  }
+
+  if (ctx) {
+    ctx->setItemHandle(std::move(handle));
+    return ctx->numWaiters();
+  }
+
+  return 0;
+}
+
+template <typename CacheTrait>
 bool CacheAllocator<CacheTrait>::moveRegularItemWithSync(
-    Item& oldItem, WriteHandle& newItemHdl, P&& predicate) {
-  XDCHECK(oldItem.isExclusive());
+    Item& oldItem, WriteHandle& newItemHdl) {
+  XDCHECK(oldItem.isMoving());
   // TODO: should we introduce new latency tracker. E.g. evictRegularLatency_
   // ??? util::LatencyTracker tracker{stats_.evictRegularLatency_};
 
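
The two functions added above implement the handshake described in the comment block: readers park a wait context under the shard's move lock, and the moving thread later removes the MoveCtx and fulfils every parked waiter exactly once. Below is a simplified, self-contained model of that pattern (an editor's sketch using plain std types in place of folly's map, shared mutex, and WaitContext, and an int in place of WriteHandle):

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

struct MoveCtxModel {
  using Waiter = std::function<void(int /* stand-in for WriteHandle */)>;
  std::vector<Waiter> waiters;
};

class MoveMapModel {
 public:
  // reader side: always succeeds, creating the context on first use
  void addWaiter(const std::string& key, MoveCtxModel::Waiter w) {
    std::lock_guard<std::mutex> g(lock_);
    map_[key].waiters.push_back(std::move(w));
  }

  // mover side: remove the context and wake everyone with the final result
  size_t wakeUp(const std::string& key, int handle) {
    MoveCtxModel ctx;
    {
      std::lock_guard<std::mutex> g(lock_);
      auto it = map_.find(key);
      if (it == map_.end()) {
        return 0;
      }
      ctx = std::move(it->second);
      map_.erase(it);
    }
    for (auto& w : ctx.waiters) {
      w(handle);
    }
    return ctx.waiters.size();
  }

 private:
  std::mutex lock_;
  std::unordered_map<std::string, MoveCtxModel> map_;
};
```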
@@ -1297,50 +1324,20 @@ bool CacheAllocator<CacheTrait>::moveRegularItemWithSync(
     newItemHdl->markNvmClean();
   }
 
-  folly::StringPiece key(oldItem.getKey());
-  auto shard = getShardForKey(key);
-  auto& movesMap = getMoveMapForShard(shard);
-  MoveCtx* ctx(nullptr);
-  {
-    auto lock = getMoveLockForShard(shard);
-    auto res = movesMap.try_emplace(key, std::make_unique<MoveCtx>());
-    if (!res.second) {
-      return {};
-    }
-    ctx = res.first->second.get();
-  }
+  // mark the new item as moving to block readers until the data is copied
+  // (moveCb is called)
+  auto marked = newItemHdl->markMoving(true);
+  XDCHECK(marked);
 
-  auto resHdl = WriteHandle{};
-  auto guard = folly::makeGuard([key, this, ctx, shard, &resHdl]() {
-    auto& movesMap = getMoveMapForShard(shard);
-    if (resHdl)
-      resHdl->unmarkIncomplete();
-    auto lock = getMoveLockForShard(shard);
-    ctx->setItemHandle(std::move(resHdl));
-    movesMap.erase(key);
-  });
+  auto predicate = [&](const Item& item) {
+    // we rely on the moving flag being set (it should block all readers)
+    XDCHECK(item.getRefCount() == 0);
+    return true;
+  };
 
-  // TODO: Possibly we can use markMoving() instead. But today
-  // moveOnSlabRelease logic assume that we mark as moving old Item
-  // and than do copy and replace old Item with the new one in access
-  // container. Furthermore, Item can be marked as Moving only
-  // if it is linked to MM container. In our case we mark the new Item
-  // and update access container before the new Item is ready (content is
-  // copied).
-  newItemHdl->markIncomplete();
-
-  // Inside the access container's lock, this checks if the old item is
-  // accessible and its refcount is zero. If the item is not accessible,
-  // there is no point to replace it since it had already been removed
-  // or in the process of being removed. If the item is in cache but the
-  // refcount is non-zero, it means user could be attempting to remove
-  // this item through an API such as remove(ItemHandle). In this case,
-  // it is unsafe to replace the old item with a new one, so we should
-  // also abort.
-  if (!accessContainer_->replaceIf(oldItem, *newItemHdl,
-                                   predicate)) {
-    return false;
-  }
+  auto replaced = accessContainer_->replaceIf(oldItem, *newItemHdl,
+                                              predicate);
+  XDCHECK(replaced);
 
   if (config_.moveCb) {
     // Execute the move callback. We cannot make any guarantees about the
@@ -1356,20 +1353,12 @@ bool CacheAllocator<CacheTrait>::moveRegularItemWithSync(
                 oldItem.getSize());
   }
 
-  // Inside the MM container's lock, this checks if the old item exists to
-  // make sure that no other thread removed it, and only then replaces it.
+  // Adding the item to mmContainer has to succeed since no one can remove the item
   auto& newContainer = getMMContainer(*newItemHdl);
-  if (!newContainer.add(*newItemHdl)) {
-    return false;
-  }
-
-  // Replacing into the MM container was successful, but someone could have
-  // called insertOrReplace() or remove() before or after the
-  // replaceInMMContainer() operation, which would invalidate newItemHdl.
-  if (!newItemHdl->isAccessible()) {
-    removeFromMMContainer(*newItemHdl);
-    return false;
-  }
+  auto mmContainerAdded = newContainer.add(*newItemHdl);
+  XDCHECK(mmContainerAdded);
+
+  XDCHECK(newItemHdl->isAccessible());
 
   // no one can add or remove chained items at this point
   if (oldItem.hasChainedItem()) {
@@ -1390,7 +1379,7 @@ bool CacheAllocator<CacheTrait>::moveRegularItemWithSync(
     XDCHECK(newItemHdl->hasChainedItem());
   }
   newItemHdl.unmarkNascent();
-  resHdl = std::move(newItemHdl); // guard will assign it to ctx under lock
+  newItemHdl->unmarkMoving();
   return true;
 }
 
@@ -1592,12 +1581,10 @@ CacheAllocator<CacheTrait>::findEviction(TierId tid, PoolId pid, ClassId cid) {
               ? &toRecycle_->asChainedItem().getParentItem(compressor_)
               : toRecycle_;
 
-      token = createPutToken(*candidate_);
-
       if (shouldWriteToNvmCache(*candidate_) && !token.isValid()) {
         stats_.evictFailConcurrentFill.inc();
-      } else if (candidate_->markExclusive()) {
-        XDCHECK(candidate_->isExclusive());
+      } else if (candidate_->markMoving(true)) {
+        XDCHECK(candidate_->isMoving());
         // markExclusive to make sure no other thread is evicting the item
         // nor holding a handle to that item
 
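
For orientation, the eviction candidate in findEviction() now walks a small state machine: markMoving(true) here, then either a successful move to the next tier (parked readers receive the new handle) or markExclusiveWhenMoving() plus wakeUpWaiters() with an empty handle in the failure path shown in the next hunk. The sketch below is illustrative only; the state names mirror the flags used in this diff, but the transition function is the editor's, not the library's.

```cpp
// Illustrative only: the transitions findEviction() drives for a candidate.
enum class ItemState { Linked, Moving, Exclusive, Recycled };

ItemState nextEvictionState(ItemState s, bool movedToNextTier) {
  switch (s) {
    case ItemState::Linked:
      return ItemState::Moving;                 // markMoving(true) succeeded
    case ItemState::Moving:
      // success: waiters get the next-tier handle via wakeUpWaiters()
      // failure: markExclusiveWhenMoving(), waiters woken with an empty handle
      return movedToNextTier ? ItemState::Recycled : ItemState::Exclusive;
    case ItemState::Exclusive:
      return ItemState::Recycled;               // unlinked, optional NVM put
    default:
      return s;
  }
}
```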
@@ -1635,16 +1622,30 @@ CacheAllocator<CacheTrait>::findEviction(TierId tid, PoolId pid, ClassId cid) {
 
     auto evictedToNext = tryEvictToNextMemoryTier(*candidate, false /* from BgThread */);
     if (!evictedToNext) {
+      token = createPutToken(*candidate);
+
+      // tryEvictToNextMemoryTier should only fail if allocation of the new item fails;
+      // in that case, it should still be possible to mark the item as exclusive.
+      auto ret = candidate->markExclusiveWhenMoving();
+      XDCHECK(ret);
+
+      // wake up any readers that wait for the move to complete
+      // it's safe to do now, as we have the item marked exclusive and
+      // no other reader can be added to the waiters list
+      wakeUpWaiters(candidate->getKey(), WriteHandle{});
+
       unlinkItemExclusive(*candidate);
+
+      if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidate)) {
+        nvmCache_->put(*candidate, std::move(token));
+      }
     } else {
-      auto ref = candidate->unmarkExclusive();
-      XDCHECK(ref == 0u);
+      XDCHECK(!evictedToNext->isExclusive() && !evictedToNext->isMoving());
+      XDCHECK(!candidate->isExclusive() && !candidate->isMoving());
+      wakeUpWaiters(candidate->getKey(), std::move(evictedToNext));
     }
-    XDCHECK(!candidate->isExclusive() && !candidate->isMoving());
 
-    if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidate)) {
-      nvmCache_->put(*candidate, std::move(token));
-    }
+    XDCHECK(!candidate->isExclusive() && !candidate->isMoving());
 
     // recycle the item. it's safe to do so, even if toReleaseHandle was
     // NULL. If `ref` == 0 then it means that we are the last holder of
@@ -1721,15 +1722,16 @@ bool CacheAllocator<CacheTrait>::shouldWriteToNvmCacheExclusive(
 }
 
 template <typename CacheTrait>
-bool CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(
+typename CacheAllocator<CacheTrait>::WriteHandle
+CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(
     TierId tid, PoolId pid, Item& item, bool fromBgThread) {
-  if (item.isChainedItem()) return false; // TODO: We do not support ChainedItem yet
+  XDCHECK(item.isMoving());
+  XDCHECK(item.getRefCount() == 0);
+  if (item.isChainedItem()) return {}; // TODO: We do not support ChainedItem yet
   if (item.isExpired()) {
-    XDCHECK(item.isExclusive());
-    XDCHECK(item.getRefCount() == 0);
-
     accessContainer_->remove(item);
-    return true;
+    item.unmarkMoving();
+    return acquire(&item); // TODO: wakeUpWaiters with null handle?
   }
 
   TierId nextTier = tid; // TODO - calculate this based on some admission policy
@@ -1744,25 +1746,31 @@ bool CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(
 
     if (newItemHdl) {
       XDCHECK_EQ(newItemHdl->getSize(), item.getSize());
-      auto predicate = [&](const Item& item) {
-        return item.getRefCount() == 0;
-      };
-      return moveRegularItemWithSync(item, newItemHdl, predicate);
+      if (moveRegularItemWithSync(item, newItemHdl)) {
+        item.unmarkMoving();
+        return newItemHdl;
+      } else {
+        return WriteHandle{};
+      }
+    } else {
+      return WriteHandle{};
     }
   }
 
   return {};
 }
 
 template <typename CacheTrait>
-bool CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(Item& item, bool fromBgThread) {
+typename CacheAllocator<CacheTrait>::WriteHandle
+CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(Item& item, bool fromBgThread) {
   auto tid = getTierId(item);
   auto pid = allocator_[tid]->getAllocInfo(item.getMemory()).poolId;
   return tryEvictToNextMemoryTier(tid, pid, item, fromBgThread);
 }
 
 template <typename CacheTrait>
-bool CacheAllocator<CacheTrait>::tryPromoteToNextMemoryTier(
+typename CacheAllocator<CacheTrait>::WriteHandle
+CacheAllocator<CacheTrait>::tryPromoteToNextMemoryTier(
     TierId tid, PoolId pid, Item& item, bool fromBgThread) {
   TierId nextTier = tid;
   while (nextTier > 0) { // try to evict down to the next memory tiers
@@ -1779,18 +1787,23 @@ bool CacheAllocator<CacheTrait>::tryPromoteToNextMemoryTier(
 
     if (newItemHdl) {
       XDCHECK_EQ(newItemHdl->getSize(), item.getSize());
-      auto predicate = [&](const Item& item) {
-        return item.getRefCount() == 0 || config_.numDuplicateElements > 0;
-      };
-      return moveRegularItemWithSync(item, newItemHdl, predicate);
+      if (moveRegularItemWithSync(item, newItemHdl)) {
+        item.unmarkMoving();
+        return newItemHdl;
+      } else {
+        return WriteHandle{};
+      }
+    } else {
+      return WriteHandle{};
     }
   }
 
-  return false;
+  return {};
 }
 
 template <typename CacheTrait>
-bool CacheAllocator<CacheTrait>::tryPromoteToNextMemoryTier(Item& item, bool fromBgThread) {
+typename CacheAllocator<CacheTrait>::WriteHandle
+CacheAllocator<CacheTrait>::tryPromoteToNextMemoryTier(Item& item, bool fromBgThread) {
   auto tid = getTierId(item);
   auto pid = allocator_[tid]->getAllocInfo(item.getMemory()).poolId;
   return tryPromoteToNextMemoryTier(tid, pid, item, fromBgThread);
@@ -3318,7 +3331,7 @@ bool CacheAllocator<CacheTrait>::markMovingForSlabRelease(
     // Since this callback is executed, the item is not yet freed
     itemFreed = false;
     Item* item = static_cast<Item*>(memory);
-    if (item->markMoving()) {
+    if (item->markMoving(false)) {
       markedMoving = true;
     }
   };