HADOOP-19330. Stream Leaks
Add a Stream statistic for this with

* Capability of FS (if !prefetching) and stream itself (tested)
* statistic collected in stream and merged with FS (tested)
* test tuning (with failing test fixed)
* doc review

Change-Id: I747a9477e0571d729c81e6f17dac127ffb6ade60
steveloughran committed Nov 13, 2024
1 parent 74496c2 commit 9daa3ee
Showing 10 changed files with 181 additions and 77 deletions.
@@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.functional.RunnableRaisingIOE;

import static java.util.Objects.requireNonNull;
@@ -51,6 +52,13 @@ public class LeakReporter implements Closeable {
private static final Logger LEAK_LOG =
LoggerFactory.getLogger(RESOURCE_LEAKS_LOG_NAME);

/**
* Format string used to build the thread information: {@value}.
*/
@VisibleForTesting
static final String THREAD_FORMAT =
"; thread: %s; id: %d";

/**
* Re-entrancy check.
*/
@@ -91,7 +99,9 @@ public LeakReporter(
// This includes the error string to print, so as to avoid
// constructing objects in finalize().
this.leakException = new IOException(message
+ "; thread: " + Thread.currentThread().getName());
+ String.format(THREAD_FORMAT,
Thread.currentThread().getName(),
Thread.currentThread().getId()));
}

/**
@@ -102,7 +112,13 @@ public void close() {
try {
if (!closed.getAndSet(true) && isOpen.getAsBoolean()) {
// log a warning with the creation stack
LEAK_LOG.warn(leakException.getMessage());
// The creation stack is logged at INFO, so that
// it is possible to configure logging to print
// the name of files left open, without printing
// the stacks. This is better for production use.

LEAK_LOG.info("stack", leakException);
closeAction.apply();
}
} catch (Exception e) {
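
For orientation, a minimal sketch of how a resource class can wire in `LeakReporter`; the `ExampleStream` class and all of its members are illustrative, not part of this change. The key point, matching the comment above: the message is built eagerly in the constructor, so nothing has to be constructed inside `finalize()`.

```java
import java.io.Closeable;

import org.apache.hadoop.fs.impl.LeakReporter;

class ExampleStream implements Closeable {

  private volatile boolean open = true;

  // The message (and hence the creation-point stack held inside the
  // reporter's IOException) is built here, never in finalize().
  private final LeakReporter leakReporter = new LeakReporter(
      "Stream not closed while reading example://data",
      () -> open,            // probe: report only if still open
      this::abortStream);    // brute-force cleanup action

  private void abortStream() {
    open = false;
  }

  @Override
  public void close() {
    open = false;
  }

  @Override
  protected void finalize() throws Throwable {
    // On GC of a leaked instance: logs the leak and invokes the
    // cleanup action, exactly once.
    leakReporter.close();
    super.finalize();
  }
}
```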
@@ -40,6 +40,14 @@
@InterfaceStability.Evolving
public final class StreamStatisticNames {

/**
* Count of Stream leaks from an application which
* is not cleaning up correctly.
* Value: {@value}.
*/
public static final String STREAM_LEAKS =
"stream_leaks";

/**
* Count of times the TCP stream was aborted.
* Value: {@value}.
@@ -37,14 +37,14 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.test.GenericTestUtils;

import static org.apache.hadoop.fs.impl.LeakReporter.THREAD_FORMAT;
import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;

public final class TestLeakReporter extends AbstractHadoopTestBase {
@@ -57,11 +57,6 @@ public final class TestLeakReporter extends AbstractHadoopTestBase {
*/
private final AtomicInteger closeCount = new AtomicInteger();

@Before
public void setup() throws Exception {
closeCount.set(0);
}

/**
* Big test: creates a reporter, closes it.
* Verifies that the error message and stack traces are printed when
@@ -91,14 +86,19 @@ public void testLeakInvocation() throws Throwable {
final String output = logs.getOutput();
LOG.info("output of leak log is {}", output);

final String threadInfo = String.format(THREAD_FORMAT,
oldName,
Thread.currentThread().getId());
// log auditing
Assertions.assertThat(output)
.describedAs("output from the logs")
.contains("WARN")
.contains(message)
.contains(oldName)
.contains(threadInfo)
.contains("TestLeakReporter.testLeakInvocation")
.contains("INFO")
.contains("stack");

// no reentrancy
expectClose(reporter, 1);
@@ -134,14 +134,21 @@ public void testLeakSkipped() throws Throwable {
expectClose(reporter, 0);
}

/**
* If the probe raises an exception, the exception is swallowed
* and the close action is never invoked.
*/
@Test
public void testProbeFailureSwallowed() throws Throwable {
final LeakReporter reporter = new LeakReporter("<message>",
this::raiseNPE,
this::closed);
expectClose(reporter, 0);
}

/**
* Any exception raised in the close action is swallowed.
*/
@Test
public void testCloseActionSwallowed() throws Throwable {
final LeakReporter reporter = new LeakReporter("<message>",
@@ -150,8 +157,8 @@ public void testCloseActionSwallowed() throws Throwable {
reporter.close();

Assertions.assertThat(reporter.isClosed())
.describedAs("reporter closed)").
isTrue();
.describedAs("reporter closed)")
.isTrue();
}

/**
@@ -156,6 +156,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
@@ -5593,6 +5594,10 @@ public boolean hasPathCapability(final Path path, final String capability)
case AWS_S3_ACCESS_GRANTS_ENABLED:
return s3AccessGrantsEnabled;

// stream leak detection.
case StreamStatisticNames.STREAM_LEAKS:
return !prefetchEnabled;

default:
// is it a performance flag?
if (performanceFlags.hasCapability(capability)) {
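
A client can therefore probe a store for leak tracking before relying on it. A hedged sketch of such a probe; the class and method names are illustrative:

```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ProbeForLeakTracking {

  /**
   * True when the classic input stream is in use;
   * false when prefetching is enabled.
   */
  public static boolean leakTrackingEnabled(URI bucket) throws IOException {
    FileSystem fs = FileSystem.get(bucket, new Configuration());
    return fs.hasPathCapability(new Path("/"), "stream_leaks");
  }
}
```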
@@ -41,6 +41,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.impl.LeakReporter;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -294,11 +295,14 @@ private boolean isStreamOpen() {
}

/**
* Brute force stream close; invoked by {@link LeakReporter}.
* All exceptions raised are ignored.
*/
private void abortInFinalizer() {
try {
// stream was leaked: update statistic
streamStatistics.streamLeaked();
// abort the stream. This merges statistics into the filesystem.
closeStream("finalize()", true, true).get();
} catch (InterruptedException | ExecutionException ignored) {
/* ignore this failure shutdown */
@@ -1418,6 +1422,7 @@ public boolean hasCapability(String capability) {
switch (toLowerCase(capability)) {
case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
case StreamStatisticNames.STREAM_LEAKS:
case StreamCapabilities.READAHEAD:
case StreamCapabilities.UNBUFFER:
case StreamCapabilities.VECTOREDIO:
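
The same probe works against an open stream through the `StreamCapabilities` interface; a sketch, with illustrative class and method names:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class StreamProbe {

  /** True if leaks of the stream opened at this path will be detected. */
  public static boolean streamTracksLeaks(FileSystem fs, Path path)
      throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      return in.hasCapability("stream_leaks");
    }
  }
}
```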
@@ -862,6 +862,7 @@ private InputStreamStatistics(
this.filesystemStatistics = filesystemStatistics;
IOStatisticsStore st = iostatisticsStore()
.withCounters(
StreamStatisticNames.STREAM_LEAKS,
StreamStatisticNames.STREAM_READ_ABORTED,
StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_ABORT,
StreamStatisticNames.STREAM_READ_CLOSED,
@@ -1126,6 +1127,15 @@ public void close() {
merge(true);
}

/**
* Stream was leaked.
*/
public void streamLeaked() {
increment(StreamStatisticNames.STREAM_LEAKS);
// merge as if closed.
merge(true);
}

/**
* {@inheritDoc}.
* As well as incrementing the {@code STREAM_READ_SEEK_POLICY_CHANGED}
@@ -312,6 +312,10 @@ public enum Statistic {
StoreStatisticNames.OBJECT_PUT_BYTES_PENDING,
"number of bytes queued for upload/being actively uploaded",
TYPE_GAUGE),
STREAM_LEAKS(
StreamStatisticNames.STREAM_LEAKS,
"Streams detected as not closed safely",
TYPE_COUNTER),
STREAM_READ_ABORTED(
StreamStatisticNames.STREAM_READ_ABORTED,
"Count of times the TCP stream was aborted",
@@ -210,4 +210,8 @@ void readVectoredOperationStarted(int numIncomingRanges,
*/
DurationTracker initiateInnerStreamClose(boolean abort);

/**
* Stream was leaked.
*/
default void streamLeaked() {};
}
@@ -511,28 +511,34 @@ This will free up the connection until another read operation is invoked -yet
still re-open faster than if `open(Path)` were invoked.

Applications may also be "leaking" HTTP connections by failing to
`close()` them. This is potentially fatal: eventually the connection pool
gets exhausted, at which point the program will no longer work.

This can only be fixed in the application code: it is _not_ a bug in
the S3A filesystem.

1. Applications MUST call `close()` on an input stream when the contents of
   the file are no longer needed, as in the sketch below.
2. If long-lived applications eventually fail with unrecoverable
   `ApiCallTimeout` exceptions, it is a sign that they are not doing so.
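
A `try-with-resources` block is the simplest way to guarantee this. A minimal sketch, in which the path and the `process()` helper are hypothetical:

```java
try (FSDataInputStream in = fs.open(new Path("s3a://bucket/data.csv"))) {
  byte[] buffer = new byte[8192];
  int bytesRead;
  while ((bytesRead = in.read(buffer)) > 0) {
    process(buffer, bytesRead);  // hypothetical application logic
  }
}  // close() always runs here, even if read() or process() throws
```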

To aid in identifying the location of these leaks, when a JVM garbage
collection releases an unreferenced `S3AInputStream` instance,
it will log at `WARN` level that it has not been closed,
listing the file URL and the name + ID of the thread
which created the file.
The stack trace of the `open()` call will be logged at `INFO`:

```
2024-11-13 12:48:24,537 [Finalizer] WARN resource.leaks (LeakReporter.java:close(114)) - Stream not closed while reading s3a://bucket/test/testFinalizer; thread: JUnit-testFinalizer; id: 11
2024-11-13 12:48:24,537 [Finalizer] INFO resource.leaks (LeakReporter.java:close(120)) - stack
java.io.IOException: Stream not closed while reading s3a://bucket/test/testFinalizer; thread: JUnit-testFinalizer; id: 11
at org.apache.hadoop.fs.impl.LeakReporter.<init>(LeakReporter.java:101)
at org.apache.hadoop.fs.s3a.S3AInputStream.<init>(S3AInputStream.java:257)
at org.apache.hadoop.fs.s3a.S3AFileSystem.executeOpen(S3AFileSystem.java:1891)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1841)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:997)
at org.apache.hadoop.fs.s3a.ITestS3AInputStreamLeakage.testFinalizer(ITestS3AInputStreamLeakage.java:99)
```

It will also `abort()` the HTTP connection, freeing up space in the connection pool.
@@ -543,14 +549,26 @@ rapid enough to prevent an application running out of connections.
It is possible to stop these warning messages from being logged,
by restricting the log `org.apache.hadoop.fs.resource.leaks` to
only log at `ERROR` or above.
This will also disable error logging for _all_ other resources whose leaks
are detected.

```properties
log4j.logger.org.apache.hadoop.fs.resource.leaks=ERROR
```

To omit the stack traces while retaining the URL and thread information, set the log level to `WARN`:

```properties
log4j.logger.org.apache.hadoop.fs.resource.leaks=WARN
```

This is better for production deployments: leaks are still reported, but
the stack traces, which are generally only of relevance to the application's
developers, are omitted.

Finally, note that the filesystem and thread context IOStatistic `stream_leaks` is updated;
if these statistics are collected, the existence of leaks can be detected.
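
A hedged sketch of how an application might check this counter through the public `IOStatistics` API; the threshold and the report are illustrative:

```java
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

// "fs" is the FileSystem instance under observation.
IOStatistics stats = retrieveIOStatistics(fs);
Long leaks = stats.counters().get("stream_leaks");
if (leaks != null && leaks > 0) {
  System.err.println(leaks + " input stream(s) leaked without being closed");
}
```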

### <a name="inconsistent-config"></a> Inconsistent configuration across a cluster

All hosts in the cluster need to have the configuration secrets;