Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor #7715

Merged
merged 2 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import org.apache.logging.log4j.Logger;

Expand All @@ -39,7 +37,6 @@
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;

Expand All @@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
SystemProperty.getProductIntegerProperty(
"off-heap-stats-update-frequency-ms").orElse(3600000);

private final ScheduledExecutorService updateNonRealTimeStatsExecutor;

private final ScheduledFuture<?> updateNonRealTimeStatsFuture;
private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater;

private volatile OffHeapMemoryStats stats;

private volatile OutOfOffHeapMemoryListener ooohml;

private final int updateOffHeapStatsFrequencyMs;

OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() {
return ooohml;
}
Expand All @@ -98,20 +95,17 @@ public static MemoryAllocatorImpl getAllocator() {

public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
int slabCount, long offHeapMemorySize, long maxSlabSize,
int updateOffHeapStatsFrequencyMs) {
Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
SlabImpl::new, updateOffHeapStatsFrequencyMs);
SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
}

public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
int slabCount, long offHeapMemorySize, long maxSlabSize) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
}

private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) {
Slab[] slabs, SlabFactory slabFactory,
Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the update frequency "int" should be part of the NonRealTimeStatsUpdater and then you wouldn't need two suppliers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good idea but there is one test case in which you don't pass a Supplier so that the default NonRealTimeStatsUpdater is constructed but you need to specify a smaller frequency so that the test does not need to wait for the default frequency value:
See testUpdateNonRealTimeOffHeapStorageStats() from class OffHeapStorageNonRuntimeStatsJUnitTest.
For this test case I would need to specify the frequency but not the supplier.

Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
MemoryAllocatorImpl result = singleton;
boolean created = false;
try {
Expand Down Expand Up @@ -155,7 +149,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
}
}

result = new MemoryAllocatorImpl(ooohml, stats, slabs, updateOffHeapStatsFrequencyMs);
result = new MemoryAllocatorImpl(ooohml, stats, slabs,
updateOffHeapStatsFrequencyMsSupplier == null ? UPDATE_OFF_HEAP_STATS_FREQUENCY_MS
: updateOffHeapStatsFrequencyMsSupplier.get(),
nonRealTimeStatsUpdaterSupplier);
singleton = result;
LifecycleListener.invokeAfterCreate(result);
created = true;
Expand All @@ -170,16 +167,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
}
}
}
result.start();
return result;
}

static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml,
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
SlabFactory memChunkFactory) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory,
UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
}

public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml,
OffHeapMemoryStats stats, Slab[] slabs) {
int slabCount = 0;
Expand All @@ -196,10 +187,9 @@ public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener o
}
}
return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null,
UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
null, () -> null);
}


private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats,
long offHeapMemorySize, Slab[] slabs) {
if (isClosed()) {
Expand All @@ -223,7 +213,8 @@ private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats

private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
final OffHeapMemoryStats stats, final Slab[] slabs,
int updateOffHeapStatsFrequencyMs) {
int updateOffHeapStatsFrequencyMs,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
if (oooml == null) {
throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
}
Expand All @@ -239,11 +230,19 @@ private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
this.stats.incMaxMemory(freeList.getTotalMemory());
this.stats.incFreeMemory(freeList.getTotalMemory());

updateNonRealTimeStatsExecutor =
LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");
updateNonRealTimeStatsFuture =
updateNonRealTimeStatsExecutor.scheduleAtFixedRate(freeList::updateNonRealTimeStats, 0,
updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS);
this.updateOffHeapStatsFrequencyMs = updateOffHeapStatsFrequencyMs;

if (nonRealTimeStatsUpdaterSupplier == null) {
nonRealTimeStatsUpdater = new NonRealTimeStatsUpdater(freeList::updateNonRealTimeStats);
} else {
nonRealTimeStatsUpdater = nonRealTimeStatsUpdaterSupplier.get();
}
}

void start() {
if (nonRealTimeStatsUpdater != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the constructor above it looks like you always initialize nonRealTimeStatsUpdaterSupplier to non-null. So what good are these null checks? Oh I see. Your supplier returns null. That works but I think it would be better to have the supplier returns a dummy NonRealTimeStatsUpdater that does nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. As I was too quick to merge the PR, should I open a new one?

nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs);
}
}

