Skip to content

Commit

Permalink
GG-23963 Dirty pages count calculation optimized.
Browse files Browse the repository at this point in the history
  • Loading branch information
ibessonov committed Nov 7, 2019
1 parent 57387e9 commit 4abbfac
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1744,25 +1744,6 @@ private boolean safeToUpdatePageMemories() {

checkpointLock.readLock().unlock();

if (checkpointer != null) {
Collection<DataRegion> dataRegs = context().database().dataRegions();

if (dataRegs != null) {
for (DataRegion dataReg : dataRegs) {
if (!dataReg.config().isPersistenceEnabled())
continue;

PageMemoryEx mem = (PageMemoryEx)dataReg.pageMemory();

if (mem != null && !mem.safeToUpdate()) {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages");

break;
}
}
}
}

if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHldr,

/**
* Heuristic method which allows a thread to check if it safe to start memory struture modifications
* in regard with checkpointing.
* in regard with checkpointing. May return false-negative result during or after partition eviction.
*
* @return {@code False} if there are too many dirty pages and a thread should wait for a
* checkpoint to begin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
Expand Down Expand Up @@ -246,6 +248,9 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Segments array. */
private volatile Segment[] segments;

/** @see #safeToUpdate() */
private final AtomicBoolean safeToUpdate = new AtomicBoolean(true);

/** Lock for segments changes. */
private Object segmentsLock = new Object();

Expand Down Expand Up @@ -952,8 +957,10 @@ private long refreshOutdatedPage(Segment seg, int grpId, long pageId, boolean rm

Collection<FullPageId> dirtyPages = seg.dirtyPages;

if (dirtyPages != null)
dirtyPages.remove(new FullPageId(pageId, grpId));
if (dirtyPages != null) {
if (dirtyPages.remove(new FullPageId(pageId, grpId)))
seg.dirtyPagesCntr.decrementAndGet();
}

return relPtr;
}
Expand Down Expand Up @@ -1080,11 +1087,8 @@ private void tryToRestorePage(FullPageId fullId, ByteBuffer buf) throws IgniteCh

/** {@inheritDoc} */
@Override public boolean safeToUpdate() {
if (segments != null) {
for (Segment segment : segments)
if (!segment.safeToUpdate())
return false;
}
if (segments != null)
return safeToUpdate.get();

return true;
}
Expand Down Expand Up @@ -1163,8 +1167,11 @@ public long totalPages() {
seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace);

seg.dirtyPages = new GridConcurrentHashSet<>();
seg.dirtyPagesCntr.set(0);
}

safeToUpdate.set(true);

memMetrics.resetDirtyPages();

boolean hasUserDirtyPages = dirtyUserPagesPresent.getAndSet(false);
Expand Down Expand Up @@ -1711,7 +1718,7 @@ private void writeUnlockPage(
assert PageIO.getCrc(page + PAGE_OVERHEAD) == 0; //TODO GG-11480

if (markDirty)
setDirty(fullId, page, markDirty, false);
setDirty(fullId, page, true, false);

beforeReleaseWrite(fullId, page + PAGE_OVERHEAD, pageWalRec);
}
Expand Down Expand Up @@ -1851,20 +1858,29 @@ private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean for
assert stateChecker.checkpointLockIsHeldByThread();

if (!wasDirty || forceAdd) {
boolean added = segment(pageId.groupId(), pageId.pageId()).dirtyPages.add(pageId);
Segment seg = segment(pageId.groupId(), pageId.pageId());

if (seg.dirtyPages.add(pageId)) {
long dirtyPagesCnt = seg.dirtyPagesCntr.incrementAndGet();

if (dirtyPagesCnt >= seg.maxDirtyPages)
safeToUpdate.set(false);

if (added)
memMetrics.incrementDirtyPages();
}
}

