diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java index 4e433e4b10ff..70f8f4ab0eab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java @@ -20,10 +20,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.logging.log4j.Logger; @@ -39,7 +37,6 @@ import org.apache.geode.internal.lang.SystemProperty; import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier; import org.apache.geode.internal.offheap.annotations.Unretained; -import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.util.internal.GeodeGlossary; @@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator { SystemProperty.getProductIntegerProperty( "off-heap-stats-update-frequency-ms").orElse(3600000); - private final ScheduledExecutorService updateNonRealTimeStatsExecutor; - - private final ScheduledFuture updateNonRealTimeStatsFuture; + private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater; private volatile OffHeapMemoryStats stats; private volatile OutOfOffHeapMemoryListener ooohml; + private final int updateOffHeapStatsFrequencyMs; + OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() { return ooohml; } @@ -98,20 +95,17 @@ public static MemoryAllocatorImpl getAllocator() { public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, - int updateOffHeapStatsFrequencyMs) { + Supplier updateOffHeapStatsFrequencyMsSupplier, + Supplier nonRealTimeStatsUpdaterSupplier) { return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, - SlabImpl::new, updateOffHeapStatsFrequencyMs); + SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier); } - public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, - int slabCount, long offHeapMemorySize, long maxSlabSize) { - return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, - SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); - } - - private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, + static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, - Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) { + Slab[] slabs, SlabFactory slabFactory, + Supplier updateOffHeapStatsFrequencyMsSupplier, + Supplier nonRealTimeStatsUpdaterSupplier) { MemoryAllocatorImpl result = singleton; boolean created = false; try { @@ -155,7 +149,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, } } - result = new MemoryAllocatorImpl(ooohml, stats, slabs, updateOffHeapStatsFrequencyMs); + result = new MemoryAllocatorImpl(ooohml, stats, slabs, + updateOffHeapStatsFrequencyMsSupplier == null ? UPDATE_OFF_HEAP_STATS_FREQUENCY_MS + : updateOffHeapStatsFrequencyMsSupplier.get(), + nonRealTimeStatsUpdaterSupplier); singleton = result; LifecycleListener.invokeAfterCreate(result); created = true; @@ -170,16 +167,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, } } } + result.start(); return result; } - static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml, - OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, - SlabFactory memChunkFactory) { - return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory, - UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); - } - public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, Slab[] slabs) { int slabCount = 0; @@ -196,10 +187,9 @@ public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener o } } return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null, - UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); + null, () -> null); } - private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats, long offHeapMemorySize, Slab[] slabs) { if (isClosed()) { @@ -223,7 +213,8 @@ private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final Slab[] slabs, - int updateOffHeapStatsFrequencyMs) { + int updateOffHeapStatsFrequencyMs, + Supplier nonRealTimeStatsUpdaterSupplier) { if (oooml == null) { throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null"); } @@ -239,11 +230,19 @@ private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, this.stats.incMaxMemory(freeList.getTotalMemory()); this.stats.incFreeMemory(freeList.getTotalMemory()); - updateNonRealTimeStatsExecutor = - LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread"); - updateNonRealTimeStatsFuture = - updateNonRealTimeStatsExecutor.scheduleAtFixedRate(freeList::updateNonRealTimeStats, 0, - updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS); + this.updateOffHeapStatsFrequencyMs = updateOffHeapStatsFrequencyMs; + + if (nonRealTimeStatsUpdaterSupplier == null) { + nonRealTimeStatsUpdater = new NonRealTimeStatsUpdater(freeList::updateNonRealTimeStats); + } else { + nonRealTimeStatsUpdater = nonRealTimeStatsUpdaterSupplier.get(); + } + } + + void start() { + if (nonRealTimeStatsUpdater != null) { + nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs); + } } public List getLostChunks(InternalCache cache) { @@ -407,8 +406,9 @@ private void realClose() { if (setClosed()) { freeList.freeSlabs(); stats.close(); - updateNonRealTimeStatsFuture.cancel(true); - updateNonRealTimeStatsExecutor.shutdown(); + if (nonRealTimeStatsUpdater != null) { + nonRealTimeStatsUpdater.stop(); + } singleton = null; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java new file mode 100644 index 000000000000..933b28ba35b8 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.offheap; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.geode.logging.internal.executors.LoggingExecutors; + +public class NonRealTimeStatsUpdater { + private final Runnable updateTask; + private final ScheduledExecutorService updateNonRealTimeStatsExecutor; + private final AtomicReference> updateNonRealTimeStatsFuture = + new AtomicReference<>(); + + NonRealTimeStatsUpdater(Runnable updateTask) { + this.updateTask = updateTask; + updateNonRealTimeStatsExecutor = + LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread"); + + } + + void start(int updateOffHeapStatsFrequencyMs) { + updateNonRealTimeStatsFuture + .set(updateNonRealTimeStatsExecutor.scheduleAtFixedRate(updateTask, 0, + updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS)); + } + + void stop() { + updateNonRealTimeStatsFuture.get().cancel(true); + updateNonRealTimeStatsExecutor.shutdown(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java index 755fef9fc178..2bbd5876cbce 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.offheap; import java.lang.reflect.Method; +import java.util.function.Supplier; import org.apache.geode.StatisticDescriptor; import org.apache.geode.Statistics; @@ -219,22 +220,12 @@ public static MemoryAllocator createOffHeapStorage(StatisticsFactory sf, long of // ooohml provides the hook for disconnecting and closing cache on OutOfOffHeapMemoryException OutOfOffHeapMemoryListener ooohml = new DisconnectingOutOfOffHeapMemoryListener((InternalDistributedSystem) system); - return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml); + return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml, null, null); } static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize, - OutOfOffHeapMemoryListener ooohml) { - final OffHeapMemoryStats stats = new OffHeapStorage(sf); - - final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize); - - final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize); - - return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize); - } - - static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize, - OutOfOffHeapMemoryListener ooohml, int updateOffHeapStatsFrequencyMs) { + OutOfOffHeapMemoryListener ooohml, Supplier updateOffHeapStatsFrequencyMsSupplier, + Supplier nonRealTimeStatsUpdaterSupplier) { final OffHeapMemoryStats stats = new OffHeapStorage(sf); final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize); @@ -242,7 +233,7 @@ static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offH final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize); return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, - updateOffHeapStatsFrequencyMs); + updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier); } private static final long MAX_SLAB_SIZE = Integer.MAX_VALUE; diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java index 6de0312f4145..2626fd051b5a 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java @@ -72,9 +72,9 @@ public void testCreate() { NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener(); NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats(); try { - MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, size -> { + MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, size -> { throw new OutOfMemoryError("expected"); - }); + }, null, () -> null); } catch (OutOfMemoryError expected) { } assertTrue(listener.isClosed()); @@ -98,7 +98,8 @@ public Slab create(int size) { } } }; - MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, MAX_SLAB_SIZE, factory); + MemoryAllocatorImpl.create(listener, stats, 10, 950, MAX_SLAB_SIZE, null, factory, null, + () -> null); } catch (OutOfMemoryError expected) { } assertTrue(listener.isClosed()); @@ -109,7 +110,8 @@ public Slab create(int size) { NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats(); SlabFactory factory = SlabImpl::new; MemoryAllocator ma = - MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, factory); + MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, factory, null, + () -> null); try { assertFalse(listener.isClosed()); assertFalse(stats.isClosed()); @@ -135,7 +137,8 @@ public Slab create(int size) { listener = new NullOutOfOffHeapMemoryListener(); stats2 = new NullOffHeapMemoryStats(); MemoryAllocator ma2 = - MemoryAllocatorImpl.createForUnitTest(listener, stats2, 10, 950, 100, factory); + MemoryAllocatorImpl.create(listener, stats2, 10, 950, 100, null, factory, null, + () -> null); assertSame(ma, ma2); assertTrue(stats.isClosed()); assertFalse(listener.isClosed()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java index 7f5fcdcce660..7019848647cb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java @@ -46,7 +46,7 @@ public void setUp() { OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class); ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3, - OffHeapStorage.MIN_SLAB_SIZE); + OffHeapStorage.MIN_SLAB_SIZE, null, () -> null); } /** diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java index d32cb8b26174..87786d1df55c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java @@ -81,7 +81,7 @@ public void setUp() { memoryAllocator = MemoryAllocatorImpl.create(listener, stats, 1, OffHeapStorage.MIN_SLAB_SIZE, - OffHeapStorage.MIN_SLAB_SIZE); + OffHeapStorage.MIN_SLAB_SIZE, null, () -> null); offHeapRegionEntryHelperInstance = spy(new OffHeapRegionEntryHelperInstance(ohAddress -> offHeapStoredObject, diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java index f940bca6f894..d30a4feecad2 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java @@ -167,7 +167,8 @@ public void testCreateOffHeapStorage() { StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null); OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class); MemoryAllocator ma = - OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml); + OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, null, + () -> null); try { OffHeapMemoryStats stats = ma.getStats(); assertNotNull(stats.getStats()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java index 2aecc7b70366..0dd651f64138 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java @@ -30,7 +30,8 @@ public void testUpdateNonRealTimeOffHeapStorageStats() { StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null); OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class); MemoryAllocator ma = - OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, 100); + OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, () -> 100, + null); try { OffHeapMemoryStats stats = ma.getStats(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java index 2801c6dfc16b..d3e1b90bbceb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java @@ -74,7 +74,7 @@ public void setUp() { OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class); ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3, - OffHeapStorage.MIN_SLAB_SIZE); + OffHeapStorage.MIN_SLAB_SIZE, null, () -> null); } @After