Skip to content

Commit

Permalink
IGNITE-11265 JVM Crashes on TeamCity
Browse files Browse the repository at this point in the history
  • Loading branch information
EdShangGG authored and DirectXceriD committed Feb 20, 2019
1 parent 824f60f commit 119978c
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public class PageMemoryNoStoreImpl implements PageMemory {
/** Shared context. */
private final GridCacheSharedContext<?, ?> ctx;

/**
* Marker that stop was invoked and memory is not supposed for any usage.
*/
private volatile boolean stopped;

/**
* @param log Logger.
* @param directMemoryProvider Memory allocator to use.
Expand Down Expand Up @@ -202,6 +207,8 @@ public PageMemoryNoStoreImpl(

/** {@inheritDoc} */
@Override public void start() throws IgniteException {
stopped = false;

long startSize = dataRegionCfg.getInitialSize();
long maxSize = dataRegionCfg.getMaxSize();

Expand Down Expand Up @@ -243,6 +250,8 @@ public PageMemoryNoStoreImpl(
if (log.isDebugEnabled())
log.debug("Stopping page memory.");

stopped = true;

directMemoryProvider.shutdown(deallocate);

if (directMemoryProvider instanceof Closeable) {
Expand All @@ -262,6 +271,8 @@ public PageMemoryNoStoreImpl(

/** {@inheritDoc} */
@Override public long allocatePage(int grpId, int partId, byte flags) {
assert !stopped;

long relPtr = borrowFreePage();
long absPtr = 0;

Expand Down Expand Up @@ -326,6 +337,8 @@ public PageMemoryNoStoreImpl(

/** {@inheritDoc} */
@Override public boolean freePage(int cacheId, long pageId) {
assert !stopped;

releaseFreePage(pageId);

return true;
Expand Down Expand Up @@ -445,6 +458,8 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {

/** {@inheritDoc} */
@Override public long acquirePage(int cacheId, long pageId) {
assert !stopped;

int pageIdx = PageIdUtils.pageIndex(pageId);

Segment seg = segment(pageIdx);
Expand All @@ -454,6 +469,8 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {

/** {@inheritDoc} */
@Override public void releasePage(int cacheId, long pageId, long page) {
assert !stopped;

if (trackAcquiredPages) {
Segment seg = segment(PageIdUtils.pageIndex(pageId));

Expand All @@ -463,6 +480,8 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {

/** {@inheritDoc} */
@Override public long readLock(int cacheId, long pageId, long page) {
assert !stopped;

if (rwLock.readLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
return page + PAGE_OVERHEAD;

Expand All @@ -471,6 +490,8 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {

/** {@inheritDoc} */
@Override public long readLockForce(int cacheId, long pageId, long page) {
assert !stopped;

if (rwLock.readLock(page + LOCK_OFFSET, -1))
return page + PAGE_OVERHEAD;

Expand All @@ -479,11 +500,15 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {

/** {@inheritDoc} */
@Override public void readUnlock(int cacheId, long pageId, long page) {
assert !stopped;

rwLock.readUnlock(page + LOCK_OFFSET);
}

/** {@inheritDoc} */
@Override public long writeLock(int cacheId, long pageId, long page) {
assert !stopped;

if (rwLock.writeLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
return page + PAGE_OVERHEAD;

Expand All @@ -492,6 +517,8 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {

/** {@inheritDoc} */
@Override public long tryWriteLock(int cacheId, long pageId, long page) {
assert !stopped;

if (rwLock.tryWriteLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId)))
return page + PAGE_OVERHEAD;

Expand All @@ -506,6 +533,8 @@ private long fromSegmentIndex(int segIdx, long pageIdx) {
Boolean walPlc,
boolean dirtyFlag
) {
assert !stopped;

long actualId = PageIO.getPageId(page + PAGE_OVERHEAD);

rwLock.writeUnlock(page + LOCK_OFFSET, PageIdUtils.tag(actualId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1541,23 +1541,24 @@ public void onLocalJoin(
assert grp != null;

if (affReq != null && affReq.contains(aff.groupId())) {
assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();

CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId());

assert affMsg != null;

List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());

assert resTopVer.equals(evts.topologyVersion());
assert resTopVer.equals(evts.topologyVersion()) : "resTopVer=" + resTopVer +
", evts.topVer=" + evts.topologyVersion();

List<List<ClusterNode>> idealAssign =
affMsg.createIdealAssignments(nodesByOrder, evts.discoveryCache());

if (idealAssign != null)
aff.idealAssignment(idealAssign);
else {
assert !aff.centralizedAffinityFunction();
assert !aff.centralizedAffinityFunction() : aff;

// Calculate ideal assignments.
aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
Expand Down Expand Up @@ -763,7 +764,7 @@ public static Object rebalanceTopic(int idx) {
stopErr = cctx.kernalContext().clientDisconnected() ?
new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
"Client node disconnected: " + cctx.igniteInstanceName()) :
new IgniteCheckedException("Node is stopping: " + cctx.igniteInstanceName());
new NodeStoppingException("Node is stopping: " + cctx.igniteInstanceName());

// Stop exchange worker
U.cancel(exchWorker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -883,7 +884,8 @@ private void remap(final AffinityTopologyVersion topVer) {
if (trackable)
cctx.mvcc().removeFuture(futId);

cctx.dht().sendTtlUpdateRequest(expiryPlc);
if (!(err instanceof NodeStoppingException))
cctx.dht().sendTtlUpdateRequest(expiryPlc);

return true;
}
Expand Down
Loading

0 comments on commit 119978c

Please sign in to comment.