-
Notifications
You must be signed in to change notification settings - Fork 686
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor #7715
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,10 +20,8 @@ | |
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Supplier; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
|
||
|
@@ -39,7 +37,6 @@ | |
import org.apache.geode.internal.lang.SystemProperty; | ||
import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier; | ||
import org.apache.geode.internal.offheap.annotations.Unretained; | ||
import org.apache.geode.logging.internal.executors.LoggingExecutors; | ||
import org.apache.geode.logging.internal.log4j.api.LogService; | ||
import org.apache.geode.util.internal.GeodeGlossary; | ||
|
||
|
@@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator { | |
SystemProperty.getProductIntegerProperty( | ||
"off-heap-stats-update-frequency-ms").orElse(3600000); | ||
|
||
private final ScheduledExecutorService updateNonRealTimeStatsExecutor; | ||
|
||
private final ScheduledFuture<?> updateNonRealTimeStatsFuture; | ||
private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater; | ||
|
||
private volatile OffHeapMemoryStats stats; | ||
|
||
private volatile OutOfOffHeapMemoryListener ooohml; | ||
|
||
private final int updateOffHeapStatsFrequencyMs; | ||
|
||
OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() { | ||
return ooohml; | ||
} | ||
|
@@ -98,20 +95,17 @@ public static MemoryAllocatorImpl getAllocator() { | |
|
||
public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, | ||
int slabCount, long offHeapMemorySize, long maxSlabSize, | ||
int updateOffHeapStatsFrequencyMs) { | ||
Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier, | ||
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) { | ||
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, | ||
SlabImpl::new, updateOffHeapStatsFrequencyMs); | ||
SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier); | ||
} | ||
|
||
public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, | ||
int slabCount, long offHeapMemorySize, long maxSlabSize) { | ||
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, | ||
SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); | ||
} | ||
|
||
private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, | ||
static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, | ||
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, | ||
Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) { | ||
Slab[] slabs, SlabFactory slabFactory, | ||
Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier, | ||
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) { | ||
MemoryAllocatorImpl result = singleton; | ||
boolean created = false; | ||
try { | ||
|
@@ -155,7 +149,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, | |
} | ||
} | ||
|
||
result = new MemoryAllocatorImpl(ooohml, stats, slabs, updateOffHeapStatsFrequencyMs); | ||
result = new MemoryAllocatorImpl(ooohml, stats, slabs, | ||
updateOffHeapStatsFrequencyMsSupplier == null ? UPDATE_OFF_HEAP_STATS_FREQUENCY_MS | ||
: updateOffHeapStatsFrequencyMsSupplier.get(), | ||
nonRealTimeStatsUpdaterSupplier); | ||
singleton = result; | ||
LifecycleListener.invokeAfterCreate(result); | ||
created = true; | ||
|
@@ -170,16 +167,10 @@ private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, | |
} | ||
} | ||
} | ||
result.start(); | ||
return result; | ||
} | ||
|
||
static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml, | ||
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize, | ||
SlabFactory memChunkFactory) { | ||
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory, | ||
UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); | ||
} | ||
|
||
public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, | ||
OffHeapMemoryStats stats, Slab[] slabs) { | ||
int slabCount = 0; | ||
|
@@ -196,10 +187,9 @@ public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener o | |
} | ||
} | ||
return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null, | ||
UPDATE_OFF_HEAP_STATS_FREQUENCY_MS); | ||
null, () -> null); | ||
} | ||
|
||
|
||
private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats, | ||
long offHeapMemorySize, Slab[] slabs) { | ||
if (isClosed()) { | ||
|
@@ -223,7 +213,8 @@ private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats | |
|
||
private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, | ||
final OffHeapMemoryStats stats, final Slab[] slabs, | ||
int updateOffHeapStatsFrequencyMs) { | ||
int updateOffHeapStatsFrequencyMs, | ||
Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) { | ||
if (oooml == null) { | ||
throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null"); | ||
} | ||
|
@@ -239,11 +230,19 @@ private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, | |
this.stats.incMaxMemory(freeList.getTotalMemory()); | ||
this.stats.incFreeMemory(freeList.getTotalMemory()); | ||
|
||
updateNonRealTimeStatsExecutor = | ||
LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread"); | ||
updateNonRealTimeStatsFuture = | ||
updateNonRealTimeStatsExecutor.scheduleAtFixedRate(freeList::updateNonRealTimeStats, 0, | ||
updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS); | ||
this.updateOffHeapStatsFrequencyMs = updateOffHeapStatsFrequencyMs; | ||
|
||
if (nonRealTimeStatsUpdaterSupplier == null) { | ||
nonRealTimeStatsUpdater = new NonRealTimeStatsUpdater(freeList::updateNonRealTimeStats); | ||
} else { | ||
nonRealTimeStatsUpdater = nonRealTimeStatsUpdaterSupplier.get(); | ||
} | ||
} | ||
|
||
void start() { | ||
if (nonRealTimeStatsUpdater != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the constructor above it looks like you always initialize nonRealTimeStatsUpdaterSupplier to non-null. So what good are these null checks? Oh I see. Your supplier returns null. That works but I think it would be better to have the supplier returns a dummy NonRealTimeStatsUpdater that does nothing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. As I was too quick to merge the PR, should I open a new one? |
||
nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs); | ||
} | ||
} | ||
|
||
public List<OffHeapStoredObject> getLostChunks(InternalCache cache) { | ||
|
@@ -407,8 +406,9 @@ private void realClose() { | |
if (setClosed()) { | ||
freeList.freeSlabs(); | ||
stats.close(); | ||
updateNonRealTimeStatsFuture.cancel(true); | ||
updateNonRealTimeStatsExecutor.shutdown(); | ||
if (nonRealTimeStatsUpdater != null) { | ||
nonRealTimeStatsUpdater.stop(); | ||
} | ||
singleton = null; | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package org.apache.geode.internal.offheap; | ||
|
||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import org.apache.geode.logging.internal.executors.LoggingExecutors; | ||
|
||
public class NonRealTimeStatsUpdater { | ||
private final Runnable updateTask; | ||
private final ScheduledExecutorService updateNonRealTimeStatsExecutor; | ||
private final AtomicReference<ScheduledFuture<?>> updateNonRealTimeStatsFuture = | ||
new AtomicReference<>(); | ||
|
||
NonRealTimeStatsUpdater(Runnable updateTask) { | ||
this.updateTask = updateTask; | ||
updateNonRealTimeStatsExecutor = | ||
LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread"); | ||
|
||
} | ||
|
||
void start(int updateOffHeapStatsFrequencyMs) { | ||
updateNonRealTimeStatsFuture | ||
.set(updateNonRealTimeStatsExecutor.scheduleAtFixedRate(updateTask, 0, | ||
updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS)); | ||
} | ||
|
||
void stop() { | ||
updateNonRealTimeStatsFuture.get().cancel(true); | ||
updateNonRealTimeStatsExecutor.shutdown(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the update frequency "int" should be part of the NonRealTimeStatsUpdater and then you wouldn't need two suppliers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good idea but there is one test case in which you don't pass a Supplier so that the default
NonRealTimeStatsUpdater
is constructed but you need to specify a smaller frequency so that the test does not need to wait for the default frequency value:See
testUpdateNonRealTimeOffHeapStorageStats()
from classOffHeapStorageNonRuntimeStatsJUnitTest
.For this test case I would need to specify the frequency but not the supplier.