
Commit dcddc6a

HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (#2975)
1 parent ee07b90 commit dcddc6a

4 files changed, +183 −33 lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 14 additions & 6 deletions
@@ -208,23 +208,31 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
   }
 
   private FSDataInputStream open(final Path path,
-      final Optional<Configuration> options) throws IOException {
+      final Optional<OpenFileParameters> parameters) throws IOException {
     statIncrement(CALL_OPEN);
     Path qualifiedPath = makeQualified(path);
 
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
-          fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
-          listener);
-      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
-          options, statistics, tracingContext);
+          fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
+      InputStream inputStream = abfsStore
+          .openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
       return new FSDataInputStream(inputStream);
     } catch(AzureBlobFileSystemException ex) {
       checkException(path, ex);
       return null;
     }
   }
 
+  /**
+   * Takes config and other options through
+   * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that
+   * FileStatus entered is up-to-date, as it will be used to create the
+   * InputStream (with info such as contentLength, eTag)
+   * @param path The location of file to be opened
+   * @param parameters OpenFileParameters instance; can hold FileStatus,
+   *     Configuration, bufferSize and mandatoryKeys
+   */
   @Override
   protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final Path path, final OpenFileParameters parameters) throws IOException {
@@ -235,7 +243,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
-            open(path, Optional.of(parameters.getOptions())));
+            open(path, Optional.of(parameters)));
   }
 
   @Override
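
In practical terms, a caller that already holds a FileStatus (for example from a prior listStatus or getFileStatus) can hand it to the openFile() builder so that opening the file does not trigger another GetPathStatus (HEAD) request. A minimal sketch of that usage, assuming an already-initialized ABFS FileSystem instance; the method and variable names below are illustrative and not part of this change:

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Illustrative only: reuses an already-known, up-to-date FileStatus so the
  // store can build the AbfsInputStream (contentLength, eTag) without an
  // extra GetPathStatus call.
  static byte[] readWithKnownStatus(FileSystem fs, Path file, FileStatus status)
      throws Exception {
    try (FSDataInputStream in = fs.openFile(file)
        .withFileStatus(status)   // status must be current, per the javadoc above
        .build()
        .get()) {
      byte[] buf = new byte[(int) status.getLen()];
      in.readFully(0, buf);
      return buf;
    }
  }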

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 47 additions & 24 deletions
@@ -115,6 +115,7 @@
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -129,6 +130,8 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@@ -669,44 +672,64 @@ public void createDirectory(final Path path, final FsPermission permission,
 
   public AbfsInputStream openFileForRead(final Path path,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    return openFileForRead(path, Optional.empty(), statistics, tracingContext);
+      throws IOException {
+    return openFileForRead(path, Optional.empty(), statistics,
+        tracingContext);
   }
 
-  public AbfsInputStream openFileForRead(final Path path,
-      final Optional<Configuration> options,
+  public AbfsInputStream openFileForRead(Path path,
+      final Optional<OpenFileParameters> parameters,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
+      throws IOException {
+    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
+        "getPathStatus")) {
       LOG.debug("openFileForRead filesystem: {} path: {}",
-          client.getFileSystem(),
-          path);
+          client.getFileSystem(), path);
 
+      FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
+          .orElse(null);
       String relativePath = getRelativePath(path);
-
-      final AbfsRestOperation op = client
-          .getPathStatus(relativePath, false, tracingContext);
-      perfInfo.registerResult(op.getResult());
-
-      final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      String resourceType, eTag;
+      long contentLength;
+      if (fileStatus instanceof VersionedFileStatus) {
+        path = path.makeQualified(this.uri, path);
+        Preconditions.checkArgument(fileStatus.getPath().equals(path),
+            String.format(
+                "Filestatus path [%s] does not match with given path [%s]",
+                fileStatus.getPath(), path));
+        resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
+        contentLength = fileStatus.getLen();
+        eTag = ((VersionedFileStatus) fileStatus).getVersion();
+      } else {
+        if (fileStatus != null) {
+          LOG.warn(
+              "Fallback to getPathStatus REST call as provided filestatus "
+                  + "is not of type VersionedFileStatus");
+        }
+        AbfsHttpOperation op = client.getPathStatus(relativePath, false,
+            tracingContext).getResult();
+        resourceType = op.getResponseHeader(
+            HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+        contentLength = Long.parseLong(
+            op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+        eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+      }
 
       if (parseIsDirectory(resourceType)) {
         throw new AbfsRestOperationException(
-            AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-            AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-            "openFileForRead must be used with files and not directories",
-            null);
+            AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+            AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+            "openFileForRead must be used with files and not directories",
+            null);
       }
 
       perfInfo.registerSuccess(true);
 
       // Add statistics for InputStream
-      return new AbfsInputStream(client, statistics,
-          relativePath, contentLength,
-          populateAbfsInputStreamContext(options),
-          eTag, tracingContext);
+      return new AbfsInputStream(client, statistics, relativePath,
+          contentLength, populateAbfsInputStreamContext(
+              parameters.map(OpenFileParameters::getOptions)),
+          eTag, tracingContext);
     }
   }
 
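
Inside the ABFS layer, the new overload is reached by wrapping the caller's status in OpenFileParameters, as the tests added in this commit also do; a minimal sketch against the store API (variable names illustrative):

  // With a VersionedFileStatus supplied, the store takes the fast path and
  // issues no GetPathStatus request:
  AbfsInputStream in = abfsStore.openFileForRead(path,
      Optional.ofNullable(new OpenFileParameters().withStatus(fileStatus)),
      statistics, tracingContext);

  // Without one, it falls back to the GetPathStatus REST call as before:
  AbfsInputStream in2 = abfsStore.openFileForRead(path,
      Optional.empty(), statistics, tracingContext);

Note the qualified-path check in the fast path above: a status whose path does not match the requested path fails fast with an IllegalArgumentException rather than opening the wrong blob with stale metadata.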

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

Lines changed: 10 additions & 0 deletions
@@ -38,6 +38,7 @@
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
@@ -429,6 +430,15 @@ public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
     return fs.getAbfsStore();
   }
 
+  public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
+    return abfsStore.getClient();
+  }
+
+  public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
+      AbfsClient client) {
+    abfsStore.setClient(client);
+  }
+
   public Path makeQualified(Path path) throws java.io.IOException {
     return getFileSystem().makeQualified(path);
   }

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

Lines changed: 112 additions & 3 deletions
@@ -19,31 +19,40 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
-
-import org.junit.Assert;
-import org.junit.Test;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
 
 import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -192,6 +201,106 @@ public TestAbfsInputStream() throws Exception {
     ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
   }
 
+  private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.create(testFile);
+    FSDataOutputStream out = fs.append(testFile);
+    out.write(buffer);
+    out.close();
+  }
+
+  private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
+      byte[] buf, AbfsRestOperationType source)
+      throws IOException, ExecutionException, InterruptedException {
+    byte[] readBuf = new byte[buf.length];
+    AzureBlobFileSystem fs = getFileSystem();
+    FutureDataInputStreamBuilder builder = fs.openFile(path);
+    builder.withFileStatus(fileStatus);
+    FSDataInputStream in = builder.build().get();
+    assertEquals(String.format(
+        "Open with fileStatus [from %s result]: Incorrect number of bytes read",
+        source), buf.length, in.read(readBuf));
+    assertArrayEquals(String
+        .format("Open with fileStatus [from %s result]: Incorrect read data",
+            source), readBuf, buf);
+  }
+
+  private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
+      AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
+      AbfsRestOperationType source, TracingContext tracingContext)
+      throws IOException {
+
+    // verify GetPathStatus not invoked when FileStatus is provided
+    abfsStore.openFileForRead(testFile, Optional
+        .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
+    verify(mockClient, times(0).description((String.format(
+        "FileStatus [from %s result] provided, GetFileStatus should not be invoked",
+        source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
+
+    // verify GetPathStatus invoked when FileStatus not provided
+    abfsStore.openFileForRead(testFile,
+        Optional.empty(), null,
+        tracingContext);
+    verify(mockClient, times(1).description(
+        "GetPathStatus should be invoked when FileStatus not provided"))
+        .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
+
+    Mockito.reset(mockClient); //clears invocation count for next test case
+  }
+
+  @Test
+  public void testOpenFileWithOptions() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    String testFolder = "/testFolder";
+    Path smallTestFile = new Path(testFolder + "/testFile0");
+    Path largeTestFile = new Path(testFolder + "/testFile1");
+    fs.mkdirs(new Path(testFolder));
+    int readBufferSize = getConfiguration().getReadBufferSize();
+    byte[] smallBuffer = new byte[5];
+    byte[] largeBuffer = new byte[readBufferSize + 5];
+    new Random().nextBytes(smallBuffer);
+    new Random().nextBytes(largeBuffer);
+    writeBufferToNewFile(smallTestFile, smallBuffer);
+    writeBufferToNewFile(largeTestFile, largeBuffer);
+
+    FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
+        fs.getFileStatus(largeTestFile)};
+    FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));
+
+    // open with fileStatus from GetPathStatus
+    verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
+        smallBuffer, AbfsRestOperationType.GetPathStatus);
+    verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
+        largeBuffer, AbfsRestOperationType.GetPathStatus);
+
+    // open with fileStatus from ListStatus
+    verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
+        AbfsRestOperationType.ListPaths);
+    verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
+        AbfsRestOperationType.ListPaths);
+
+    // verify number of GetPathStatus invocations
+    AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
+    AbfsClient mockClient = spy(getAbfsClient(abfsStore));
+    setAbfsClient(abfsStore, mockClient);
+    TracingContext tracingContext = getTestTracingContext(fs, false);
+    checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
+        abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
+    checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
+        abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
+    checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
+        abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
+    checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
+        abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
+
+    // Verify with incorrect filestatus
+    getFileStatusResults[0].setPath(new Path("wrongPath"));
+    intercept(ExecutionException.class,
+        () -> verifyOpenWithProvidedStatus(smallTestFile,
            getFileStatusResults[0], smallBuffer,
+            AbfsRestOperationType.GetPathStatus));
+  }
+
   /**
    * This test expects AbfsInputStream to throw the exception that readAhead
    * thread received on read. The readAhead thread must be initiated from the
