Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_READAHEAD)
private boolean enabledReadAhead;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2,
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
private boolean isReadAheadV2Enabled;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
Expand Down Expand Up @@ -1332,6 +1337,15 @@ void setReadAheadEnabled(final boolean enabledReadAhead) {
this.enabledReadAhead = enabledReadAhead;
}

public boolean isReadAheadV2Enabled() {
return this.isReadAheadV2Enabled;
}

@VisibleForTesting
void setReadAheadV2Enabled(final boolean isReadAheadV2Enabled) {
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
}

public int getReadAheadRange() {
return this.readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";

/**
* Enable or disable readaheadV2 buffer in AbfsInputStream.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead_v2";

/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key encoded in Base6format {@value}.*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public final class FileSystemConfigurations {
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
private final boolean isReadAheadV2Enabled; // whether enable readAheadV2;
private final String inputStreamId;
private final boolean alwaysReadBufferSize;
/*
Expand Down Expand Up @@ -131,6 +132,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;

private ReadBufferManager readBufferManager = null;

public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
Expand All @@ -150,6 +153,7 @@ public AbfsInputStream(
this.eTag = eTag;
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
this.isReadAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled();
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.bufferedPreadDisabled = abfsInputStreamContext
Expand All @@ -173,9 +177,15 @@ public AbfsInputStream(
this.fsBackRef = abfsInputStreamContext.getFsBackRef();
contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();

// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
if (isReadAheadV2Enabled) {
readBufferManager = new ReadBufferManagerV2(readAheadBlockSize);
} else {
// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
readBufferManager = ReadBufferManagerV1.getBufferManager();
}

if (streamStatistics != null) {
ioStatistics = streamStatistics.getIOStatistics();
}
Expand Down Expand Up @@ -510,7 +520,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
new TracingContext(readAheadTracingContext));
nextOffset = nextOffset + nextSize;
numReadAheads--;
Expand All @@ -519,7 +529,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
}

// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
receivedBytes = readBufferManager.getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
Expand Down Expand Up @@ -720,7 +730,7 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
readBufferManager.purgeBuffersForStream(this);
buffer = null; // de-reference the buffer so it can be GC'ed sooner
if (contextEncryptionAdapter != null) {
contextEncryptionAdapter.destroy();
Expand Down Expand Up @@ -844,6 +854,11 @@ public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}

@VisibleForTesting
public ReadBufferManager getReadBufferManager() {
return readBufferManager;
}

@VisibleForTesting
public boolean shouldAlwaysReadBufferSize() {
return alwaysReadBufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean isReadAheadEnabled = true;

private boolean isReadAheadV2Enabled;

private boolean alwaysReadBufferSize;

private int readAheadBlockSize;
Expand Down Expand Up @@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled(
return this;
}

public AbfsInputStreamContext isReadAheadV2Enabled(
final boolean isReadAheadV2Enabled) {
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
return this;
}

public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
Expand Down Expand Up @@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}

public boolean isReadAheadV2Enabled() {
return isReadAheadV2Enabled;
}

public int getReadAheadRange() {
return readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;

class ReadBuffer {
public class ReadBuffer {

private AbfsInputStream stream;
private long offset; // offset within the file for the buffer
Expand Down
Loading