Skip to content

Commit aaf92fe

Browse files
HADOOP-18526. Leak of S3AInstrumentation instances via hadoop Metrics references (apache#5144)
This has triggered an OOM in a process which was churning through s3a fs instances; the increased memory footprint of IOStatistics amplified what must have been a long-standing issue with FS instances being created and not closed. * Makes sure instrumentation is closed when the FS is closed. * Uses a weak reference from metrics to instrumentation, so even if the FS wasn't closed (see HADOOP-18478), this back reference would not cause the S3AInstrumentation reference to be retained. * If S3AFileSystem is configured to log at TRACE it will log the calling stack of initialize(), to help identify where the instance is being created. This should help track down the cause of instance leakage. Contributed by Steve Loughran.
1 parent 63b9a6a commit aaf92fe

File tree

6 files changed

+289
-30
lines changed

6 files changed

+289
-30
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.impl;
20+
21+
import java.lang.ref.WeakReference;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.metrics2.MetricsCollector;
25+
import org.apache.hadoop.metrics2.MetricsSource;
26+
27+
import static java.util.Objects.requireNonNull;
28+
29+
/**
30+
* A weak referenced metrics source which avoids hanging on to large objects
31+
* if somehow they don't get fully closed/cleaned up.
32+
* The JVM may clean up all objects which are only weakly referenced whenever
33+
* it does a GC, <i>even if there is no memory pressure</i>.
34+
* To avoid these refs being removed, always keep a strong reference around
35+
* somewhere.
36+
*/
37+
@InterfaceAudience.Private
38+
public class WeakRefMetricsSource implements MetricsSource {
39+
40+
/**
41+
* Name of this source, retained so callers know what to unregister.
42+
*/
43+
private final String name;
44+
45+
/**
46+
* Weak reference to the underlying metrics source.
47+
*/
48+
private final WeakReference<MetricsSource> sourceWeakReference;
49+
50+
/**
51+
* Wrap a metrics source in a weak reference.
52+
* @param name Name to know when unregistering.
53+
* @param source metrics source; must not be null.
54+
*/
55+
public WeakRefMetricsSource(final String name, final MetricsSource source) {
56+
this.name = name;
57+
this.sourceWeakReference = new WeakReference<>(requireNonNull(source));
58+
}
59+
60+
/**
61+
* If the underlying source has not been GC'd, ask it for its metrics; else no-op.
62+
* @param collector to contain the resulting metrics snapshot
63+
* @param all if true, return all metrics even if unchanged.
64+
*/
65+
@Override
66+
public void getMetrics(final MetricsCollector collector, final boolean all) {
67+
// take a strong local reference so the null check and the call see the same object
MetricsSource metricsSource = sourceWeakReference.get();
68+
if (metricsSource != null) {
69+
metricsSource.getMetrics(collector, all);
70+
}
71+
}
72+
73+
/**
74+
* Get the name to use when unregistering.
75+
* @return the name passed in during construction.
76+
*/
77+
public String getName() {
78+
return name;
79+
}
80+
81+
/**
82+
* Get the source; will be null if the referent has been GC'd.
83+
* @return the source, or null if it has been garbage collected.
84+
*/
85+
public MetricsSource getSource() {
86+
return sourceWeakReference.get();
87+
}
88+
89+
@Override
90+
public String toString() {
91+
// reports only whether the referent is still present, never the referent itself
return "WeakRefMetricsSource{" +
92+
"name='" + name + '\'' +
93+
", sourceWeakReference is " +
94+
(sourceWeakReference.get() == null ? "unset" : "set") +
95+
'}';
96+
}
97+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@
138138
import org.apache.hadoop.fs.statistics.DurationTracker;
139139
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
140140
import org.apache.hadoop.fs.statistics.IOStatistics;
141-
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
142141
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
143142
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
144143
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -459,6 +458,13 @@ public void initialize(URI name, Configuration originalConf)
459458
AuditSpan span = null;
460459
try {
461460
LOG.debug("Initializing S3AFileSystem for {}", bucket);
461+
if (LOG.isTraceEnabled()) {
462+
// log a full trace for deep diagnostics of where an object is created,
463+
// for tracking down memory leak issues.
464+
LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}",
465+
name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false),
466+
new RuntimeException(super.toString()));
467+
}
462468
// clone the configuration into one with propagated bucket options
463469
Configuration conf = propagateBucketOptions(originalConf, bucket);
464470
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
@@ -3999,22 +4005,18 @@ public void close() throws IOException {
39994005
}
40004006
isClosed = true;
40014007
LOG.debug("Filesystem {} is closed", uri);
4002-
if (getConf() != null) {
4003-
String iostatisticsLoggingLevel =
4004-
getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
4005-
IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
4006-
logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
4007-
}
40084008
try {
40094009
super.close();
40104010
} finally {
40114011
stopAllServices();
4012-
}
4013-
// Log IOStatistics at debug.
4014-
if (LOG.isDebugEnabled()) {
4015-
// robust extract and convert to string
4016-
LOG.debug("Statistics for {}: {}", uri,
4017-
IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics()));
4012+
// log IO statistics, including of any file deletion during
4013+
// superclass close
4014+
if (getConf() != null) {
4015+
String iostatisticsLoggingLevel =
4016+
getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
4017+
IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
4018+
logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
4019+
}
40184020
}
40194021
}
40204022

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.classification.InterfaceAudience;
2828
import org.apache.hadoop.classification.InterfaceStability;
2929
import org.apache.hadoop.fs.FileSystem;
30+
import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
3031
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
3132
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
3233
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -160,7 +161,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
160161

