diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index b9514813d05f4..26d18b32a4678 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1744,25 +1744,6 @@ private boolean safeToUpdatePageMemories() { checkpointLock.readLock().unlock(); - if (checkpointer != null) { - Collection dataRegs = context().database().dataRegions(); - - if (dataRegs != null) { - for (DataRegion dataReg : dataRegs) { - if (!dataReg.config().isPersistenceEnabled()) - continue; - - PageMemoryEx mem = (PageMemoryEx)dataReg.pageMemory(); - - if (mem != null && !mem.safeToUpdate()) { - checkpointer.wakeupForCheckpoint(0, "too many dirty pages"); - - break; - } - } - } - } - if (ASSERTION_ENABLED) CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 61f19fe80f83a..08c1b92fc1db5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -99,7 +99,7 @@ public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHldr, /** * Heuristic method which allows a thread to check if it safe to start memory struture modifications - * in regard with checkpointing. + * in regard with checkpointing. May return false-negative result during or after partition eviction. * * @return {@code False} if there are too many dirty pages and a thread should wait for a * checkpoint to begin. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 6cead2d04f2bc..c740a1f06dc1c 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -96,6 +97,7 @@ import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; @@ -246,6 +248,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** Segments array. */ private volatile Segment[] segments; + /** @see #safeToUpdate() */ + private final AtomicBoolean safeToUpdate = new AtomicBoolean(true); + /** Lock for segments changes. */ private Object segmentsLock = new Object(); @@ -952,8 +957,10 @@ private long refreshOutdatedPage(Segment seg, int grpId, long pageId, boolean rm Collection dirtyPages = seg.dirtyPages; - if (dirtyPages != null) - dirtyPages.remove(new FullPageId(pageId, grpId)); + if (dirtyPages != null) { + if (dirtyPages.remove(new FullPageId(pageId, grpId))) + seg.dirtyPagesCntr.decrementAndGet(); + } return relPtr; } @@ -1080,11 +1087,8 @@ private void tryToRestorePage(FullPageId fullId, ByteBuffer buf) throws IgniteCh /** {@inheritDoc} */ @Override public boolean safeToUpdate() { - if (segments != null) { - for (Segment segment : segments) - if (!segment.safeToUpdate()) - return false; - } + if (segments != null) + return safeToUpdate.get(); return true; } @@ -1163,8 +1167,11 @@ public long totalPages() { seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace); seg.dirtyPages = new GridConcurrentHashSet<>(); + seg.dirtyPagesCntr.set(0); } + safeToUpdate.set(true); + memMetrics.resetDirtyPages(); boolean hasUserDirtyPages = dirtyUserPagesPresent.getAndSet(false); @@ -1711,7 +1718,7 @@ private void writeUnlockPage( assert PageIO.getCrc(page + PAGE_OVERHEAD) == 0; //TODO GG-11480 if (markDirty) - setDirty(fullId, page, markDirty, false); + setDirty(fullId, page, true, false); beforeReleaseWrite(fullId, page + PAGE_OVERHEAD, pageWalRec); } @@ -1851,20 +1858,29 @@ private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean for assert stateChecker.checkpointLockIsHeldByThread(); if (!wasDirty || forceAdd) { - boolean added = segment(pageId.groupId(), pageId.pageId()).dirtyPages.add(pageId); + Segment seg = segment(pageId.groupId(), pageId.pageId()); + + if (seg.dirtyPages.add(pageId)) { + long dirtyPagesCnt = seg.dirtyPagesCntr.incrementAndGet(); + + if (dirtyPagesCnt >= seg.maxDirtyPages) + safeToUpdate.set(false); - if (added) memMetrics.incrementDirtyPages(); + } } if (pageId.groupId() != CU.UTILITY_CACHE_GROUP_ID && !dirtyUserPagesPresent.get()) dirtyUserPagesPresent.set(true); } else { - boolean rmv = segment(pageId.groupId(), pageId.pageId()).dirtyPages.remove(pageId); + Segment seg = segment(pageId.groupId(), pageId.pageId()); + + if (seg.dirtyPages.remove(pageId)) { + seg.dirtyPagesCntr.decrementAndGet(); - if (rmv) memMetrics.decrementDirtyPages(); + } } } @@ -2091,9 +2107,10 @@ private int pages() { * * @return Collection of all page IDs marked as dirty. */ + @TestOnly public Collection dirtyPages() { if (segments == null) - return Collections.EMPTY_SET; + return Collections.emptySet(); Collection res = new HashSet<>((int)loadedPages()); @@ -2134,11 +2151,14 @@ private class Segment extends ReentrantReadWriteLock { /** Pages marked as dirty since the last checkpoint. */ private volatile Collection dirtyPages = new GridConcurrentHashSet<>(); + /** Atomic size counter for {@link #dirtyPages}. Used for {@link PageMemoryImpl#safeToUpdate()} calculation. */ + private final AtomicLong dirtyPagesCntr = new AtomicLong(); + /** Wrapper of pages of current checkpoint. */ private volatile CheckpointPages checkpointPages; /** */ - private final int maxDirtyPages; + private final long maxDirtyPages; /** Initial partition generation. */ private static final int INIT_PART_GENERATION = 1; @@ -2179,8 +2199,8 @@ private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, ThrottlingP pool = new PagePool(idx, poolRegion, null); maxDirtyPages = throttlingPlc != ThrottlingPolicy.DISABLED - ? pool.pages() * 3 / 4 - : Math.min(pool.pages() * 2 / 3, cpPoolPages); + ? pool.pages() * 3L / 4 + : Math.min(pool.pages() * 2L / 3, cpPoolPages); } /** @@ -2197,13 +2217,6 @@ private void close() { } } - /** - * - */ - private boolean safeToUpdate() { - return dirtyPages.size() < maxDirtyPages; - } - /** * @param dirtyRatioThreshold Throttle threshold. */ @@ -2215,7 +2228,7 @@ private boolean shouldThrottle(double dirtyRatioThreshold) { * @return dirtyRatio to be compared with Throttle threshold. */ private double getDirtyPagesRatio() { - return ((double)dirtyPages.size()) / pages(); + return dirtyPagesCntr.doubleValue() / pages(); } /** @@ -2606,7 +2619,7 @@ private long tryToFindSequentially(int cap, PageStoreWriter saveDirtyPage) throw throw new IgniteOutOfMemoryException("Failed to find a page for eviction [segmentCapacity=" + cap + ", loaded=" + loadedPages.size() + ", maxDirtyPages=" + maxDirtyPages + - ", dirtyPages=" + dirtyPages.size() + + ", dirtyPages=" + dirtyPagesCntr + ", cpPages=" + (checkpointPages == null ? 0 : checkpointPages.size()) + ", pinnedInSegment=" + pinnedCnt + ", failedToPrepare=" + failToPrepare + @@ -3023,7 +3036,8 @@ private ClearSegmentRunnable( if (rmvDirty) { FullPageId fullId = PageHeader.fullPageId(absPtr); - seg.dirtyPages.remove(fullId); + if (seg.dirtyPages.remove(fullId)) + seg.dirtyPagesCntr.decrementAndGet(); } GridUnsafe.setMemory(absPtr + PAGE_OVERHEAD, pageSize, (byte)0);