Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18383. Codecs with @DoNotPool annotation are not closed causing memory leak #4585

Merged
merged 10 commits into from
Aug 12, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) {
}
// if the compressor can't be reused, don't pool it.
if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
compressor.end();
return;
}
compressor.reset();
Expand All @@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) {
}
// if the decompressor can't be reused, don't pool it.
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
decompressor.end();
return;
}
decompressor.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@
package org.apache.hadoop.io.compress;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -189,4 +198,54 @@ public void testDecompressorNotReturnSameInstance() {
CodecPool.returnDecompressor(decompressor);
}
}

@Test(timeout = 10000)
public void testDoNotPoolCompressorNotUseableAfterReturn() throws IOException {

final GzipCodec gzipCodec = new GzipCodec();
gzipCodec.setConf(new Configuration());

// BuiltInGzipCompressor is an explicit example of a Compressor with the @DoNotPool annotation
final Compressor compressor = new BuiltInGzipCompressor(new Configuration());
CodecPool.returnCompressor(compressor);

try (CompressionOutputStream outputStream =
gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor)) {
outputStream.write(1);
kevins-29 marked this conversation as resolved.
Show resolved Hide resolved
fail("Compressor from Codec with @DoNotPool should not be useable after returning to CodecPool");
} catch (NullPointerException exception) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NPE is the best we can do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, I couldn't find another way to test that the underlying Compressor/Decompressor has been closed. There is the `finished` flag, but that is set by `reset()` and has different semantics.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a check in the place where we would encounter the null and trigger a more friendly exception from there?
Something like an already closed exception?

Assert.assertEquals("Deflater has been closed", exception.getMessage());
}
}

/**
 * Checks that a {@code BuiltInGzipDecompressor} can no longer be used to read
 * data after it has been passed to {@code CodecPool.returnDecompressor}.
 *
 * The test passes only if reading through the returned decompressor throws a
 * {@code NullPointerException} whose message is "Inflater has been closed".
 *
 * @throws IOException if preparing the gzip input or closing a stream fails
 */
@Test(timeout = 10000)
public void testDoNotPoolDecompressorNotUseableAfterReturn() throws IOException {

final GzipCodec gzipCodec = new GzipCodec();
gzipCodec.setConf(new Configuration());

// Compress 1024 random bytes with the codec to obtain input for the read attempt below.
final Random random = new Random();
final byte[] bytes = new byte[1024];
random.nextBytes(bytes);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
outputStream.write(bytes);
}

final byte[] gzipBytes = baos.toByteArray();
final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);

// BuiltInGzipDecompressor is an explicit example of a Decompressor with the @DoNotPool annotation
final Decompressor decompressor = new BuiltInGzipDecompressor();
// Hand the decompressor back before using it; the read below is expected to fail as a result.
CodecPool.returnDecompressor(decompressor);

try (CompressionInputStream inputStream =
goiri marked this conversation as resolved.
Show resolved Hide resolved
gzipCodec.createInputStream(bais, decompressor)) {
inputStream.read();
// Reaching this line means the read succeeded, which the test treats as a failure.
fail("Decompressor from Codec with @DoNotPool should not be useable after returning to CodecPool");
} catch (NullPointerException exception) {
// The failure is identified by its exact message, not just by the exception type.
Assert.assertEquals("Inflater has been closed", exception.getMessage());
}
}
}