161162
private final DurationTrackerFactory durationTrackerFactory;
162163

163-
private String metricsSourceName;
164+
/**
165+
* Weak reference so there's no back reference to the instrumentation.
166+
*/
167+
private WeakRefMetricsSource metricsSourceReference;
164168

165169
private final MetricsRegistry registry =
166170
new MetricsRegistry("s3aFileSystem").setContext(CONTEXT);
@@ -233,19 +237,33 @@ public S3AInstrumentation(URI name) {
233237
new MetricDurationTrackerFactory());
234238
}
235239

240+
/**
241+
* Get the current metrics system; demand creating.
242+
* @return a metric system, creating if need be.
243+
*/
236244
@VisibleForTesting
237-
public MetricsSystem getMetricsSystem() {
245+
static MetricsSystem getMetricsSystem() {
238246
synchronized (METRICS_SYSTEM_LOCK) {
239247
if (metricsSystem == null) {
240248
metricsSystem = new MetricsSystemImpl();
241249
metricsSystem.init(METRICS_SYSTEM_NAME);
250+
LOG.debug("Metrics system inited {}", metricsSystem);
242251
}
243252
}
244253
return metricsSystem;
245254
}
246255

247256
/**
248-
* Register this instance as a metrics source.
257+
* Does the instrumentation have a metrics system?
258+
* @return true if the metrics system is present.
259+
*/
260+
@VisibleForTesting
261+
static boolean hasMetricSystem() {
262+
return metricsSystem != null;
263+
}
264+
265+
/**
266+
* Register this instance as a metrics source via a weak reference.
249267
* @param name s3a:// URI for the associated FileSystem instance
250268
*/
251269
private void registerAsMetricsSource(URI name) {
@@ -257,8 +275,9 @@ private void registerAsMetricsSource(URI name) {
257275
number = ++metricsSourceNameCounter;
258276
}
259277
String msName = METRICS_SOURCE_BASENAME + number;
260-
metricsSourceName = msName + "-" + name.getHost();
261-
metricsSystem.register(metricsSourceName, "", this);
278+
String metricsSourceName = msName + "-" + name.getHost();
279+
metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this);
280+
metricsSystem.register(metricsSourceName, "", metricsSourceReference);
262281
}
263282

