Skip to content

Commit f9b9df6

Browse files
Add Leak Tracking Infrastructure (#67688) (#69592)
This commit adds leak tracking infrastructure that enables assertions about the state of objects at GC time (simplified version of what Netty uses to track `ByteBuf` instances). This commit uses the infrastructure to improve the quality of leak checks for page recycling in the mock nio transport (the logic in `org.elasticsearch.common.util.MockPageCacheRecycler#ensureAllPagesAreReleased` does not run for all tests and tracks too little information to allow for debugging what caused a specific leak in most cases due to the lack of an equivalent of the added `#touch` logic). Co-authored-by: David Turner <david.turner@elastic.co>
1 parent 7d11fe6 commit f9b9df6

File tree

5 files changed

+317
-31
lines changed

5 files changed

+317
-31
lines changed

libs/core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public final boolean tryIncRef() {
3636
int i = refCount.get();
3737
if (i > 0) {
3838
if (refCount.compareAndSet(i, i + 1)) {
39+
touch();
3940
return true;
4041
}
4142
} else {
@@ -46,6 +47,7 @@ public final boolean tryIncRef() {
4647

4748
@Override
4849
public final boolean decRef() {
50+
touch();
4951
int i = refCount.decrementAndGet();
5052
assert i >= 0;
5153
if (i == 0) {
@@ -55,6 +57,13 @@ public final boolean decRef() {
5557
return false;
5658
}
5759

60+
/**
61+
* Called whenever the ref count is incremented or decremented. Can be overridden by implementations to keep a record of accesses to the
62+
* instance for debugging purposes.
63+
*/
64+
protected void touch() {
65+
}
66+
5867
protected void alreadyClosed() {
5968
throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
6069
}

server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,13 @@ public final class ReleasableBytesReference implements RefCounted, Releasable, B
3030
private final AbstractRefCounted refCounted;
3131

3232
public ReleasableBytesReference(BytesReference delegate, Releasable releasable) {
33-
this.delegate = delegate;
34-
this.refCounted = new RefCountedReleasable(releasable);
33+
this(delegate, new RefCountedReleasable(releasable));
3534
}
3635

37-
private ReleasableBytesReference(BytesReference delegate, AbstractRefCounted refCounted) {
36+
public ReleasableBytesReference(BytesReference delegate, AbstractRefCounted refCounted) {
3837
this.delegate = delegate;
3938
this.refCounted = refCounted;
40-
refCounted.incRef();
39+
assert refCounted.refCount() > 0;
4140
}
4241

4342
public static ReleasableBytesReference wrap(BytesReference reference) {
@@ -72,7 +71,9 @@ public ReleasableBytesReference retainedSlice(int from, int length) {
7271
if (from == 0 && length() == length) {
7372
return retain();
7473
}
75-
return new ReleasableBytesReference(delegate.slice(from, length), refCounted);
74+
final BytesReference slice = delegate.slice(from, length);
75+
refCounted.incRef();
76+
return new ReleasableBytesReference(slice, refCounted);
7677
}
7778

7879
@Override

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
import org.elasticsearch.test.junit.listeners.LoggingListener;
103103
import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
104104
import org.elasticsearch.threadpool.ThreadPool;
105+
import org.elasticsearch.transport.LeakTracker;
105106
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
106107
import org.joda.time.DateTimeZone;
107108
import org.junit.After;
@@ -178,7 +179,7 @@ public abstract class ESTestCase extends LuceneTestCase {
178179

179180
private static final AtomicInteger portGenerator = new AtomicInteger();
180181

181-
private static final Collection<String> nettyLoggedLeaks = new ArrayList<>();
182+
private static final Collection<String> loggedLeaks = new ArrayList<>();
182183
public static final int MIN_PRIVATE_PORT = 13301;
183184

184185
private HeaderWarningAppender headerWarningAppender;
@@ -202,29 +203,29 @@ public static void resetPortCounter() {
202203
setTestSysProps();
203204
LogConfigurator.loadLog4jPlugins();
204205

205-
String leakLoggerName = "io.netty.util.ResourceLeakDetector";
206-
Logger leakLogger = LogManager.getLogger(leakLoggerName);
207-
Appender leakAppender = new AbstractAppender(leakLoggerName, null,
208-
PatternLayout.newBuilder().withPattern("%m").build()) {
209-
@Override
210-
public void append(LogEvent event) {
211-
String message = event.getMessage().getFormattedMessage();
212-
if (Level.ERROR.equals(event.getLevel()) && message.contains("LEAK:")) {
213-
synchronized (nettyLoggedLeaks) {
214-
nettyLoggedLeaks.add(message);
206+
for (String leakLoggerName : Arrays.asList("io.netty.util.ResourceLeakDetector", LeakTracker.class.getName())) {
207+
Logger leakLogger = LogManager.getLogger(leakLoggerName);
208+
Appender leakAppender = new AbstractAppender(leakLoggerName, null,
209+
PatternLayout.newBuilder().withPattern("%m").build()) {
210+
@Override
211+
public void append(LogEvent event) {
212+
String message = event.getMessage().getFormattedMessage();
213+
if (Level.ERROR.equals(event.getLevel()) && message.contains("LEAK:")) {
214+
synchronized (loggedLeaks) {
215+
loggedLeaks.add(message);
216+
}
215217
}
216218
}
217-
}
218-
};
219-
leakAppender.start();
220-
Loggers.addAppender(leakLogger, leakAppender);
221-
222-
// shutdown hook so that when the test JVM exits, logging is shutdown too
223-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
224-
leakAppender.stop();
225-
LoggerContext context = (LoggerContext) LogManager.getContext(false);
226-
Configurator.shutdown(context);
227-
}));
219+
};
220+
leakAppender.start();
221+
Loggers.addAppender(leakLogger, leakAppender);
222+
// shutdown hook so that when the test JVM exits, logging is shutdown too
223+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
224+
leakAppender.stop();
225+
LoggerContext context = (LoggerContext) LogManager.getContext(false);
226+
Configurator.shutdown(context);
227+
}));
228+
}
228229

229230
BootstrapForTesting.ensureInitialized();
230231

@@ -549,6 +550,7 @@ public void log(StatusData data) {
549550

550551
// separate method so that this can be checked again after suite scoped cluster is shut down
551552
protected static void checkStaticState(boolean afterClass) throws Exception {
553+
LeakTracker.INSTANCE.reportLeak();
552554
if (afterClass) {
553555
MockPageCacheRecycler.ensureAllPagesAreReleased();
554556
}
@@ -568,11 +570,11 @@ protected static void checkStaticState(boolean afterClass) throws Exception {
568570
statusData.clear();
569571
}
570572
}
571-
synchronized (nettyLoggedLeaks) {
573+
synchronized (loggedLeaks) {
572574
try {
573-
assertThat(nettyLoggedLeaks, empty());
575+
assertThat(loggedLeaks, empty());
574576
} finally {
575-
nettyLoggedLeaks.clear();
577+
loggedLeaks.clear();
576578
}
577579
}
578580
}
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.transport;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.common.Randomness;
14+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
15+
16+
import java.lang.ref.ReferenceQueue;
17+
import java.lang.ref.WeakReference;
18+
import java.util.HashSet;
19+
import java.util.Set;
20+
import java.util.concurrent.ConcurrentMap;
21+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
23+
24+
/**
25+
* Leak tracking mechanism that allows for ensuring that a resource has been properly released before a given object is garbage collected.
26+
*
27+
*/
28+
public final class LeakTracker {
29+
30+
private static final Logger logger = LogManager.getLogger(LeakTracker.class);
31+
32+
private static final int TARGET_RECORDS = 25;
33+
34+
private final Set<Leak<?>> allLeaks = ConcurrentCollections.newConcurrentSet();
35+
36+
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<>();
37+
private final ConcurrentMap<String, Boolean> reportedLeaks = ConcurrentCollections.newConcurrentMap();
38+
39+
public static final LeakTracker INSTANCE = new LeakTracker();
40+
41+
private LeakTracker() {
42+
}
43+
44+
/**
45+
* Track the given object.
46+
*
47+
* @param obj object to track
48+
* @return leak object that must be released by a call to {@link Leak#close(Object)} before {@code obj} goes out of scope
49+
*/
50+
public <T> Leak<T> track(T obj) {
51+
reportLeak();
52+
return new Leak<>(obj, refQueue, allLeaks);
53+
}
54+
55+
public void reportLeak() {
56+
while (true) {
57+
Leak<?> ref = (Leak<?>) refQueue.poll();
58+
if (ref == null) {
59+
break;
60+
}
61+
62+
if (ref.dispose() == false || logger.isErrorEnabled() == false) {
63+
continue;
64+
}
65+
66+
String records = ref.toString();
67+
if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
68+
logger.error("LEAK: resource was not cleaned up before it was garbage-collected.{}", records);
69+
}
70+
}
71+
}
72+
73+
public static final class Leak<T> extends WeakReference<Object> {
74+
75+
@SuppressWarnings({"unchecked", "rawtypes"})
76+
private static final AtomicReferenceFieldUpdater<Leak<?>, Record> headUpdater =
77+
(AtomicReferenceFieldUpdater) AtomicReferenceFieldUpdater.newUpdater(Leak.class, Record.class, "head");
78+
79+
@SuppressWarnings({"unchecked", "rawtypes"})
80+
private static final AtomicIntegerFieldUpdater<Leak<?>> droppedRecordsUpdater =
81+
(AtomicIntegerFieldUpdater) AtomicIntegerFieldUpdater.newUpdater(Leak.class, "droppedRecords");
82+
83+
@SuppressWarnings("unused")
84+
private volatile Record head;
85+
@SuppressWarnings("unused")
86+
private volatile int droppedRecords;
87+
88+
private final Set<Leak<?>> allLeaks;
89+
private final int trackedHash;
90+
91+
private Leak(Object referent, ReferenceQueue<Object> refQueue, Set<Leak<?>> allLeaks) {
92+
super(referent, refQueue);
93+
94+
assert referent != null;
95+
96+
trackedHash = System.identityHashCode(referent);
97+
allLeaks.add(this);
98+
headUpdater.set(this, new Record(Record.BOTTOM));
99+
this.allLeaks = allLeaks;
100+
}
101+
102+
/**
103+
* Adds an access record that includes the current stack trace to the leak.
104+
*/
105+
public void record() {
106+
Record oldHead;
107+
Record newHead;
108+
boolean dropped;
109+
do {
110+
Record prevHead;
111+
if ((prevHead = oldHead = headUpdater.get(this)) == null) {
112+
// already closed.
113+
return;
114+
}
115+
final int numElements = oldHead.pos + 1;
116+
if (numElements >= TARGET_RECORDS) {
117+
final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30);
118+
if (dropped = Randomness.get().nextInt(1 << backOffFactor) != 0) {
119+
prevHead = oldHead.next;
120+
}
121+
} else {
122+
dropped = false;
123+
}
124+
newHead = new Record(prevHead);
125+
} while (headUpdater.compareAndSet(this, oldHead, newHead) == false);
126+
if (dropped) {
127+
droppedRecordsUpdater.incrementAndGet(this);
128+
}
129+
}
130+
131+
private boolean dispose() {
132+
clear();
133+
return allLeaks.remove(this);
134+
}
135+
136+
/**
137+
* Stop tracking the object that this leak was created for.
138+
*
139+
* @param trackedObject the object that this leak was originally created for
140+
* @return true if the leak was released by this call, false if the leak had already been released
141+
*/
142+
public boolean close(T trackedObject) {
143+
assert trackedHash == System.identityHashCode(trackedObject);
144+
try {
145+
if (allLeaks.remove(this)) {
146+
// Call clear so the reference is not even enqueued.
147+
clear();
148+
headUpdater.set(this, null);
149+
return true;
150+
}
151+
return false;
152+
} finally {
153+
reachabilityFence0(trackedObject);
154+
}
155+
}
156+
157+
private static void reachabilityFence0(Object ref) {
158+
if (ref != null) {
159+
synchronized (ref) {
160+
// empty on purpose
161+
}
162+
}
163+
}
164+
165+
@Override
166+
public String toString() {
167+
Record oldHead = headUpdater.get(this);
168+
if (oldHead == null) {
169+
// Already closed
170+
return "";
171+
}
172+
173+
final int dropped = droppedRecordsUpdater.get(this);
174+
int duped = 0;
175+
176+
int present = oldHead.pos + 1;
177+
// Guess about 2 kilobytes per stack trace
178+
StringBuilder buf = new StringBuilder(present * 2048).append('\n');
179+
buf.append("Recent access records: ").append('\n');
180+
181+
int i = 1;
182+
Set<String> seen = new HashSet<>(present);
183+
for (; oldHead != Record.BOTTOM; oldHead = oldHead.next) {
184+
String s = oldHead.toString();
185+
if (seen.add(s)) {
186+
if (oldHead.next == Record.BOTTOM) {
187+
buf.append("Created at:").append('\n').append(s);
188+
} else {
189+
buf.append('#').append(i++).append(':').append('\n').append(s);
190+
}
191+
} else {
192+
duped++;
193+
}
194+
}
195+
196+
if (duped > 0) {
197+
buf.append(": ")
198+
.append(duped)
199+
.append(" leak records were discarded because they were duplicates")
200+
.append('\n');
201+
}
202+
203+
if (dropped > 0) {
204+
buf.append(": ")
205+
.append(dropped)
206+
.append(" leak records were discarded because the leak record count is targeted to ")
207+
.append(TARGET_RECORDS)
208+
.append('.')
209+
.append('\n');
210+
}
211+
buf.setLength(buf.length() - "\n".length());
212+
return buf.toString();
213+
}
214+
}
215+
216+
private static final class Record extends Throwable {
217+
218+
private static final Record BOTTOM = new Record();
219+
220+
private final Record next;
221+
private final int pos;
222+
223+
Record(Record next) {
224+
this.next = next;
225+
this.pos = next.pos + 1;
226+
}
227+
228+
private Record() {
229+
next = null;
230+
pos = -1;
231+
}
232+
233+
@Override
234+
public String toString() {
235+
StringBuilder buf = new StringBuilder();
236+
StackTraceElement[] array = getStackTrace();
237+
// Skip the first three elements since those are just related to the leak tracker.
238+
for (int i = 3; i < array.length; i++) {
239+
StackTraceElement element = array[i];
240+
buf.append('\t');
241+
buf.append(element.toString());
242+
buf.append('\n');
243+
}
244+
return buf.toString();
245+
}
246+
}
247+
}

0 commit comments

Comments
 (0)