Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28338 Bounded leak of FSDataInputStream buffers from checksum switching #5660

Merged
merged 4 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

Expand All @@ -40,8 +36,6 @@
*/
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

private final HFileSystem hfs;
private final Path path;
Expand Down Expand Up @@ -94,9 +88,6 @@ private static class ReadStatistics {
long totalZeroCopyBytesRead;
}

private Boolean instanceOfCanUnbuffer = null;
private CanUnbuffer unbuffer = null;

protected Path readerPath;

public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
Expand Down Expand Up @@ -314,41 +305,22 @@ public HFileSystem getHfs() {
* stream, the current socket will be closed and a new socket will be opened to serve the
* requests.
*/
@SuppressWarnings({ "rawtypes" })
public void unbuffer() {
// todo: it may make sense to always unbuffer both streams. we'd need to carefully
// research the usages to know if that is safe. for now just do the current.
FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
if (stream != null) {
InputStream wrappedStream = stream.getWrappedStream();
// CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
// 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
// CanUnbuffer interface or not and based on that call the unbuffer api.
final Class<? extends InputStream> streamClass = wrappedStream.getClass();
if (this.instanceOfCanUnbuffer == null) {
// To ensure we compute whether the stream is instance of CanUnbuffer only once.
this.instanceOfCanUnbuffer = false;
if (wrappedStream instanceof CanUnbuffer) {
this.unbuffer = (CanUnbuffer) wrappedStream;
this.instanceOfCanUnbuffer = true;
}
}
if (this.instanceOfCanUnbuffer) {
try {
this.unbuffer.unbuffer();
} catch (UnsupportedOperationException e) {
if (isLogTraceEnabled) {
LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
+ " . So there may be the stream does not support unbuffering.", e);
}
}
} else {
if (isLogTraceEnabled) {
LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
}
}
stream.unbuffer();
}
}

public Path getReaderPath() {
return readerPath;
}

// For tests
void setShouldUseHBaseChecksum() {
useHBaseChecksumConfigured = true;
useHBaseChecksum = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand All @@ -31,6 +32,7 @@
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.ByteBufferPool;
Expand All @@ -48,22 +50,22 @@ public class TestFSDataInputStreamWrapper {
@Test
public void testUnbuffer() throws Exception {
InputStream pc = new ParentClass();
FSDataInputStreamWrapper fsdisw1 = new FSDataInputStreamWrapper(new FSDataInputStream(pc));
InputStream noChecksumPc = new ParentClass();
FSDataInputStreamWrapper fsdisw1 =
new FSDataInputStreamWrapper(new FSDataInputStream(pc), new FSDataInputStream(noChecksumPc));
fsdisw1.unbuffer();
// parent class should be true
// should have called main stream unbuffer, but not no-checksum
assertTrue(((ParentClass) pc).getIsCallUnbuffer());
assertFalse(((ParentClass) noChecksumPc).getIsCallUnbuffer());
// switch to checksums and call unbuffer again. should unbuffer the nochecksum stream now
fsdisw1.setShouldUseHBaseChecksum();
fsdisw1.unbuffer();
assertTrue(((ParentClass) noChecksumPc).getIsCallUnbuffer());
fsdisw1.close();

InputStream cc1 = new ChildClass1();
FSDataInputStreamWrapper fsdisw2 = new FSDataInputStreamWrapper(new FSDataInputStream(cc1));
fsdisw2.unbuffer();
// child1 class should be true
assertTrue(((ChildClass1) cc1).getIsCallUnbuffer());
fsdisw2.close();
}

private class ParentClass extends FSInputStream implements ByteBufferReadable, CanSetDropBehind,
CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer {
CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {

public boolean isCallUnbuffer = false;

Expand Down Expand Up @@ -122,12 +124,10 @@ public long getPos() throws IOException {
public boolean seekToNewSource(long paramLong) throws IOException {
return false;
}
}

private class ChildClass1 extends ParentClass {
@Override
public void unbuffer() {
isCallUnbuffer = true;
public boolean hasCapability(String s) {
return s.equals(StreamCapabilities.UNBUFFER);
}
}
}