264283
/**
@@ -680,19 +699,42 @@ public void getMetrics(MetricsCollector collector, boolean all) {
680699
registry.snapshot(collector.addRecord(registry.info().name()), true);
681700
}
682701

702+
/**
703+
* If registered with the metrics system, return the
704+
* name of the metrics source.
705+
* @return the source name, or null if there is no registered source (for example after close()).
706+
*/
707+
public String getMetricSourceName() {
708+
return metricsSourceReference != null
709+
? metricsSourceReference.getName()
710+
: null;
711+
}
712+
683713
public void close() {
684-
synchronized (METRICS_SYSTEM_LOCK) {
685-
// it is critical to close each quantile, as they start a scheduled
686-
// task in a shared thread pool.
687-
throttleRateQuantile.stop();
688-
metricsSystem.unregisterSource(metricsSourceName);
689-
metricsSourceActiveCounter--;
690-
int activeSources = metricsSourceActiveCounter;
691-
if (activeSources == 0) {
692-
LOG.debug("Shutting down metrics publisher");
693-
metricsSystem.publishMetricsNow();
694-
metricsSystem.shutdown();
695-
metricsSystem = null;
714+
if (metricsSourceReference != null) {
715+
// get the name
716+
String name = metricsSourceReference.getName();
717+
LOG.debug("Unregistering metrics for {}", name);
718+
// then set to null so a second close() is a noop here.
719+
metricsSourceReference = null;
720+
synchronized (METRICS_SYSTEM_LOCK) {
721+
// it is critical to close each quantile, as they start a scheduled
722+
// task in a shared thread pool.
723+
if (metricsSystem == null) {
724+
LOG.debug("there is no metric system to unregister {} from", name);
725+
return;
726+
}
727+
throttleRateQuantile.stop();
728+
729+
metricsSystem.unregisterSource(name);
730+
metricsSourceActiveCounter--;
731+
int activeSources = metricsSourceActiveCounter;
732+
if (activeSources == 0) {
733+
LOG.debug("Shutting down metrics publisher");
734+
metricsSystem.publishMetricsNow();
735+
metricsSystem.shutdown();
736+
metricsSystem = null;
737+
}
696738
}
697739
}
698740
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,16 @@ public void testClosedOpen() throws Exception {
103103
() -> getFileSystem().open(path("to-open")));
104104
}
105105

106+
@Test
107+
public void testClosedInstrumentation() throws Exception {
108+
// no metrics
109+
Assertions.assertThat(S3AInstrumentation.hasMetricSystem())
110+
.describedAs("S3AInstrumentation.hasMetricSystem()")
111+
.isFalse();
112+
113+
Assertions.assertThat(getFileSystem().getIOStatistics())
114+
.describedAs("iostatistics of %s", getFileSystem())
115+
.isNotNull();
116+
}
117+
106118
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.net.URI;
22+
23+
import org.assertj.core.api.Assertions;
24+
import org.junit.Test;
25+
26+
import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
27+
import org.apache.hadoop.metrics2.MetricsSource;
28+
import org.apache.hadoop.metrics2.MetricsSystem;
29+
import org.apache.hadoop.test.AbstractHadoopTestBase;
30+
31+
import static org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem;
32+
import static org.apache.hadoop.fs.s3a.Statistic.DIRECTORIES_CREATED;
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
35+
/**
36+
* Test the {@link S3AInstrumentation} lifecycle, in particular how
37+
* it binds to hadoop metrics through a {@link WeakRefMetricsSource}
38+
* and that it will deregister itself in {@link S3AInstrumentation#close()}.
39+
*/
40+
public class TestInstrumentationLifecycle extends AbstractHadoopTestBase {
41+
42+
@Test
43+
public void testDoubleClose() throws Throwable {
44+
S3AInstrumentation instrumentation = new S3AInstrumentation(new URI("s3a://example/"));
45+
46+
// the metrics system is expected to exist once an instance is constructed
47+
assertThat(S3AInstrumentation.hasMetricSystem())
48+
.describedAs("S3AInstrumentation.hasMetricSystem()")
49+
.isTrue();
50+
// look up a metric by its symbol while the instrumentation is open
51+
String metricName = DIRECTORIES_CREATED.getSymbol();
52+
assertThat(instrumentation.lookupMetric(metricName))
53+
.describedAs("lookupMetric(%s) while open", metricName)
54+
.isNotNull();
55+
56+
MetricsSystem activeMetrics = getMetricsSystem();
57+
final String metricSourceName = instrumentation.getMetricSourceName();
58+
final MetricsSource source = activeMetrics.getSource(metricSourceName);
59+
// verify the source is registered through a weak ref, and that the
60+
// weak reference resolves to this instrumentation instance.
61+
Assertions.assertThat(source)
62+
.describedAs("metric source %s", metricSourceName)
63+
.isNotNull()
64+
.isInstanceOf(WeakRefMetricsSource.class)
65+
.extracting(m -> ((WeakRefMetricsSource) m).getSource())
66+
.isSameAs(instrumentation);
67+
68+
// first close(): the test expects this to shut down the metrics system
69+
instrumentation.close();
70+
71+
// iostats must remain available after close()
72+
assertThat(instrumentation.getIOStatistics())
73+
.describedAs("iostats of %s", instrumentation)
74+
.isNotNull();
75+
76+
// the shared metrics system must now be gone
77+
assertThat(S3AInstrumentation.hasMetricSystem())
78+
.describedAs("S3AInstrumentation.hasMetricSystem()")
79+
.isFalse();
80+
81+
// metric lookup still works, so any invocation of an s3a
82+
// method which still updates a metric also works
83+
assertThat(instrumentation.lookupMetric(metricName))
84+
.describedAs("lookupMetric(%s) when closed", metricName)
85+
.isNotNull();
86+
87+
// confirm the old metrics system was discarded: asking again
88+
// demand-creates a new instance, which must differ
89+
// from the one captured before close()
90+
MetricsSystem metrics2 = getMetricsSystem();
91+
assertThat(metrics2)
92+
.describedAs("metric system 2")
93+
.isNotSameAs(activeMetrics);
94+
95+
// second close(): expected to be a no-op
96+
instrumentation.close();
97+
98+
// which we can verify because the demand-created metrics system
99+
// is still the same instance, i.e. it was not shut down this time
100+
assertThat(getMetricsSystem())
101+
.describedAs("metric system 3")
102+
.isSameAs(metrics2);
103+
}
104+
}

0 commit comments

Comments
 (0)