Skip to content

Commit

Permalink
GEODE-10323: Remove schedule threads in MemoryAllocatorImpl construct…
Browse files Browse the repository at this point in the history
…or (#7715)

* GEODE-10323: Remove schedule threads in MemoryAllocatorImpl

The scheduled executor used in MemoryAllocatorImpl
was scheduled in the constructor. This provoked
intermittent failures in OffHeapStorageJUnitTest testCreateOffHeapStorage
test cases due to a race condition.

The scheduling has been moved to a new method (start())
in the MemoryAllocatorImpl class that is in turn
invoked in the create() static method.

* GEODE-10323: Extract update stats code to new class
  • Loading branch information
albertogpz committed Sep 16, 2022
1 parent 0bd51e8 commit 73aa4b3
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import org.apache.logging.log4j.Logger;

Expand All @@ -39,7 +37,6 @@
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;

Expand All @@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
SystemProperty.getProductIntegerProperty(
"off-heap-stats-update-frequency-ms").orElse(3600000);

private final ScheduledExecutorService updateNonRealTimeStatsExecutor;

private final ScheduledFuture<?> updateNonRealTimeStatsFuture;
private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater;

private volatile OffHeapMemoryStats stats;

private volatile OutOfOffHeapMemoryListener ooohml;

private final int updateOffHeapStatsFrequencyMs;

OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() {
return ooohml;
}
Expand All @@ -98,20 +95,17 @@ public static MemoryAllocatorImpl getAllocator() {

public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
int slabCount, long offHeapMemorySize, long maxSlabSize,
int updateOffHeapStatsFrequencyMs) {
Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
SlabImpl::new, updateOffHeapStatsFrequencyMs);
SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
}

public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
int slabCount, long offHeapMemorySize, long maxSlabSize) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
}

private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) {
Slab[] slabs, SlabFactory slabFactory,
Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
MemoryAllocatorImpl result = singleton;
boolean created = false;
try {
Expand Down Expand Up @@ -155,7 +149,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
}
}

result = new MemoryAllocatorImpl(ooohml, stats, slabs, updateOffHeapStatsFrequencyMs);
result = new MemoryAllocatorImpl(ooohml, stats, slabs,
updateOffHeapStatsFrequencyMsSupplier == null ? UPDATE_OFF_HEAP_STATS_FREQUENCY_MS
: updateOffHeapStatsFrequencyMsSupplier.get(),
nonRealTimeStatsUpdaterSupplier);
singleton = result;
LifecycleListener.invokeAfterCreate(result);
created = true;
Expand All @@ -170,16 +167,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
}
}
}
result.start();
return result;
}

static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml,
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
SlabFactory memChunkFactory) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory,
UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
}

public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml,
OffHeapMemoryStats stats, Slab[] slabs) {
int slabCount = 0;
Expand All @@ -196,10 +187,9 @@ public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener o
}
}
return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null,
UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
null, () -> null);
}


private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats,
long offHeapMemorySize, Slab[] slabs) {
if (isClosed()) {
Expand All @@ -223,7 +213,8 @@ private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats

private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
final OffHeapMemoryStats stats, final Slab[] slabs,
int updateOffHeapStatsFrequencyMs) {
int updateOffHeapStatsFrequencyMs,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
if (oooml == null) {
throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
}
Expand All @@ -239,11 +230,19 @@ private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
this.stats.incMaxMemory(freeList.getTotalMemory());
this.stats.incFreeMemory(freeList.getTotalMemory());

updateNonRealTimeStatsExecutor =
LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");
updateNonRealTimeStatsFuture =
updateNonRealTimeStatsExecutor.scheduleAtFixedRate(freeList::updateNonRealTimeStats, 0,
updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS);
this.updateOffHeapStatsFrequencyMs = updateOffHeapStatsFrequencyMs;

if (nonRealTimeStatsUpdaterSupplier == null) {
nonRealTimeStatsUpdater = new NonRealTimeStatsUpdater(freeList::updateNonRealTimeStats);
} else {
nonRealTimeStatsUpdater = nonRealTimeStatsUpdaterSupplier.get();
}
}

void start() {
if (nonRealTimeStatsUpdater != null) {
nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs);
}
}

