HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read #467

Closed · wants to merge 15 commits
Changes from 1 commit
ByteBuff.java
@@ -19,6 +19,7 @@

 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;

@@ -450,6 +451,16 @@ public byte[] toBytes() {
    */
   public abstract int read(ReadableByteChannel channel) throws IOException;
 
+  /**
+   * Reads bytes from the given FileChannel into this ByteBuff.
+   */
+  public abstract int read(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * Writes this ByteBuff's data into the target file.
+   */
+  public abstract int write(FileChannel channel, long offset) throws IOException;
+
   // static helper methods
   public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
     if (buf.remaining() <= NIO_BUFFER_LIMIT) {
@@ -475,6 +486,32 @@ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throw
     return (nBytes > 0) ? nBytes : ret;
   }

+  public static int fileRead(FileChannel channel, ByteBuffer buf, long offset)

[Member] Looks like we could make some abstraction between the existing channelRead(...) and the newly introduced fileRead(...)? Similar to ByteBufferArray#read and ByteBufferArray#write. Please take a look. (A possible shape is sketched after this method.)

+      throws IOException {
+    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
+      return channel.read(buf, offset);
+    }
+    int originalLimit = buf.limit();
+    int initialRemaining = buf.remaining();
+    int ret = 0;
+
+    while (buf.remaining() > 0) {
+      try {
+        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+        buf.limit(buf.position() + ioSize);
+        offset += ret;
+        ret = channel.read(buf, offset);
+        if (ret < ioSize) {
+          break;
+        }
+      } finally {
+        buf.limit(originalLimit);
[Member] Only reset the limit? Should we also reset the position?

+      }
+    }
+    int nBytes = initialRemaining - buf.remaining();
+    return (nBytes > 0) ? nBytes : ret;
+  }
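A minimal sketch of the abstraction the first comment above asks for, assuming a hypothetical ChannelReader interface; the names are illustrative and not part of this PR:

// Hypothetical sketch only: both channelRead(...) and fileRead(...) share the
// "at most NIO_BUFFER_LIMIT bytes per call" loop and differ only in how a
// single read is issued.
@FunctionalInterface
interface ChannelReader {
  int read(ByteBuffer buf) throws IOException;
}

static int sliceRead(ChannelReader reader, ByteBuffer buf) throws IOException {
  if (buf.remaining() <= NIO_BUFFER_LIMIT) {
    return reader.read(buf);
  }
  int originalLimit = buf.limit();
  int initialRemaining = buf.remaining();
  int ret = 0;
  while (buf.remaining() > 0) {
    try {
      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
      buf.limit(buf.position() + ioSize);   // cap this slice
      ret = reader.read(buf);
      if (ret < ioSize) {
        break;                              // short read, stop early
      }
    } finally {
      buf.limit(originalLimit);             // restore the caller's limit
    }
  }
  int nBytes = initialRemaining - buf.remaining();
  return (nBytes > 0) ? nBytes : ret;
}

// channelRead(channel, buf) would become sliceRead(channel::read, buf), and
// fileRead(channel, buf, offset) can derive the running file offset from the
// buffer's own progress:
//   int startPos = buf.position();
//   return sliceRead(b -> channel.read(b, offset + (b.position() - startPos)), buf);

Deriving the offset from the buffer's position would also sidestep the separate offset += ret bookkeeping in the version above.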

   /**
    * Read integer from ByteBuff coded in 7 bits and increment position.
    * @return Read integer.
MultiByteBuff.java
@@ -24,6 +24,7 @@
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;

 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -1086,6 +1087,48 @@ public int read(ReadableByteChannel channel) throws IOException {
     return total;
   }

+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {

[Member] Should we also make an abstraction between MultiByteBuff#read and MultiByteBuff#write, as said above? (A possible shape is sketched after these two methods.)

+    checkRefCount();
+    int total = 0;
+    while (true) {
+      int len = fileRead(channel, this.curItem, offset);
+      if (len > 0) {
+        total += len;
+        offset += len;
+      }
+      if (this.curItem.hasRemaining()) {
+        break;
+      } else {
+        if (this.curItemIndex >= this.limitedItemIndex) {
+          break;
+        }
+        this.curItemIndex++;
+        this.curItem = this.items[this.curItemIndex];
+      }
+    }
+    return total;
+  }

+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (true) {
+      int len = channel.write(curItem, offset);
+      if (len > 0) {
+        total += len;
+        offset += len;
+      }
+      if (this.curItemIndex >= this.limitedItemIndex) {
+        break;
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    return total;
+  }
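A rough sketch of the shared traversal the comment above asks for, with hypothetical names. One wrinkle a real refactor would need to settle: the read loop above stops on a short read of the current item, while the write loop does not.

// Hypothetical sketch only: both methods walk curItem across items[],
// accumulating the transferred byte count; only the per-buffer call differs.
@FunctionalInterface
private interface ItemAccessor {
  int access(ByteBuffer item, long offset) throws IOException;
}

private int accessItems(ItemAccessor accessor, long offset, boolean stopOnShortIO)
    throws IOException {
  checkRefCount();
  int total = 0;
  while (true) {
    int len = accessor.access(this.curItem, offset);
    if (len > 0) {
      total += len;
      offset += len;
    }
    if (stopOnShortIO && this.curItem.hasRemaining()) {
      break;                                 // short I/O on this item
    }
    if (this.curItemIndex >= this.limitedItemIndex) {
      break;                                 // no more items
    }
    this.curItemIndex++;
    this.curItem = this.items[this.curItemIndex];
  }
  return total;
}

// read(channel, offset)  -> accessItems((b, off) -> fileRead(channel, b, off), offset, true)
// write(channel, offset) -> accessItems(channel::write, offset, false)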

   @Override
   public ByteBuffer[] nioByteBuffers() {
     checkRefCount();
SingleByteBuff.java
@@ -21,6 +21,7 @@

 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;

 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -374,6 +375,18 @@ public int read(ReadableByteChannel channel) throws IOException {
     return channelRead(channel, buf);
   }

+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    return fileRead(channel, buf, offset);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    return channel.write(buf, offset);
+  }

   @Override
   public ByteBuffer[] nioByteBuffers() {
     checkRefCount();
BucketCache.java
@@ -429,7 +429,9 @@ private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cached
     if (!cacheEnabled) {
       return;
     }
-    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
+    if (LOG.isTraceEnabled()) {

[Contributor] You don't need this "if LOG.isTraceEnabled()" when using the logging form with '{}' placeholders — internally it does this test.

[Member] Correct.

+      LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
+    }
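For context on the exchange above: with SLF4J's '{}' form the level check happens inside the call, so an explicit guard only pays off when computing an argument is itself expensive. A generic illustration, not HBase code:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TraceGuardExample {
  private static final Logger LOG = LoggerFactory.getLogger(TraceGuardExample.class);

  void example(Object cacheKey, Object cachedItem) {
    // No guard needed: formatting is deferred until after the internal
    // level check, and passing two references is essentially free.
    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);

    // A guard still helps when building the argument is costly in itself.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Cache dump: {}", expensiveDump());
    }
  }

  private String expensiveDump() {
    return "...";  // stand-in for an expensive computation
  }
}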
     // Stuff the entry into the RAM cache so it can get drained to the persistent store
     RAMQueueEntry re =
         new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
@@ -502,8 +504,10 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
       // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
       // the same BucketEntry, then all of the three will share the same refCnt.
       Cacheable cachedBlock = ioEngine.read(bucketEntry);
-      // RPC start to reference, so retain here.
-      cachedBlock.retain();
+      if (ioEngine.usesSharedMemory()) {

[Member] One big concern here: for an exclusive-memory IOEngine, the refCnt of every bucketEntry will now stay at 1, meaning only the BucketCache references it, with no RPC reference. Won't the BucketCache's eviction policy then evict blocks even while an RPC is still using them? Not the memory-leak issue as such, but evicting RPC-referenced blocks would violate the LRU?

[Contributor, author] The eviction policy compares BucketEntry instances by their accessCounter, so this will not violate the LRU?

+        // RPC start to reference, so retain here.
+        cachedBlock.retain();
+      }
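To unpack the thread above — one reading of this diff, hedged because the RefCnt and eviction internals are not shown here:

// Shared-memory engine: the returned block aliases the cache's own memory,
// so the RPC must pin it before the entry can be evicted or reused.
Cacheable cachedBlock = ioEngine.read(bucketEntry); // cache holds one reference
if (ioEngine.usesSharedMemory()) {
  cachedBlock.retain();  // RPC reference: eviction must not free this memory
}
// ... serve the RPC, then release() drops the RPC reference ...

// Exclusive-memory engine (e.g. FileIOEngine): read() fills a ByteBuff newly
// obtained from be.allocator, so the block owns an independent copy; evicting
// the BucketEntry frees only the cache's storage, never the in-flight copy.
// Whether eviction *ordering* still respects in-use blocks is the reviewer's
// LRU question; per the author, ordering is driven by accessCounter.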
       // Update the cache statistics.
       if (updateCacheMetrics) {
         cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
BucketEntry.java
@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
    */
   private final RefCnt refCnt;
   final AtomicBoolean markedAsEvicted;
-  private final ByteBuffAllocator allocator;
+  final ByteBuffAllocator allocator;

   /**
    * Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,10 @@ boolean isRpcRef() {
   }
 
   Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
-    ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
+  }
+
+  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
[Member] That's a good thing, splitting wrapAsCacheable into two methods: the shared IOEngines use the former one, and the exclusive IOEngines use the latter one. Good.

     return this.deserializerReference().deserialize(buf, allocator);
   }
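For orientation, the two overloads match the two call-site styles visible elsewhere in this diff:

// Shared-memory engines (unchanged path): raw buffers are wrapped with the
// entry's refCnt before deserializing.
//   return be.wrapAsCacheable(buffers);          // ByteBuffer[] overload
// Exclusive-memory engines (this PR): the engine already allocated its own
// independent ByteBuff.
//   ByteBuff dst = be.allocator.allocate(be.getLength());
//   ...fill dst from the mmap or file...
//   return be.wrapAsCacheable(dst);              // ByteBuff overload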

ExclusiveMemoryMmapIOEngine.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -35,9 +34,9 @@ public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOExce

   @Override
   public Cacheable read(BucketEntry be) throws IOException {
-    ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+    ByteBuff dst = be.allocator.allocate(be.getLength());
     bufferArray.read(be.offset(), dst);
     dst.position(0).limit(be.getLength());
-    return be.wrapAsCacheable(dst.nioByteBuffers());
+    return be.wrapAsCacheable(dst);
   }
 }
FileIOEngine.java
@@ -129,7 +129,7 @@ public Cacheable read(BucketEntry be) throws IOException {
     long offset = be.offset();
     int length = be.getLength();
     Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
-    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    ByteBuff dstBuffer = be.allocator.allocate(length);
     if (length != 0) {
       accessFile(readAccessor, dstBuffer, offset);
       // The buffer created out of the fileChannel is formed by copying the data from the file
@@ -142,7 +142,7 @@ }
       }
     }
     dstBuffer.rewind();
-    return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
+    return be.wrapAsCacheable(dstBuffer);
   }

   @VisibleForTesting
@@ -164,10 +164,7 @@ void closeFileChannels() {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    if (!srcBuffer.hasRemaining()) {
-      return;
-    }
-    accessFile(writeAccessor, srcBuffer, offset);
+    write(ByteBuff.wrap(srcBuffer), offset);
   }

   /**
@@ -209,11 +206,13 @@ public void shutdown() {

   @Override
   public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
-    write(dup, offset);
+    if (!srcBuffer.hasRemaining()) {
+      return;
+    }
+    accessFile(writeAccessor, srcBuffer, offset);
   }

-  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+  private void accessFile(FileAccessor accessor, ByteBuff buffer,
       long globalOffset) throws IOException {
     int startFileNum = getFileNum(globalOffset);
     int remainingAccessDataLen = buffer.remaining();
@@ -304,23 +303,23 @@ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOExceptio
   }
 
   private interface FileAccessor {
-    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+    int access(FileChannel fileChannel, ByteBuff byteBuffer, long accessOffset)
         throws IOException;
   }
 
   private static class FileReadAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff byteBuffer,
         long accessOffset) throws IOException {
-      return fileChannel.read(byteBuffer, accessOffset);
+      return byteBuffer.read(fileChannel, accessOffset);
     }
   }
 
   private static class FileWriteAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff byteBuffer,
        long accessOffset) throws IOException {
-      return fileChannel.write(byteBuffer, accessOffset);
+      return byteBuffer.write(fileChannel, accessOffset);
     }
   }
 }