public List<OffHeapStoredObject> getLostChunks(InternalCache cache) {
Expand Down Expand Up @@ -407,8 +406,9 @@ private void realClose() {
if (setClosed()) {
freeList.freeSlabs();
stats.close();
updateNonRealTimeStatsFuture.cancel(true);
updateNonRealTimeStatsExecutor.shutdown();
if (nonRealTimeStatsUpdater != null) {
nonRealTimeStatsUpdater.stop();
}
singleton = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.offheap;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.geode.logging.internal.executors.LoggingExecutors;

public class NonRealTimeStatsUpdater {
private final Runnable updateTask;
private final ScheduledExecutorService updateNonRealTimeStatsExecutor;
private final AtomicReference<ScheduledFuture<?>> updateNonRealTimeStatsFuture =
new AtomicReference<>();

NonRealTimeStatsUpdater(Runnable updateTask) {
this.updateTask = updateTask;
updateNonRealTimeStatsExecutor =
LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");

}

void start(int updateOffHeapStatsFrequencyMs) {
updateNonRealTimeStatsFuture
.set(updateNonRealTimeStatsExecutor.scheduleAtFixedRate(updateTask, 0,
updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS));
}

void stop() {
updateNonRealTimeStatsFuture.get().cancel(true);
updateNonRealTimeStatsExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.internal.offheap;

import java.lang.reflect.Method;
import java.util.function.Supplier;

import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
Expand Down Expand Up @@ -219,30 +220,20 @@ public static MemoryAllocator createOffHeapStorage(StatisticsFactory sf, long of
// ooohml provides the hook for disconnecting and closing cache on OutOfOffHeapMemoryException
OutOfOffHeapMemoryListener ooohml =
new DisconnectingOutOfOffHeapMemoryListener((InternalDistributedSystem) system);
return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml);
return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml, null, null);
}

static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
OutOfOffHeapMemoryListener ooohml) {
final OffHeapMemoryStats stats = new OffHeapStorage(sf);

final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);

final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);

return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize);
}

static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
OutOfOffHeapMemoryListener ooohml, int updateOffHeapStatsFrequencyMs) {
OutOfOffHeapMemoryListener ooohml, Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
final OffHeapMemoryStats stats = new OffHeapStorage(sf);

final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);

final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);

return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize,
updateOffHeapStatsFrequencyMs);
updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
}

private static final long MAX_SLAB_SIZE = Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public void testCreate() {
NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
try {
MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, size -> {
MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, size -> {
throw new OutOfMemoryError("expected");
});
}, null, () -> null);
} catch (OutOfMemoryError expected) {
}
assertTrue(listener.isClosed());
Expand All @@ -98,7 +98,8 @@ public Slab create(int size) {
}
}
};
MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, MAX_SLAB_SIZE, factory);
MemoryAllocatorImpl.create(listener, stats, 10, 950, MAX_SLAB_SIZE, null, factory, null,
() -> null);
} catch (OutOfMemoryError expected) {
}
assertTrue(listener.isClosed());
Expand All @@ -109,7 +110,8 @@ public Slab create(int size) {
NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
SlabFactory factory = SlabImpl::new;
MemoryAllocator ma =
MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, factory);
MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, factory, null,
() -> null);
try {
assertFalse(listener.isClosed());
assertFalse(stats.isClosed());
Expand All @@ -135,7 +137,8 @@ public Slab create(int size) {
listener = new NullOutOfOffHeapMemoryListener();
stats2 = new NullOffHeapMemoryStats();
MemoryAllocator ma2 =
MemoryAllocatorImpl.createForUnitTest(listener, stats2, 10, 950, 100, factory);
MemoryAllocatorImpl.create(listener, stats2, 10, 950, 100, null, factory, null,
() -> null);
assertSame(ma, ma2);
assertTrue(stats.isClosed());
assertFalse(listener.isClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void setUp() {
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);

ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
OffHeapStorage.MIN_SLAB_SIZE);
OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {

memoryAllocator =
MemoryAllocatorImpl.create(listener, stats, 1, OffHeapStorage.MIN_SLAB_SIZE,
OffHeapStorage.MIN_SLAB_SIZE);
OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);

offHeapRegionEntryHelperInstance =
spy(new OffHeapRegionEntryHelperInstance(ohAddress -> offHeapStoredObject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testCreateOffHeapStorage() {
StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
MemoryAllocator ma =
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml);
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, null,
() -> null);
try {
OffHeapMemoryStats stats = ma.getStats();
assertNotNull(stats.getStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public void testUpdateNonRealTimeOffHeapStorageStats() {
StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
MemoryAllocator ma =
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, 100);
OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, () -> 100,
null);
try {
OffHeapMemoryStats stats = ma.getStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setUp() {
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);

ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
OffHeapStorage.MIN_SLAB_SIZE);
OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
}

@After
Expand Down