public List<OffHeapStoredObject> getLostChunks(InternalCache cache) {
Expand Down Expand Up @@ -407,8 +406,9 @@ private void realClose() {
if (setClosed()) {
freeList.freeSlabs();
stats.close();
updateNonRealTimeStatsFuture.cancel(true);
updateNonRealTimeStatsExecutor.shutdown();
if (nonRealTimeStatsUpdater != null) {
nonRealTimeStatsUpdater.stop();
}
singleton = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.offheap;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.geode.logging.internal.executors.LoggingExecutors;

public class NonRealTimeStatsUpdater {
private final Runnable updateTask;
private final ScheduledExecutorService updateNonRealTimeStatsExecutor;
private final AtomicReference<ScheduledFuture<?>> updateNonRealTimeStatsFuture =
new AtomicReference<>();

NonRealTimeStatsUpdater(Runnable updateTask) {
this.updateTask = updateTask;
updateNonRealTimeStatsExecutor =
LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");

}

void start(int updateOffHeapStatsFrequencyMs) {
updateNonRealTimeStatsFuture
.set(updateNonRealTimeStatsExecutor.scheduleAtFixedRate(updateTask, 0,
updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS));
}

void stop() {
updateNonRealTimeStatsFuture.get().cancel(true);
updateNonRealTimeStatsExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.internal.offheap;

import java.lang.reflect.Method;
import java.util.function.Supplier;

import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
Expand Down Expand Up @@ -219,30 +220,20 @@ public static MemoryAllocator createOffHeapStorage(StatisticsFactory sf, long of
// ooohml provides the hook for disconnecting and closing cache on OutOfOffHeapMemoryException
OutOfOffHeapMemoryListener ooohml =
new DisconnectingOutOfOffHeapMemoryListener((InternalDistributedSystem) system);
return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml);
return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml, null, null);
}

static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
OutOfOffHeapMemoryListener ooohml) {
final OffHeapMemoryStats stats = new OffHeapStorage(sf);

final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);

final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);

return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize);
}

static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
OutOfOffHeapMemoryListener ooohml, int updateOffHeapStatsFrequencyMs) {
OutOfOffHeapMemoryListener ooohml, Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
final OffHeapMemoryStats stats = new OffHeapStorage(sf);

final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);

final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);

return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize,
updateOffHeapStatsFrequencyMs);
updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
}

private static final long MAX_SLAB_SIZE = Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public void testCreate() {
NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
try {
MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, size -> {
MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, size -> {
throw new OutOfMemoryError("expected");
});
}, null, () -> null);
} catch (OutOfMemoryError expected) {
}
assertTrue(listener.isClosed());
Expand All @@ -98,7 +98,8 @@ public Slab create(int size) {
}
}
};
MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, MAX_SLAB_SIZE, factory);
MemoryAllocatorImpl.create(listener, stats, 10, 950, MAX_SLAB_SIZE, null, factory, null,
() -> null);
} catch (OutOfMemoryError expected) {
}
assertTrue(listener.isClosed());
Expand All @@ -109,7 +110,8 @@ public Slab create(int size) {
NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
SlabFactory factory = SlabImpl::new;
MemoryAllocator ma =
MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, factory);
MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, factory, null,
() -> null);
try {
assertFalse(listener.isClosed());
assertFalse(stats.isClosed());
Expand All @@ -135,7 +137,8 @@ public Slab create(int size) {
listener = new NullOutOfOffHeapMemoryListener();
stats2 = new NullOffHeapMemoryStats();
MemoryAllocator ma2 =
MemoryAllocatorImpl.createForUnitTest(listener, stats2, 10, 950, 100, factory);
MemoryAllocatorImpl.create(listener, stats2, 10, 950, 100, null, factory, null,
() -> null);
assertSame(ma, ma2);
assertTrue(stats.isClosed());
assertFalse(listener.isClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void setUp() {
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);

ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
OffHeapStorage.MIN_SLAB_SIZE);
OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {

memoryAllocator =
MemoryAllocatorImpl.create(listener, stats, 1, OffHeapStorage.MIN_SLAB_SIZE,
OffHeapStorage.MIN_SLAB_SIZE);
OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);

offHeapRegionEntryHelperInstance =
spy(new OffHeapRegionEntryHelperInstance(ohAddress -> offHeapStoredObject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testCreateOffHeapStorage() {
StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
MemoryAllocator ma =
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml);
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, null,
() -> null);
try {
OffHeapMemoryStats stats = ma.getStats();
assertNotNull(stats.getStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public void testUpdateNonRealTimeOffHeapStorageStats() {
StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
MemoryAllocator ma =
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, 100);
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, () -> 100,
null);
try {
OffHeapMemoryStats stats = ma.getStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setUp() {
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);

ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
OffHeapStorage.MIN_SLAB_SIZE);
OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
}

@After
Expand Down

0 comments on commit 73aa4b3

Please sign in to comment.