From e05c764ed547ab9dd66109b327e94a9a2e231342 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 30 Nov 2020 21:31:30 +0300 Subject: [PATCH 1/7] ignite-13775 checkpointRWLock wrapper refactoring. --- .../checkpoint/CheckpointReadWriteLock.java | 11 +- .../ignite/internal/util/IgniteUtils.java | 179 ---------- .../ReentrantReadWriteLockWithTracking.java | 330 ++++++++++++++++++ .../CheckpointReadLockFailureTest.java | 39 ++- 4 files changed, 363 insertions(+), 196 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointReadWriteLock.java index 2066201f0b93c..ad4d4570bc910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointReadWriteLock.java @@ -18,12 +18,11 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.ReentrantReadWriteLockWithTracking; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS; @@ -45,18 +44,16 @@ public class CheckpointReadWriteLock { static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner"; /** Checkpont lock. */ - private final ReentrantReadWriteLock checkpointLock; + private final ReentrantReadWriteLockWithTracking checkpointLock; /** * @param logger Logger. */ CheckpointReadWriteLock(Function, IgniteLogger> logger) { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - if (getBoolean(IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS)) - checkpointLock = new U.ReentrantReadWriteLockTracer(lock, logger.apply(getClass()), 5_000); + checkpointLock = new ReentrantReadWriteLockWithTracking(logger.apply(getClass()), 5_000); else - checkpointLock = lock; + checkpointLock = new ReentrantReadWriteLockWithTracking(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index b3b644d6cafe3..3adcd4121ca40 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -139,7 +139,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.jar.JarFile; import java.util.logging.ConsoleHandler; @@ -242,7 +241,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -363,9 +361,6 @@ public abstract class IgniteUtils { /** Default user version. */ public static final String DFLT_USER_VERSION = "0"; - /** Lock hold message. */ - public static final String LOCK_HOLD_MESSAGE = "ReadLock held the lock more than "; - /** Cache for {@link GridPeerDeployAware} fields to speed up reflection. */ private static final ConcurrentMap, Collection>> p2pFields = new ConcurrentHashMap<>(); @@ -11528,180 +11523,6 @@ public static Runnable wrapIgniteFuture(Runnable r, GridFutureAdapter fut) { }; } - /** */ - public static class ReentrantReadWriteLockTracer extends ReentrantReadWriteLock { - /** */ - private static final long serialVersionUID = 0L; - - /** Read lock. */ - private final ReadLockTracer readLock; - - /** Write lock. */ - private final WriteLockTracer writeLock; - - /** Lock print threshold. */ - private long readLockThreshold; - - /** */ - private IgniteLogger log; - - /** - * @param delegate RWLock delegate. - * @param log Ignite logger. - * @param readLockThreshold ReadLock threshold timeout. - * - */ - public ReentrantReadWriteLockTracer(ReentrantReadWriteLock delegate, IgniteLogger log, long readLockThreshold) { - this.log = log; - - readLock = new ReadLockTracer(delegate, log, readLockThreshold); - - writeLock = new WriteLockTracer(delegate); - - this.readLockThreshold = readLockThreshold; - } - - /** {@inheritDoc} */ - @Override public ReadLock readLock() { - return readLock; - } - - /** {@inheritDoc} */ - @Override public WriteLock writeLock() { - return writeLock; - } - - /** */ - public long lockWaitThreshold() { - return readLockThreshold; - } - } - - /** */ - private static class ReadLockTracer extends ReentrantReadWriteLock.ReadLock { - /** */ - private static final long serialVersionUID = 0L; - - /** Delegate. */ - private final ReentrantReadWriteLock.ReadLock delegate; - - /** */ - private static final ThreadLocal> READ_LOCK_HOLDER_TS = - ThreadLocal.withInitial(() -> new T2<>(0, 0L)); - - /** */ - private IgniteLogger log; - - /** */ - private long readLockThreshold; - - /** */ - public ReadLockTracer(ReentrantReadWriteLock lock, IgniteLogger log, long readLockThreshold) { - super(lock); - - delegate = lock.readLock(); - - this.log = log; - - this.readLockThreshold = readLockThreshold; - } - - /** */ - private void inc() { - T2 val = READ_LOCK_HOLDER_TS.get(); - - int cntr = val.get1(); - - if (cntr == 0) - val.set2(U.currentTimeMillis()); - - val.set1(++cntr); - - READ_LOCK_HOLDER_TS.set(val); - } - - /** */ - private void dec() { - T2 val = READ_LOCK_HOLDER_TS.get(); - - int cntr = val.get1(); - - if (--cntr == 0) { - long timeout = U.currentTimeMillis() - val.get2(); - - if (timeout > readLockThreshold) { - GridStringBuilder sb = new GridStringBuilder(); - - sb.a(LOCK_HOLD_MESSAGE + timeout + " ms." + nl()); - - U.printStackTrace(Thread.currentThread().getId(), sb); - - U.warn(log, sb.toString()); - } - } - - val.set1(cntr); - - READ_LOCK_HOLDER_TS.set(val); - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public void lock() { - delegate.lock(); - - inc(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public void lockInterruptibly() throws InterruptedException { - delegate.lockInterruptibly(); - - inc(); - } - - /** {@inheritDoc} */ - @Override public boolean tryLock() { - if (delegate.tryLock()) { - inc(); - - return true; - } - else - return false; - } - - /** {@inheritDoc} */ - @Override public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { - if (delegate.tryLock(time, unit)) { - inc(); - - return true; - } - else - return false; - } - - /** {@inheritDoc} */ - @Override public void unlock() { - delegate.unlock(); - - dec(); - } - } - - /** */ - private static class WriteLockTracer extends ReentrantReadWriteLock.WriteLock { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - public WriteLockTracer(ReentrantReadWriteLock lock) { - super(lock); - } - } - /** * @param key Cipher Key. * @param encMode Enc mode see {@link Cipher#ENCRYPT_MODE}, {@link Cipher#DECRYPT_MODE}, etc. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java new file mode 100644 index 0000000000000..d8eff1b266b90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.util.IgniteUtils.nl; + +/** */ +public class ReentrantReadWriteLockWithTracking implements ReadWriteLock { + /** Lock hold message. */ + public static final String LOCK_HOLD_MESSAGE = "ReadLock held the lock more than "; + + /** Lock print threshold. */ + private long readLockThreshold; + + /** Delegate instance. */ + private final ReentrantReadWriteLock delegate = new ReentrantReadWriteLock(); + + /** Read lock holder. */ + private ReentrantReadWriteLockWithTracking.ReadLock readLock; + + /** Write lock holder. */ + private ReentrantReadWriteLockWithTracking.WriteLock writeLock = new ReentrantReadWriteLockWithTracking.WriteLock(delegate); + + /** + * ReentrantRWLock wrapper, provides additional trace info on {@link ReadLockWithTracking#unlock()} method, if someone + * holds the lock more than {@code readLockThreshold}. + * + * @param log Ignite logger. + * @param readLockThreshold ReadLock threshold timeout. + */ + public ReentrantReadWriteLockWithTracking(IgniteLogger log, long readLockThreshold) { + readLock = new ReadLockWithTracking(delegate, log, readLockThreshold); + + this.readLockThreshold = readLockThreshold; + } + + /** Delegator implementation. */ + public ReentrantReadWriteLockWithTracking() { + readLock = new ReentrantReadWriteLockWithTracking.ReadLock(delegate); + } + + /** {@inheritDoc} */ + @Override public ReentrantReadWriteLockWithTracking.ReadLock readLock() { + return readLock; + } + + /** {@inheritDoc} */ + @Override public ReentrantReadWriteLockWithTracking.WriteLock writeLock() { + return writeLock; + } + + /** */ + public long lockWaitThreshold() { + return readLockThreshold; + } + + /** + * Queries if the write lock is held by the current thread. + * + * @return {@code true} if the current thread holds the write lock and + * {@code false} otherwise + */ + public boolean isWriteLockedByCurrentThread() { + return delegate.isWriteLockedByCurrentThread(); + } + + /** + * Queries the number of reentrant read holds on this lock by the + * current thread. A reader thread has a hold on a lock for + * each lock action that is not matched by an unlock action. + * + * @return the number of holds on the read lock by the current thread, + * or zero if the read lock is not held by the current thread + */ + public int getReadHoldCount() { + return delegate.getReadHoldCount(); + } + + /** + * Queries the number of read locks held for this lock. This + * method is designed for use in monitoring system state, not for + * synchronization control. + * @return the number of read locks held + */ + public int getReadLockCount() { + return delegate.getReadLockCount(); + } + + /** */ + public static class WriteLock implements Lock { + /** Delegate instance. */ + private final ReentrantReadWriteLock delegate; + + /** */ + public WriteLock(ReentrantReadWriteLock lock) { + delegate = lock; + } + + /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override public void lock() { + delegate.writeLock().lock(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override public void lockInterruptibly() throws InterruptedException { + delegate.writeLock().lockInterruptibly(); + } + + /** {@inheritDoc} */ + @Override public boolean tryLock() { + return delegate.writeLock().tryLock(); + } + + /** {@inheritDoc} */ + @Override public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { + return delegate.writeLock().tryLock(time, unit); + } + + /** {@inheritDoc} */ + @Override public void unlock() { + delegate.writeLock().unlock(); + } + + /** {@inheritDoc} */ + @NotNull @Override public Condition newCondition() { + return delegate.writeLock().newCondition(); + } + + /** + * Queries if this write lock is held by the current thread. + * Identical in effect to {@link + * ReentrantReadWriteLock#isWriteLockedByCurrentThread}. + * + * @return {@code true} if the current thread holds this lock and + * {@code false} otherwise + */ + public boolean isHeldByCurrentThread() { + return delegate.writeLock().isHeldByCurrentThread(); + } + } + + /** Tracks long rlock holders. */ + public static class ReadLockWithTracking extends ReadLock { + /** + * Delegate instance. + */ + private final ReentrantReadWriteLock delegate; + + /** */ + private static final ThreadLocal> READ_LOCK_HOLDER_TS = + ThreadLocal.withInitial(() -> new T2<>(0, 0L)); + + /** */ + private IgniteLogger log; + + /** */ + private long readLockThreshold; + + /** */ + protected ReadLockWithTracking(ReentrantReadWriteLock lock, @Nullable IgniteLogger log, long readLockThreshold) { + super(lock); + + delegate = lock; + + this.log = log; + + this.readLockThreshold = readLockThreshold; + } + + /** */ + private void inc() { + T2 val = READ_LOCK_HOLDER_TS.get(); + + int cntr = val.get1(); + + if (cntr == 0) + val.set2(U.currentTimeMillis()); + + val.set1(++cntr); + + READ_LOCK_HOLDER_TS.set(val); + } + + /** */ + private void dec() { + T2 val = READ_LOCK_HOLDER_TS.get(); + + int cntr = val.get1(); + + if (--cntr == 0) { + long timeout = U.currentTimeMillis() - val.get2(); + + if (timeout > readLockThreshold) { + GridStringBuilder sb = new GridStringBuilder(); + + sb.a(LOCK_HOLD_MESSAGE + timeout + " ms." + nl()); + + U.printStackTrace(Thread.currentThread().getId(), sb); + + U.warn(log, sb.toString()); + } + } + + val.set1(cntr); + + READ_LOCK_HOLDER_TS.set(val); + } + + /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override public void lock() { + delegate.readLock().lock(); + + inc(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override public void lockInterruptibly() throws InterruptedException { + delegate.readLock().lockInterruptibly(); + + inc(); + } + + /** {@inheritDoc} */ + @Override public boolean tryLock() { + if (delegate.readLock().tryLock()) { + inc(); + + return true; + } + else + return false; + } + + /** {@inheritDoc} */ + @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { + if (delegate.readLock().tryLock(timeout, unit)) { + inc(); + + return true; + } + else + return false; + } + + /** {@inheritDoc} */ + @Override public void unlock() { + delegate.readLock().unlock(); + + dec(); + } + } + + /** Default implementation. */ + public static class ReadLock implements Lock { + /** Delegate instance. */ + private final ReentrantReadWriteLock delegate; + + /** {@inheritDoc} */ + protected ReadLock(ReentrantReadWriteLock lock) { + delegate = lock; + } + + /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override public void lock() { + delegate.readLock().lock(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override public void lockInterruptibly() throws InterruptedException { + delegate.readLock().lockInterruptibly(); + } + + /** {@inheritDoc} */ + @Override public boolean tryLock() { + return delegate.readLock().tryLock(); + } + + /** {@inheritDoc} */ + @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.readLock().tryLock(timeout, unit); + } + + /** {@inheritDoc} */ + @Override public void unlock() { + delegate.readLock().unlock(); + } + + /** {@inheritDoc} */ + @Override public Condition newCondition() { + return delegate.readLock().newCondition(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return delegate.readLock().toString(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java index f7613875f1e27..4e84c9b624abc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java @@ -21,8 +21,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -33,6 +33,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock; +import org.apache.ignite.internal.util.ReentrantReadWriteLockWithTracking; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; @@ -45,7 +46,7 @@ import org.junit.Test; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS; -import static org.apache.ignite.internal.util.IgniteUtils.LOCK_HOLD_MESSAGE; +import static org.apache.ignite.internal.util.ReentrantReadWriteLockWithTracking.LOCK_HOLD_MESSAGE; /** * Tests critical failure handling on checkpoint read lock acquisition errors. @@ -125,7 +126,7 @@ public void testFailureTypeOnTimeout() throws Exception { IgniteEx ig = startGrid(0); - ig.cluster().active(true); + ig.cluster().state(ClusterState.ACTIVE); GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database(); @@ -168,7 +169,7 @@ public void testFailureTypeOnTimeout() throws Exception { public void testPrintCpRLockHolder() throws Exception { CountDownLatch canRelease = new CountDownLatch(1); - testLog = new ListeningTestLogger(false, log); + testLog = new ListeningTestLogger(log); LogListener lsnr = LogListener.matches(LOCK_HOLD_MESSAGE).build(); @@ -176,14 +177,14 @@ public void testPrintCpRLockHolder() throws Exception { IgniteEx ig = startGrid(0); - ig.cluster().active(true); + ig.cluster().state(ClusterState.ACTIVE); GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database(); CheckpointReadWriteLock checkpointReadWriteLock = U.field( db.checkpointManager.checkpointTimeoutLock(), "checkpointReadWriteLock" ); - U.ReentrantReadWriteLockTracer tracker = U.field(checkpointReadWriteLock, "checkpointLock"); + ReentrantReadWriteLockWithTracking tracker = U.field(checkpointReadWriteLock, "checkpointLock"); GridTestUtils.runAsync(() -> { checkpointReadWriteLock.readLock(); @@ -212,7 +213,7 @@ public void testPrintCpRLockHolder() throws Exception { public void testReentrance() throws Exception { IgniteEx ig = startGrid(0); - ig.cluster().active(true); + ig.cluster().state(ClusterState.ACTIVE); GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database(); @@ -220,7 +221,7 @@ public void testReentrance() throws Exception { db.checkpointManager.checkpointTimeoutLock(), "checkpointReadWriteLock" ); - ReentrantReadWriteLock rwLock = U.field(checkpointReadWriteLock, "checkpointLock"); + ReentrantReadWriteLockWithTracking rwLock = U.field(checkpointReadWriteLock, "checkpointLock"); CountDownLatch waitFirstRLock = new CountDownLatch(1); @@ -243,7 +244,7 @@ public void testReentrance() throws Exception { waitSecondRLock.await(); } catch (InterruptedException e) { - e.printStackTrace(); + fail(e.toString()); } rwLock.readLock().unlock(); @@ -254,7 +255,7 @@ public void testReentrance() throws Exception { waitFirstRLock.await(); } catch (InterruptedException e) { - e.printStackTrace(); + fail(e.toString()); } try { @@ -288,4 +289,22 @@ public void testReentrance() throws Exception { stopGrid(0); } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true") + public void test0() { + ReentrantReadWriteLockWithTracking wrapped = new ReentrantReadWriteLockWithTracking(log, 1_000); + + wrapped.writeLock().lock(); + + try { + assertTrue(wrapped.isWriteLockedByCurrentThread()); + } + finally { + wrapped.writeLock().unlock(); + } + } } From daa1f3bc46d76704b2c0c14b9f54ef398813ead3 Mon Sep 17 00:00:00 2001 From: zstan Date: Tue, 1 Dec 2020 08:41:32 +0300 Subject: [PATCH 2/7] fix javadoc --- .../internal/util/ReentrantReadWriteLockWithTracking.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java index d8eff1b266b90..57e991899925d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java @@ -285,7 +285,7 @@ public static class ReadLock implements Lock { /** Delegate instance. */ private final ReentrantReadWriteLock delegate; - /** {@inheritDoc} */ + /** */ protected ReadLock(ReentrantReadWriteLock lock) { delegate = lock; } From cf6bdd4169ac23b66ddfeb3384bbb4cb9113d872 Mon Sep 17 00:00:00 2001 From: zstan Date: Tue, 1 Dec 2020 08:47:27 +0300 Subject: [PATCH 3/7] fix test --- .../processors/cache/persistence/db/IgnitePdsWithTtlTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java index 47351df919ef2..f502e58292845 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java @@ -23,7 +23,6 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.cache.expiry.AccessedExpiryPolicy; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; @@ -55,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager; +import org.apache.ignite.internal.util.ReentrantReadWriteLockWithTracking; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; @@ -286,7 +286,7 @@ public void testPutOpsIntoCacheWithExpirationConcurrentlyWithCheckpointCompleteS checkpointManager.checkpointTimeoutLock(), "checkpointReadWriteLock" ); - ReentrantReadWriteLock lock = U.field(checkpointReadWriteLock, "checkpointLock"); + ReentrantReadWriteLockWithTracking lock = U.field(checkpointReadWriteLock, "checkpointLock"); while (!timeoutReached.get()) { try { From 8579b3ca7c795a09745af81acaae52d4d63daa71 Mon Sep 17 00:00:00 2001 From: zstan Date: Tue, 1 Dec 2020 14:23:56 +0300 Subject: [PATCH 4/7] test rename --- .../cache/persistence/CheckpointReadLockFailureTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java index 4e84c9b624abc..a1ebfb48c947a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java @@ -295,7 +295,7 @@ public void testReentrance() throws Exception { */ @Test @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true") - public void test0() { + public void testWriteLockedByCurrentThread() { ReentrantReadWriteLockWithTracking wrapped = new ReentrantReadWriteLockWithTracking(log, 1_000); wrapped.writeLock().lock(); From 1f32bcf6f8a2e303175677e2fc9b16763404f13d Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 3 Dec 2020 11:56:20 +0300 Subject: [PATCH 5/7] fix after review --- .../ReentrantReadWriteLockWithTracking.java | 137 ++---------------- 1 file changed, 11 insertions(+), 126 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java index 57e991899925d..2c8a66d358257 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java @@ -18,14 +18,11 @@ package org.apache.ignite.internal.util; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.util.IgniteUtils.nl; @@ -42,10 +39,10 @@ public class ReentrantReadWriteLockWithTracking implements ReadWriteLock { private final ReentrantReadWriteLock delegate = new ReentrantReadWriteLock(); /** Read lock holder. */ - private ReentrantReadWriteLockWithTracking.ReadLock readLock; + private ReentrantReadWriteLock.ReadLock readLock; /** Write lock holder. */ - private ReentrantReadWriteLockWithTracking.WriteLock writeLock = new ReentrantReadWriteLockWithTracking.WriteLock(delegate); + private ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock.WriteLock(delegate) {}; /** * ReentrantRWLock wrapper, provides additional trace info on {@link ReadLockWithTracking#unlock()} method, if someone @@ -62,16 +59,16 @@ public ReentrantReadWriteLockWithTracking(IgniteLogger log, long readLockThresho /** Delegator implementation. */ public ReentrantReadWriteLockWithTracking() { - readLock = new ReentrantReadWriteLockWithTracking.ReadLock(delegate); + readLock = new ReentrantReadWriteLock.ReadLock(delegate) {}; } /** {@inheritDoc} */ - @Override public ReentrantReadWriteLockWithTracking.ReadLock readLock() { + @Override public ReentrantReadWriteLock.ReadLock readLock() { return readLock; } /** {@inheritDoc} */ - @Override public ReentrantReadWriteLockWithTracking.WriteLock writeLock() { + @Override public ReentrantReadWriteLock.WriteLock writeLock() { return writeLock; } @@ -112,68 +109,8 @@ public int getReadLockCount() { return delegate.getReadLockCount(); } - /** */ - public static class WriteLock implements Lock { - /** Delegate instance. */ - private final ReentrantReadWriteLock delegate; - - /** */ - public WriteLock(ReentrantReadWriteLock lock) { - delegate = lock; - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public void lock() { - delegate.writeLock().lock(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public void lockInterruptibly() throws InterruptedException { - delegate.writeLock().lockInterruptibly(); - } - - /** {@inheritDoc} */ - @Override public boolean tryLock() { - return delegate.writeLock().tryLock(); - } - - /** {@inheritDoc} */ - @Override public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { - return delegate.writeLock().tryLock(time, unit); - } - - /** {@inheritDoc} */ - @Override public void unlock() { - delegate.writeLock().unlock(); - } - - /** {@inheritDoc} */ - @NotNull @Override public Condition newCondition() { - return delegate.writeLock().newCondition(); - } - - /** - * Queries if this write lock is held by the current thread. - * Identical in effect to {@link - * ReentrantReadWriteLock#isWriteLockedByCurrentThread}. - * - * @return {@code true} if the current thread holds this lock and - * {@code false} otherwise - */ - public boolean isHeldByCurrentThread() { - return delegate.writeLock().isHeldByCurrentThread(); - } - } - /** Tracks long rlock holders. */ - public static class ReadLockWithTracking extends ReadLock { - /** - * Delegate instance. - */ - private final ReentrantReadWriteLock delegate; - + public static class ReadLockWithTracking extends ReentrantReadWriteLock.ReadLock { /** */ private static final ThreadLocal> READ_LOCK_HOLDER_TS = ThreadLocal.withInitial(() -> new T2<>(0, 0L)); @@ -188,8 +125,6 @@ public static class ReadLockWithTracking extends ReadLock { protected ReadLockWithTracking(ReentrantReadWriteLock lock, @Nullable IgniteLogger log, long readLockThreshold) { super(lock); - delegate = lock; - this.log = log; this.readLockThreshold = readLockThreshold; @@ -235,24 +170,22 @@ private void dec() { } /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") @Override public void lock() { - delegate.readLock().lock(); + super.lock(); inc(); } /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") @Override public void lockInterruptibly() throws InterruptedException { - delegate.readLock().lockInterruptibly(); + super.lockInterruptibly(); inc(); } /** {@inheritDoc} */ @Override public boolean tryLock() { - if (delegate.readLock().tryLock()) { + if (super.tryLock()) { inc(); return true; @@ -263,7 +196,7 @@ private void dec() { /** {@inheritDoc} */ @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { - if (delegate.readLock().tryLock(timeout, unit)) { + if (super.tryLock(timeout, unit)) { inc(); return true; @@ -274,57 +207,9 @@ private void dec() { /** {@inheritDoc} */ @Override public void unlock() { - delegate.readLock().unlock(); + super.unlock(); dec(); } } - - /** Default implementation. */ - public static class ReadLock implements Lock { - /** Delegate instance. */ - private final ReentrantReadWriteLock delegate; - - /** */ - protected ReadLock(ReentrantReadWriteLock lock) { - delegate = lock; - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public void lock() { - delegate.readLock().lock(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public void lockInterruptibly() throws InterruptedException { - delegate.readLock().lockInterruptibly(); - } - - /** {@inheritDoc} */ - @Override public boolean tryLock() { - return delegate.readLock().tryLock(); - } - - /** {@inheritDoc} */ - @Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { - return delegate.readLock().tryLock(timeout, unit); - } - - /** {@inheritDoc} */ - @Override public void unlock() { - delegate.readLock().unlock(); - } - - /** {@inheritDoc} */ - @Override public Condition newCondition() { - return delegate.readLock().newCondition(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return delegate.readLock().toString(); - } - } } From 54a0accef8b5d7eb84c76632c7793385e047e2e0 Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 3 Dec 2020 12:06:01 +0300 Subject: [PATCH 6/7] fix --- .../internal/util/ReentrantReadWriteLockWithTracking.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java index 2c8a66d358257..8dbf8937dad0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java @@ -111,6 +111,9 @@ public int getReadLockCount() { /** Tracks long rlock holders. */ public static class ReadLockWithTracking extends ReentrantReadWriteLock.ReadLock { + /** */ + private static final long serialVersionUID = 0L; + /** */ private static final ThreadLocal> READ_LOCK_HOLDER_TS = ThreadLocal.withInitial(() -> new T2<>(0, 0L)); From ac2906919dcd43372c3709f145d477bfb3de8a08 Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 4 Dec 2020 09:04:32 +0300 Subject: [PATCH 7/7] fix test --- .../internal/util/ReentrantReadWriteLockWithTracking.java | 2 +- .../persistence/IgnitePdsCacheEntriesExpirationTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java index 8dbf8937dad0a..227be476f63fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ReentrantReadWriteLockWithTracking.java @@ -27,7 +27,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.nl; -/** */ +/** ReentrantReadWriteLock adapter with readLock tracking. */ public class ReentrantReadWriteLockWithTracking implements ReadWriteLock { /** Lock hold message. */ public static final String LOCK_HOLD_MESSAGE = "ReadLock held the lock more than "; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheEntriesExpirationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheEntriesExpirationTest.java index 74ac21152403c..986c680452c99 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheEntriesExpirationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheEntriesExpirationTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; import org.apache.ignite.IgniteCache; @@ -44,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.ReentrantReadWriteLockWithTracking; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; @@ -243,7 +243,7 @@ public void testDeadlockBetweenCachePutAndEntryExpiration() throws Exception { db.checkpointManager.checkpointTimeoutLock(), "checkpointReadWriteLock" ); - ReentrantReadWriteLock rwLock = U.field(checkpointReadWriteLock, "checkpointLock"); + ReentrantReadWriteLockWithTracking rwLock = U.field(checkpointReadWriteLock, "checkpointLock"); int key = 0;