Skip to content

Commit

Permalink
Add a GCThrashingPerPeriod option for MemoryMonitor (apache#11885)
Browse files Browse the repository at this point in the history
* Add a GCThrashingPerPeriod option for MemoryMonitor
  • Loading branch information
aaltay authored Jun 3, 2020
1 parent f31ac46 commit 043cdfd
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ public Dataflow create(PipelineOptions options) {

void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);

/**
* The GC thrashing threshold percentage. A given period of time is considered "thrashing" if this
* percentage of CPU time is spent in garbage collection. Dataflow will force fail tasks after
* sustained periods of thrashing.
*
* <p>If {@literal 100} is given as the value, MemoryMonitor will be disabled.
*/
@Description(
"The GC thrashing threshold percentage. A given period of time is considered \"thrashing\" if this "
+ "percentage of CPU time is spent in garbage collection. Dataflow will force fail tasks after "
+ "sustained periods of thrashing.")
@Default.Double(50.0)
Double getGCThrashingPercentagePerPeriod();

void setGCThrashingPercentagePerPeriod(Double value);

/**
* The size of the worker's in-memory cache, in megabytes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ public class MemoryMonitor implements Runnable, StatusDataProvider {
*/
private static final int NUM_MONITORED_PERIODS = 4; // ie 1 min's worth.

/**
* The GC thrashing threshold (0.00 - 100.00) for every period. If the time spent on garbage
* collection in one period exceeds this threshold, that period is considered to be in GC
* thrashing.
*/
private static final double GC_THRASHING_PERCENTAGE_PER_PERIOD = 50.0;

/**
* The <code>(# monitored periods in GC thrashing) / (# monitored
* periods)</code> threshold after which the server is considered to be in GC thrashing, expressed
Expand Down Expand Up @@ -174,6 +167,12 @@ public long totalGCTimeMilliseconds() {
/** If true, dump the heap when thrashing or requested. */
private final boolean canDumpHeap;

/**
* The GC thrashing threshold for every period. If the time spent on garbage collection in one
* period exceeds this threshold, that period is considered to be in GC thrashing.
*/
private final double gcThrashingPercentagePerPeriod;

private final AtomicBoolean isThrashing = new AtomicBoolean(false);

private final AtomicBoolean isRunning = new AtomicBoolean(false);
Expand All @@ -199,11 +198,14 @@ public static MemoryMonitor fromOptions(PipelineOptions options) {
DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
String uploadToGCSPath = debugOptions.getSaveHeapDumpsToGcsPath();
boolean canDumpHeap = uploadToGCSPath != null || debugOptions.getDumpHeapOnOOM();
double gcThrashingPercentagePerPeriod = debugOptions.getGCThrashingPercentagePerPeriod();

return new MemoryMonitor(
new SystemGCStatsProvider(),
DEFAULT_SLEEP_TIME_MILLIS,
DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING,
canDumpHeap,
gcThrashingPercentagePerPeriod,
uploadToGCSPath,
getLoggingDir());
}
Expand All @@ -214,13 +216,15 @@ static MemoryMonitor forTest(
long sleepTimeMillis,
int shutDownAfterNumGCThrashing,
boolean canDumpHeap,
double gcThrashingPercentagePerPeriod,
@Nullable String uploadToGCSPath,
File localDumpFolder) {
return new MemoryMonitor(
gcStatsProvider,
sleepTimeMillis,
shutDownAfterNumGCThrashing,
canDumpHeap,
gcThrashingPercentagePerPeriod,
uploadToGCSPath,
localDumpFolder);
}
Expand All @@ -230,12 +234,14 @@ private MemoryMonitor(
long sleepTimeMillis,
int shutDownAfterNumGCThrashing,
boolean canDumpHeap,
double gcThrashingPercentagePerPeriod,
@Nullable String uploadToGCSPath,
File localDumpFolder) {
this.gcStatsProvider = gcStatsProvider;
this.sleepTimeMillis = sleepTimeMillis;
this.shutDownAfterNumGCThrashing = shutDownAfterNumGCThrashing;
this.canDumpHeap = canDumpHeap;
this.gcThrashingPercentagePerPeriod = gcThrashingPercentagePerPeriod;
this.uploadToGCSPath = uploadToGCSPath;
this.localDumpFolder = localDumpFolder;
}
Expand Down Expand Up @@ -404,7 +410,7 @@ private boolean wasLastPeriodInGCThrashing(long now, long lastTimeWokeUp) {
maxGCPercentage.set(Math.max(maxGCPercentage.get(), gcPercentage));
timeInGC = inGC;

return gcPercentage > GC_THRASHING_PERCENTAGE_PER_PERIOD;
return gcPercentage > this.gcThrashingPercentagePerPeriod;
}

/**
Expand Down Expand Up @@ -467,6 +473,14 @@ private void shutDownDueToGcThrashing(int thrashingCount) {
public void run() {
synchronized (waitingForStateChange) {
Preconditions.checkState(!isRunning.getAndSet(true), "already running");

if (this.gcThrashingPercentagePerPeriod <= 0 || this.gcThrashingPercentagePerPeriod >= 100) {
LOG.warn(
"gcThrashingPercentagePerPeriod: {} is not valid value. Not starting MemoryMonitor.",
this.gcThrashingPercentagePerPeriod);
isRunning.set(false);
}

waitingForStateChange.notifyAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() throws IOException {
provider = new FakeGCStatsProvider();
localDumpFolder = tempFolder.newFolder();
// Update every 10ms, never shutdown VM.
monitor = MemoryMonitor.forTest(provider, 10, 0, false, null, localDumpFolder);
monitor = MemoryMonitor.forTest(provider, 10, 0, false, 50.0, null, localDumpFolder);
thread = new Thread(monitor);
thread.start();
}
Expand Down Expand Up @@ -122,7 +122,8 @@ public void heapDumpTwice() throws Exception {
@Test
public void uploadToGcs() throws Exception {
File remoteFolder = tempFolder.newFolder();
monitor = MemoryMonitor.forTest(provider, 10, 0, true, remoteFolder.getPath(), localDumpFolder);
monitor =
MemoryMonitor.forTest(provider, 10, 0, true, 50.0, remoteFolder.getPath(), localDumpFolder);

// Force the monitor to generate a local heap dump
monitor.dumpHeap();
Expand All @@ -138,12 +139,28 @@ public void uploadToGcs() throws Exception {

@Test
public void uploadToGcsDisabled() throws Exception {
monitor = MemoryMonitor.forTest(provider, 10, 0, true, null, localDumpFolder);
monitor = MemoryMonitor.forTest(provider, 10, 0, true, 50.0, null, localDumpFolder);

// Force the monitor to generate a local heap dump
monitor.dumpHeap();

// Try to upload the heap dump
assertFalse(monitor.tryUploadHeapDumpIfItExists());
}

@Test
public void disableMemoryMonitor() throws Exception {
MemoryMonitor disabledMonitor =
MemoryMonitor.forTest(provider, 10, 0, true, 100.0, null, localDumpFolder);
Thread disabledMonitorThread = new Thread(disabledMonitor);
disabledMonitorThread.start();

// Monitor thread should stop quickly after starting. Wait 10 seconds, and check that monitor
// thread is not alive.
disabledMonitorThread.join(10000);
assertFalse(disabledMonitorThread.isAlive());

// Enabled monitor thread should still be running.
assertTrue(thread.isAlive());
}
}

0 comments on commit 043cdfd

Please sign in to comment.