[SNAP-2366] row buffer fault-in, forced roll-over #391

Open · wants to merge 29 commits into snappy/master

Commits (29), changes from all commits:
c981ce4  [GITHUB-982] negative bucket size with eviction  (May 30, 2018)
d7c4a88  fix compilation issue  (May 30, 2018)
18d74af  [SNAP-2366] row buffer fault-in, forced roll-over  (May 31, 2018)
a769b96  Some perf optimizations seen in profiling  (May 31, 2018)
427f8e4  more optimizations seen in profiling  (May 31, 2018)
ca348e9  Added Koloboke Int/Long sets  (May 31, 2018)
d0d50db  procedure to purge codegen caches; more optimizations as seen in prof…  (Jun 4, 2018)
a2e8629  minor change  (Jun 4, 2018)
c60eddd  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Jun 6, 2018)
b444154  fixes for test failures  (Jun 6, 2018)
bb8ec28  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Jun 28, 2018)
f438d87  change maintenance lock to global container lock  (Jun 28, 2018)
8182d95  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Jun 28, 2018)
3cee78d  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Jun 28, 2018)
21f34dc  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Jul 9, 2018)
94d77c3  fixes  (Aug 2, 2018)
6483d3d  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Aug 2, 2018)
2b6f1fa  cleanup logs etc  (Aug 3, 2018)
b61d454  skip for remote lock  (Aug 3, 2018)
1c3fff1  add functions for remote bucket locking  (Aug 6, 2018)
22cd55f  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Aug 6, 2018)
3ba6b1d  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Aug 10, 2018)
4f42d0f  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Aug 31, 2018)
2d4abe1  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Oct 31, 2018)
38e4794  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Nov 2, 2018)
25f4591  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Dec 22, 2018)
83c2a81  re-generate thrift for new StatementAttrs.lockOwner field  (Dec 22, 2018)
58fc9e9  build fixes after the master merge  (Dec 22, 2018)
986d4d2  Merge remote-tracking branch 'origin/snappy/master' into SNAP-2366  (Dec 29, 2018)
@@ -95,6 +95,7 @@
 import com.gemstone.gemfire.internal.cache.lru.LRUAlgorithm;
 import com.gemstone.gemfire.internal.cache.snapshot.RegionSnapshotServiceImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.shared.SystemProperties;
 import com.gemstone.gemfire.internal.util.ArrayUtils;
 import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
 import com.google.common.util.concurrent.Service.State;
@@ -1888,6 +1889,13 @@ private void setAttributes(RegionAttributes attrs,String regionName, InternalReg
     }

     this.compressor = attrs.getCompressor();
+    if (this.compressor != null && !RegionEntryContext.COMPRESSION_ENABLED) {
+      throw new IllegalStateException("Compressor set on region " +
+          getFullPath() + " with global compression disabled. " +
+          "First enable compression with '" +
+          SystemProperties.getServerInstance().getSystemPropertyNamePrefix() +
+          "compression.enable=true'.");
+    }
     // enable concurrency checks for persistent regions
     if(!attrs.getConcurrencyChecksEnabled()
         && attrs.getDataPolicy().withPersistence()
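The guard above makes a per-region compressor fail fast unless compression is also enabled process-wide. A minimal sketch of satisfying it, assuming a "gemfire." property prefix (an assumption; the real prefix is whatever SystemProperties.getServerInstance().getSystemPropertyNamePrefix() returns in this build):

    // Hedged sketch, not from the PR: enable global compression before
    // creating any region that configures a Compressor. The "gemfire."
    // prefix is an assumption; resolve the actual one at runtime via
    // SystemProperties.getServerInstance().getSystemPropertyNamePrefix().
    public final class EnableCompressionSketch {
      public static void main(String[] args) {
        System.setProperty("gemfire.compression.enable", "true");
        // ... then create the cache and the compressed regions ...
      }
    }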
@@ -2709,8 +2709,9 @@ public void changeEventValue(Object v) {
   }

   static boolean isCompressible(RegionEntryContext context, Object value) {
-    return (context != null && context.getCompressor() != null &&
-        value != null && !Token.isInvalidOrRemoved(value));
+    return (RegionEntryContext.COMPRESSION_ENABLED && context != null &&
+        context.getCompressor() != null && value != null &&
+        !Token.isInvalidOrRemoved(value));
   }

   /* subclasses supporting versions must override this */
@@ -2306,7 +2306,7 @@ else if (cbEvent != null && owner.getConcurrencyChecksEnabled()
       catch (RegionClearedException rce) {
         clearOccured = true;
       }
-      owner.txApplyDestroyPart2(re, key, inTokenMode,
+      owner.txApplyDestroyPart2(txState, re, key, inTokenMode,
           clearOccured /* Clear Conflicting with the operation */);
       if (cbEvent != null) {
         if (pendingCallbacks == null) {
@@ -2376,12 +2376,12 @@ else if (inTokenMode || owner.concurrencyChecksEnabled) {
           EntryLogger.logTXDestroy(_getOwnerObject(), key);
         }
         owner.updateSizeOnRemove(key, oldSize);
-        owner.txApplyDestroyPart2(re, key, inTokenMode,
+        owner.txApplyDestroyPart2(txState, re, key, inTokenMode,
             false /* Clear Conflicting with the operation */);
         lruEntryDestroy(re);
       }
       catch (RegionClearedException rce) {
-        owner.txApplyDestroyPart2(re, key, inTokenMode,
+        owner.txApplyDestroyPart2(txState, re, key, inTokenMode,
             true /* Clear Conflicting with the operation */);
       }

@@ -3249,7 +3249,8 @@ public final void txApplyInvalidate(final RegionEntry re,
       } catch (RegionClearedException rce) {
         clearOccured = true;
       }
-      owner.txApplyInvalidatePart2(re, key, didDestroy, true, clearOccured);
+      owner.txApplyInvalidatePart2(txState, re, key, didDestroy,
+          true, clearOccured);
       // didInvalidate = true;
       if (cbEvent != null) {
         if (pendingCallbacks == null) {
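Both hunks above thread the transaction state into the apply-part-2 callbacks instead of letting the callee re-derive it. The widened signatures are presumably of this shape (inferred from the call sites in this diff; parameter names are illustrative, not copied from the sources):

    // Inferred, illustrative declarations; not verbatim from the PR.
    void txApplyDestroyPart2(TXStateInterface txState, RegionEntry re,
        Object key, boolean inTokenMode, boolean clearConflict);

    void txApplyInvalidatePart2(TXStateInterface txState, RegionEntry re,
        Object key, boolean didDestroy, boolean invokeCallbacks,
        boolean clearConflict);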
@@ -50,12 +50,16 @@
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.i18n.LogWriterI18n;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.locks.ExclusiveSharedSynchronizer;
+import com.gemstone.gemfire.internal.cache.locks.LockMode;
+import com.gemstone.gemfire.internal.cache.locks.ReentrantReadWriteWriteShareLock;
 import com.gemstone.gemfire.internal.cache.partitioned.Bucket;
 import com.gemstone.gemfire.internal.cache.partitioned.BucketListener;
 import com.gemstone.gemfire.internal.cache.partitioned.BucketProfileUpdateMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.DeposePrimaryBucketMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.DeposePrimaryBucketMessage.DeposePrimaryBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.util.StopWatch;

@@ -92,6 +96,12 @@ public final class BucketAdvisor extends CacheDistributionAdvisor {
   /** Local lock used by PRHARedundancyProvider */
   protected final ReentrantLock redundancyLock;

+  /** lock for maintenance operations */
+  private final ReentrantReadWriteWriteShareLock maintenanceLock =
+      new ReentrantReadWriteWriteShareLock();
+  private final ConcurrentHashSet<Object> maintenanceLockReaders =
+      new ConcurrentHashSet<>(8);
+
   //private static final byte MASK_HOSTING = 1; // 0001
   //private static final byte MASK_VOLUNTEERING = 2; // 0010
   //private static final byte MASK_OTHER_PRIMARY = 4; // 0100
@@ -293,6 +303,86 @@ public void tryLockIfPrimary() {
     }
   }

+  boolean isLockedForMaintenance() {
+    return ExclusiveSharedSynchronizer.isExclusive(maintenanceLock.getState());
+  }
+
+  private BucketRegion getRowBuffer() {
+    PartitionedRegion pr = getPartitionedRegion();
+    assert pr.isInternalColumnTable();
+    PartitionedRegion rowBuffer = ColocationHelper.getColocatedRegion(pr);
+    assert rowBuffer != null;
+    return rowBuffer.getDataStore().getLocalBucketById(getProxyBucketRegion().getId());
+  }
+
+  boolean lockForMaintenance(boolean forWrite, long msecs, Object owner) {
+    if (getPartitionedRegion().isInternalColumnTable()) {
+      return getRowBuffer().getBucketAdvisor().lockForMaintenance(
+          forWrite, msecs, owner);
+    }
+    boolean locked;
+    if (forWrite) {
+      locked = maintenanceLock.attemptLock(LockMode.EX, msecs, owner);
+    } else {
+      locked = maintenanceLock.attemptLock(LockMode.SH, msecs, owner);
+      // avoid re-entry by same owner
+      if (locked && !maintenanceLockReaders.add(owner)) {
+        maintenanceLock.releaseLock(LockMode.SH, false, owner);
+        locked = false;
+      }
+    }
+    getLogWriter().convertToLogWriter().info("SW:0: locked " + getProxyBucketRegion().getFullPath() + " forWrite=" + forWrite + " locked=" + locked);
+    return locked;
+  }
+
+  void unlockAfterMaintenance(boolean forWrite, Object owner) {
+    if (getPartitionedRegion().isInternalColumnTable()) {
+      getRowBuffer().getBucketAdvisor().unlockAfterMaintenance(forWrite, owner);
+      return;
+    }
+    if (forWrite) {
+      maintenanceLock.releaseLock(LockMode.EX, false, owner);
+    } else if (maintenanceLockReaders.remove(owner)) {
+      maintenanceLock.releaseLock(LockMode.SH, false, owner);
+    } else {
+      throw new IllegalMonitorStateException("Bucket " + getProxyBucketRegion()
+          .getFullPath() + " not read-locked for maintenance by " + owner);
+    }
+    getLogWriter().convertToLogWriter().info("SW:0: unlocked " + getProxyBucketRegion().getFullPath() + " forWrite=" + forWrite);
+  }
+
+  void unlockAllAfterMaintenance(boolean forWrite) {
+    if (getPartitionedRegion().isInternalColumnTable()) {
+      getRowBuffer().getBucketAdvisor().unlockAllAfterMaintenance(forWrite);
+      return;
+    }
+    if (forWrite) {
+      Object writer = maintenanceLock.getOwnerId(null);
+      if (writer != null) {
+        maintenanceLock.releaseLock(LockMode.EX, false, writer);
+      }
+    } else {
+      for (Object reader : maintenanceLockReaders) {
+        try {
+          maintenanceLock.releaseLock(LockMode.SH, false, reader);
+        } catch (IllegalMonitorStateException ignored) {
+        }
+      }
+    }
+    getLogWriter().convertToLogWriter().info("SW:0: unlocked all " + getProxyBucketRegion().getFullPath() + " forWrite=" + forWrite);
+  }
+
+  boolean hasMaintenanceLock(boolean forWrite, Object owner) {
+    if (getPartitionedRegion().isInternalColumnTable()) {
+      return getRowBuffer().getBucketAdvisor().hasMaintenanceLock(forWrite, owner);
+    }
+    if (forWrite) {
+      return Objects.equals(owner, maintenanceLock.getOwnerId(null));
+    } else {
+      return owner != null && maintenanceLockReaders.contains(owner);
+    }
+  }
+
   /**
    * Makes this <code>BucketAdvisor</code> give up being a primary and become
    * a secondary. Does nothing if not currently the primary.
@@ -1230,6 +1320,8 @@ public boolean becomePrimary(boolean isRebalance) {

     long startTime
         = getPartitionedRegionStats().startPrimaryTransfer(isRebalance);
+    // acquire maintenance lock
+    lockForMaintenance(true, Long.MAX_VALUE, this);
     try {
       long waitTime = 2000; // time each iteration will wait
       while (!isPrimary()) {
@@ -1326,6 +1418,7 @@ public boolean becomePrimary(boolean isRebalance) {
         Thread.currentThread().interrupt();

     } finally {
+      unlockAfterMaintenance(true, this);
       getPartitionedRegionStats().endPrimaryTransfer(
           startTime, isPrimary(), isRebalance);
     }
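The maintenance lock introduced above is write-exclusive with shared readers, keyed by an arbitrary owner (a transaction id or a thread), and a shared acquisition is deliberately non-reentrant per owner. Below is a standalone model of that owner-tracking idea using only JDK types; it is a sketch under stated assumptions, not the internal ReentrantReadWriteWriteShareLock, and unlike the real lock it must be released on the acquiring thread:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    final class MaintenanceLockModel {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final Set<Object> readers = ConcurrentHashMap.newKeySet();

      // EX mode: timed write lock. SH mode: timed read lock, where a second
      // acquisition by the same owner is rejected instead of re-entering,
      // mirroring the maintenanceLockReaders set in the diff above.
      boolean lockForMaintenance(boolean forWrite, long msecs, Object owner)
          throws InterruptedException {
        if (forWrite) {
          return lock.writeLock().tryLock(msecs, TimeUnit.MILLISECONDS);
        }
        if (!lock.readLock().tryLock(msecs, TimeUnit.MILLISECONDS)) {
          return false;
        }
        if (!readers.add(owner)) {  // owner already holds SH: back out
          lock.readLock().unlock();
          return false;
        }
        return true;
      }

      void unlockAfterMaintenance(boolean forWrite, Object owner) {
        if (forWrite) {
          lock.writeLock().unlock();
        } else if (readers.remove(owner)) {
          lock.readLock().unlock();
        } else {
          throw new IllegalMonitorStateException(
              "not read-locked for maintenance by " + owner);
        }
      }
    }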
@@ -42,7 +42,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;

 import com.gemstone.gemfire.*;
 import com.gemstone.gemfire.cache.*;
@@ -268,10 +268,6 @@ public static RawValueFactory getFactory() {

   public static final long INVALID_UUID = VMIdAdvisor.INVALID_ID;

-  public final ReentrantReadWriteLock columnBatchFlushLock =
-      new ReentrantReadWriteLock();
-
-
   /**
    * A read/write lock to prevent writing to the bucket when GII from this bucket is in progress
    */
@@ -761,19 +757,53 @@ public final boolean checkForColumnBatchCreation(TXStateInterface tx) {
         || getTotalBytes() >= pr.getColumnBatchSize());
   }

+  private static final Predicate<?> TRUE_CHECK = v -> true;
+
+  @SuppressWarnings("unchecked")
+  public static <T> Predicate<T> TRUE_PREDICATE() {
+    return (Predicate<T>)TRUE_CHECK;
+  }
+
+  public boolean isLockedForMaintenance() {
+    return getBucketAdvisor().isLockedForMaintenance();
+  }
+
+  public boolean lockForMaintenance(boolean forWrite, long msecs, Object owner) {
+    return getBucketAdvisor().lockForMaintenance(forWrite, msecs, owner);
+  }
+
+  public void unlockAfterMaintenance(boolean forWrite, Object owner) {
+    getBucketAdvisor().unlockAfterMaintenance(forWrite, owner);
+  }
+
+  public void unlockAllAfterMaintenance(boolean forWrite) {
+    getBucketAdvisor().unlockAllAfterMaintenance(forWrite);
+  }
+
+  public boolean hasMaintenanceLock(boolean forWrite, Object owner) {
+    return getBucketAdvisor().hasMaintenanceLock(forWrite, owner);
+  }
+
+  @SuppressWarnings("unchecked")
   public final boolean createAndInsertColumnBatch(TXStateInterface tx,
       boolean forceFlush) {
-    // do nothing if a flush is already in progress
-    if (this.columnBatchFlushLock.isWriteLocked()) {
+    return createAndInsertColumnBatch(tx, forceFlush, 0, TRUE_PREDICATE());
   }

+  public final boolean createAndInsertColumnBatch(TXStateInterface tx,
+      boolean forceFlush, long msecs, Predicate<BucketRegion> checkFlushInLock) {
+    // first acquire the maintenance lock to prevent concurrent
+    // updates/deletes from changing entries being rolled over
+    Object owner = tx != null ? tx.getTransactionId() : null;
+    if (owner == null) owner = Thread.currentThread();
+    if (!lockForMaintenance(true, msecs, owner)) {
+      return false;
+    }
-    final ReentrantReadWriteLock.WriteLock sync =
-        this.columnBatchFlushLock.writeLock();
-    sync.lock();
     try {
-      return internalCreateAndInsertColumnBatch(tx, forceFlush);
+      return checkFlushInLock.test(this) &&
+          internalCreateAndInsertColumnBatch(tx, forceFlush);
     } finally {
-      sync.unlock();
+      unlockAfterMaintenance(true, owner);
     }
   }

@@ -864,7 +894,8 @@ private Set createColumnBatchAndPutInColumnTable(long key) {
   // This destroy is under a lock which makes sure that there is no put into the region
   // No need to take the lock on key
   private void destroyAllEntries(Set keysToDestroy, long batchKey) {
-    for(Object key : keysToDestroy) {
+    TXStateInterface txState = getTXState();
+    for (Object key : keysToDestroy) {
       if (getCache().getLoggerI18n().fineEnabled()) {
         getCache()
             .getLoggerI18n()
@@ -878,8 +909,8 @@

       event.setKey(key);
       event.setBucketId(this.getId());
+      event.setTXState(txState);

-      TXStateInterface txState = event.getTXState(this);
       if (txState != null) {
         event.setRegion(this);
         txState.destroyExistingEntry(event, true, null);
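A hedged usage sketch of the new four-argument overload (the two-argument form delegates with a zero timeout and TRUE_PREDICATE()). The bucket and tx references are illustrative; the predicate runs only after the maintenance write lock is held, so the flush condition can be re-checked once concurrent roll-overs are excluded:

    // Illustrative call, not from the PR: 'bucket' is a local BucketRegion
    // and 'tx' the current TXStateInterface (null makes the calling thread
    // the lock owner). Waits up to 100ms for the maintenance lock, then
    // re-checks the roll-over condition before flushing.
    boolean flushed = bucket.createAndInsertColumnBatch(
        tx, false /* forceFlush */, 100 /* msecs */,
        br -> br.checkForColumnBatchCreation(tx));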
@@ -724,7 +724,7 @@ public void releaseReadLock() {
     getDiskStore().releaseReadLock(this);
   }
   void basicAcquireReadLock() {
-    this.rwLock.readLock().lock();
+    this.rwLock.readLock().lockDelayCancel();
     // basicAcquireLock();
   }
   void basicReleaseReadLock() {
@@ -3972,10 +3972,10 @@ public boolean contains(long id) {
     public int size() {
       return this.ints.size() + this.longs.size();
     }

     public void addAll(OplogEntryIdSet toAdd) {
-      this.ints.addAll(toAdd.ints.toArray());
-      this.longs.addAll(toAdd.longs.toArray());
+      this.ints.addAll(toAdd.ints);
+      this.longs.addAll(toAdd.longs);
     }
   }

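Per the "Added Koloboke Int/Long sets" commit, the id sets here are primitive-specialized, so passing the set itself lets addAll walk it directly instead of first materializing an intermediate toArray() copy. A minimal sketch assuming Koloboke's stock factory API (the PR may use a repackaged variant of these classes):

    import com.koloboke.collect.set.hash.HashIntSet;
    import com.koloboke.collect.set.hash.HashIntSets;

    public final class AddAllSketch {
      public static void main(String[] args) {
        HashIntSet a = HashIntSets.newMutableSetOf(1, 2, 3);
        HashIntSet b = HashIntSets.newMutableSetOf(3, 4, 5);
        a.addAll(b);  // direct set-to-set union, no intermediate array
        System.out.println(a.size());  // prints 5
      }
    }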
@@ -117,7 +117,6 @@
 import com.gemstone.gemfire.internal.util.ArraySortedCollection;
 import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
 import com.gemstone.gnu.trove.TObjectIntProcedure;
-import com.gemstone.gnu.trove.TObjectObjectProcedure;
 import com.gemstone.org.jgroups.util.StringId;
 import org.eclipse.collections.impl.map.mutable.UnifiedMap;

@@ -1796,7 +1795,7 @@ private void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus)
     final TXManagerImpl txMgr = getCache().getCacheTransactionManager();
     for (TXRegionState txrs : orderedTXRegionState) {
       TXState txState = txrs.getTXState();
-      int txOrder = 0;
+      long txOrder = 0;
       getLogWriterI18n().info(LocalizedStrings.DEBUG, "Locking txState = " + txState);
       txState.lockTXState();
       if (txState.isInProgress() && (txOrder = is.getFinishedTXOrder(
@@ -1825,8 +1824,8 @@
       Arrays.sort(finishedTXRS, new Comparator<TXRegionState>() {
         @Override
         public int compare(TXRegionState t1, TXRegionState t2) {
-          final int o1 = t1.getFinishOrder();
-          final int o2 = t2.getFinishOrder();
+          final long o1 = t1.getFinishOrder();
+          final long o2 = t2.getFinishOrder();
           return (o1 < o2) ? -1 : ((o1 == o2) ? 0 : 1);
         }
       });
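The hand-rolled three-way compare above is correct for the widened long order; an equivalent, more idiomatic form (a suggestion, not part of the PR) would be:

    // Behaviorally identical to the anonymous Comparator in the hunk above;
    // requires java.util.Comparator and that getFinishOrder() returns long.
    Arrays.sort(finishedTXRS,
        Comparator.comparingLong(TXRegionState::getFinishOrder));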
@@ -3656,7 +3655,8 @@ private RegionEntry moveNext() {
         }
       }
       RegionEntry re = (RegionEntry)this.it.next();
-      if (re.isOverflowedToDisk(this.region, dp, false)) {
+      if (re.isValueNull() &&
+          re.isOverflowedToDisk(this.region, dp, false)) {
         // add dp to sorted list
         // avoid create DiskPage everytime for lookup
         // TODO: SW: Can reduce the intermediate memory and boost