if (pageId.groupId() != CU.UTILITY_CACHE_GROUP_ID && !dirtyUserPagesPresent.get())
dirtyUserPagesPresent.set(true);
}
else {
boolean rmv = segment(pageId.groupId(), pageId.pageId()).dirtyPages.remove(pageId);
Segment seg = segment(pageId.groupId(), pageId.pageId());

if (seg.dirtyPages.remove(pageId)) {
seg.dirtyPagesCntr.decrementAndGet();

if (rmv)
memMetrics.decrementDirtyPages();
}
}
}

Expand Down Expand Up @@ -2091,9 +2107,10 @@ private int pages() {
*
* @return Collection of all page IDs marked as dirty.
*/
@TestOnly
public Collection<FullPageId> dirtyPages() {
if (segments == null)
return Collections.EMPTY_SET;
return Collections.emptySet();

Collection<FullPageId> res = new HashSet<>((int)loadedPages());

Expand Down Expand Up @@ -2134,11 +2151,14 @@ private class Segment extends ReentrantReadWriteLock {
/** Pages marked as dirty since the last checkpoint. */
private volatile Collection<FullPageId> dirtyPages = new GridConcurrentHashSet<>();

/** Atomic size counter for {@link #dirtyPages}. Used for {@link PageMemoryImpl#safeToUpdate()} calculation. */
private final AtomicLong dirtyPagesCntr = new AtomicLong();

/** Wrapper of pages of current checkpoint. */
private volatile CheckpointPages checkpointPages;

/** */
private final int maxDirtyPages;
private final long maxDirtyPages;

/** Initial partition generation. */
private static final int INIT_PART_GENERATION = 1;
Expand Down Expand Up @@ -2179,8 +2199,8 @@ private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, ThrottlingP
pool = new PagePool(idx, poolRegion, null);

maxDirtyPages = throttlingPlc != ThrottlingPolicy.DISABLED
? pool.pages() * 3 / 4
: Math.min(pool.pages() * 2 / 3, cpPoolPages);
? pool.pages() * 3L / 4
: Math.min(pool.pages() * 2L / 3, cpPoolPages);
}

/**
Expand All @@ -2197,13 +2217,6 @@ private void close() {
}
}

/**
*
*/
private boolean safeToUpdate() {
return dirtyPages.size() < maxDirtyPages;
}

/**
* @param dirtyRatioThreshold Throttle threshold.
*/
Expand All @@ -2215,7 +2228,7 @@ private boolean shouldThrottle(double dirtyRatioThreshold) {
* @return dirtyRatio to be compared with Throttle threshold.
*/
private double getDirtyPagesRatio() {
return ((double)dirtyPages.size()) / pages();
return dirtyPagesCntr.doubleValue() / pages();
}

/**
Expand Down Expand Up @@ -2606,7 +2619,7 @@ private long tryToFindSequentially(int cap, PageStoreWriter saveDirtyPage) throw
throw new IgniteOutOfMemoryException("Failed to find a page for eviction [segmentCapacity=" + cap +
", loaded=" + loadedPages.size() +
", maxDirtyPages=" + maxDirtyPages +
", dirtyPages=" + dirtyPages.size() +
", dirtyPages=" + dirtyPagesCntr +
", cpPages=" + (checkpointPages == null ? 0 : checkpointPages.size()) +
", pinnedInSegment=" + pinnedCnt +
", failedToPrepare=" + failToPrepare +
Expand Down Expand Up @@ -3023,7 +3036,8 @@ private ClearSegmentRunnable(
if (rmvDirty) {
FullPageId fullId = PageHeader.fullPageId(absPtr);

seg.dirtyPages.remove(fullId);
if (seg.dirtyPages.remove(fullId))
seg.dirtyPagesCntr.decrementAndGet();
}

GridUnsafe.setMemory(absPtr + PAGE_OVERHEAD, pageSize, (byte)0);
Expand Down

0 comments on commit 4abbfac

Please sign in to comment.