Skip to content

Commit 76ce3cb

Browse files
committed
HADOOP-18543. AliyunOSSFileSystem#open(Path path, int bufferSize) use buffer size as its downloadPartSize
1 parent e09e81a commit 76ce3cb

File tree

3 files changed

+68
-8
lines changed

3 files changed

+68
-8
lines changed

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,21 @@ private void validatePath(Path path) throws IOException {
585585
} while (fPart != null);
586586
}
587587

588+
@Override
589+
public FSDataInputStream open(Path path) throws IOException {
590+
final FileStatus fileStatus = getFileStatus(path);
591+
if (fileStatus.isDirectory()) {
592+
throw new FileNotFoundException("Can't open " + path +
593+
" because it is a directory");
594+
}
595+
596+
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
597+
new SemaphoredDelegatingExecutor(
598+
boundedThreadPool, maxReadAheadPartNumber, true),
599+
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
600+
statistics));
601+
}
602+
588603
@Override
589604
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
590605
final FileStatus fileStatus = getFileStatus(path);
@@ -593,7 +608,7 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
593608
" because it is a directory");
594609
}
595610

596-
return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
611+
return new FSDataInputStream(new AliyunOSSInputStream(bufferSize,
597612
new SemaphoredDelegatingExecutor(
598613
boundedThreadPool, maxReadAheadPartNumber, true),
599614
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.fs.FSInputStream;
3333
import org.apache.hadoop.fs.FileSystem.Statistics;
3434

35+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
3536
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
3637

3738
/**
@@ -57,18 +58,21 @@ public class AliyunOSSInputStream extends FSInputStream {
5758
private ExecutorService readAheadExecutorService;
5859
private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
5960

60-
public AliyunOSSInputStream(Configuration conf,
61-
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
62-
AliyunOSSFileSystemStore store, String key, Long contentLength,
63-
Statistics statistics) throws IOException {
61+
public AliyunOSSInputStream(
62+
long downloadPartSize,
63+
ExecutorService readAheadExecutorService,
64+
int maxReadAheadPartNumber,
65+
AliyunOSSFileSystemStore store,
66+
String key,
67+
Long contentLength,
68+
Statistics statistics) throws IOException {
6469
this.readAheadExecutorService =
65-
MoreExecutors.listeningDecorator(readAheadExecutorService);
70+
MoreExecutors.listeningDecorator(readAheadExecutorService);
6671
this.store = store;
6772
this.key = key;
6873
this.statistics = statistics;
6974
this.contentLength = contentLength;
70-
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
71-
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
75+
this.downloadPartSize = Math.max(downloadPartSize, IO_FILE_BUFFER_SIZE_DEFAULT);
7276
this.maxReadAheadPartNumber = maxReadAheadPartNumber;
7377

7478
this.expectNextPos = 0;
@@ -77,6 +81,18 @@ public AliyunOSSInputStream(Configuration conf,
7781
closed = false;
7882
}
7983

84+
public AliyunOSSInputStream(Configuration conf,
85+
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
86+
AliyunOSSFileSystemStore store, String key, Long contentLength,
87+
Statistics statistics) throws IOException {
88+
this(conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
89+
readAheadExecutorService, maxReadAheadPartNumber, store, key, contentLength, statistics);
90+
}
91+
92+
long getDownloadPartSize() {
93+
return downloadPartSize;
94+
}
95+
8096
/**
8197
* Reopen the wrapped stream at give position, by seeking for
8298
* data of a part length from object content stream.

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

36+
import java.io.IOException;
37+
import java.io.InputStream;
3638
import java.util.Random;
3739

40+
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT;
41+
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_KEY;
3842
import static org.junit.Assert.assertEquals;
3943
import static org.junit.Assert.assertTrue;
4044

@@ -108,6 +112,31 @@ public void testSeekFile() throws Exception {
108112
IOUtils.closeStream(instream);
109113
}
110114

115+
@Test
116+
public void testConfiguration() throws IOException {
117+
Path configurationFile = setPath("/test/configurationFile.txt");
118+
long size = 5 * 1024 * 1024;
119+
120+
ContractTestUtils.generateTestFile(this.fs, configurationFile, size, 256, 255);
121+
LOG.info("5MB file created: configurationFile.txt");
122+
123+
FSDataInputStream instream = this.fs.open(configurationFile);
124+
assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
125+
AliyunOSSInputStream wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
126+
assertEquals(
127+
fs.getConf().getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
128+
wrappedStream.getDownloadPartSize());
129+
IOUtils.closeStream(instream);
130+
131+
instream = this.fs.open(configurationFile, 1024);
132+
assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
133+
wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
134+
assertEquals(
135+
1024,
136+
wrappedStream.getDownloadPartSize());
137+
IOUtils.closeStream(instream);
138+
}
139+
111140
@Test
112141
public void testSequentialAndRandomRead() throws Exception {
113142
Path smallSeekFile = setPath("/test/smallSeekFile.txt");

0 commit comments

Comments
 (0)