-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read #479
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.channels.ReadableByteChannel; | ||
import java.util.List; | ||
|
||
|
@@ -78,6 +79,23 @@ public boolean release() { | |
return refCnt.release(); | ||
} | ||
|
||
public RefCnt getRefCnt() { | ||
return this.refCnt; | ||
} | ||
|
||
/** | ||
* BucketEntry uses this to share its refCnt with the ByteBuff, which is why the method is public. | ||
* Upstream code should not call this public method anywhere else, or the previous recycler | ||
* will be lost. | ||
*/ | ||
public void shareRefCnt(RefCnt refCnt, boolean replace) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the case of a file-based BC, we now read the cached data into pooled BBs. Every read RPC acquires its own BBs and reads into them, so ideally no BBs are shared across readers at all. But here it seems we try to share the ref count of the BC (file-based here) with the RPCs. A little strange to digest. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry, some of the conversation with @openinx is missing here. |
||
if (replace) { | ||
this.refCnt = refCnt; | ||
} else { | ||
this.refCnt = new CompositeRefCnt(getRefCnt(), refCnt); | ||
} | ||
} | ||
|
||
/******************************* Methods for ByteBuff **************************************/ | ||
|
||
/** | ||
|
@@ -450,10 +468,37 @@ public byte[] toBytes() { | |
*/ | ||
public abstract int read(ReadableByteChannel channel) throws IOException; | ||
|
||
/** | ||
* Reads bytes from FileChannel into this ByteBuff | ||
*/ | ||
public abstract int read(FileChannel channel, long offset) throws IOException; | ||
|
||
/** | ||
* Write this ByteBuff's data into target file | ||
*/ | ||
public abstract int write(FileChannel channel, long offset) throws IOException; | ||
|
||
/** | ||
* function interface for Channel read | ||
*/ | ||
@FunctionalInterface | ||
interface ChannelReader { | ||
int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A ReadableByteChannel cannot accept an offset position, can it? It looks strange that we provide an offset argument here... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Any good idea for how to abstract this? FileChannel needs the offset, so it is provided as an argument. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have no better idea for now; maybe we can keep the current way :-) |
||
} | ||
|
||
static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { | ||
return channel.read(buf); | ||
}; | ||
|
||
static final ChannelReader FILE_READER = (channel, buf, offset) -> { | ||
return ((FileChannel)channel).read(buf, offset); | ||
}; | ||
|
||
// static helper methods | ||
public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { | ||
public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, | ||
ChannelReader reader) throws IOException { | ||
if (buf.remaining() <= NIO_BUFFER_LIMIT) { | ||
return channel.read(buf); | ||
return reader.read(channel, buf, offset); | ||
} | ||
int originalLimit = buf.limit(); | ||
int initialRemaining = buf.remaining(); | ||
|
@@ -463,7 +508,8 @@ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throw | |
try { | ||
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); | ||
buf.limit(buf.position() + ioSize); | ||
ret = channel.read(buf); | ||
offset += ret; | ||
ret = reader.read(channel, buf, offset); | ||
if (ret < ioSize) { | ||
break; | ||
} | ||
|
@@ -540,15 +586,7 @@ public String toString() { | |
} | ||
|
||
/********************************* ByteBuff wrapper methods ***********************************/ | ||
|
||
/** | ||
* In theory, the upstream should never construct a ByteBuff by passing a given refCnt, so | ||
* please don't use this public method anywhere else. The method is public here because | ||
* BucketEntry#wrapAsCacheable in the hbase-server module uses its own refCnt and ByteBuffers from | ||
* IOEngine to compose an HFileBlock's ByteBuff; we didn't find a better way, so we keep the | ||
* public method here. | ||
*/ | ||
public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { | ||
private static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { | ||
if (buffers == null || buffers.length == 0) { | ||
throw new IllegalArgumentException("buffers shouldn't be null or empty"); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.hadoop.hbase.nio; | ||
|
||
import java.util.Optional; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; | ||
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted; | ||
|
||
/** | ||
* The CompositeRefCnt is mainly used by exclusive-memory HFileBlocks. It has an innerRefCnt | ||
* that is shared with the BucketEntry in order to keep count of the RPC requests, so that | ||
* calling BucketCache#freeEntireBuckets will not violate the LRU policy. | ||
* <p> | ||
* It also has its own refCnt & Recycler. Once the cells are shipped to the client, both the | ||
* Cacheable#refCnt & BucketEntry#refCnt will be decreased. When the Cacheable's refCnt decreases | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this mean that when, say, 2 read RPCs access a block from a file-based BC, that entry can NOT be evicted until both of these RPCs are over? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yep, as mentioned above. |
||
* to 0, its ByteBuff will be reclaimed, and when BucketEntry#refCnt decreases to 0, the | ||
* Bucket can be evicted. | ||
*/ | ||
openinx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@InterfaceAudience.Private | ||
public class CompositeRefCnt extends RefCnt { | ||
|
||
private Optional<RefCnt> innerRefCnt; | ||
|
||
public CompositeRefCnt(RefCnt orignal, RefCnt inner) { | ||
super(orignal.getRecycler()); | ||
this.innerRefCnt = Optional.ofNullable(inner); | ||
} | ||
|
||
@VisibleForTesting | ||
public Optional<RefCnt> getInnerRefCnt() { | ||
return this.innerRefCnt; | ||
} | ||
|
||
@Override | ||
public boolean release() { | ||
return super.release() && innerRefCnt.map(refCnt -> refCnt.release()).orElse(true); | ||
} | ||
|
||
@Override | ||
public ReferenceCounted retain() { | ||
return innerRefCnt.map(refCnt -> refCnt.retain()).orElseGet(() -> super.retain()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, you have a clear comment here. seems OK now.