Commit b03da54

HADOOP-19207: [ABFS][FNSOverBlob] Response Handling of Blob Endpoint APIs and Metadata APIs (#7210)
Contributed by Anuj Modi
1 parent 0006912 commit b03da54

32 files changed: +2350, -420 lines

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 41 additions & 4 deletions
@@ -395,6 +395,14 @@ public class AbfsConfiguration{
   private String clientProvidedEncryptionKey;
   private String clientProvidedEncryptionKeySHA;
 
+  /**
+   * Constructor for AbfsConfiguration for specified service type.
+   * @param rawConfig used to initialize the configuration.
+   * @param accountName the name of the azure storage account.
+   * @param fsConfiguredServiceType service type configured for the file system.
+   * @throws IllegalAccessException if the field is not accessible.
+   * @throws IOException if an I/O error occurs.
+   */
   public AbfsConfiguration(final Configuration rawConfig,
       String accountName,
       AbfsServiceType fsConfiguredServiceType)
@@ -424,6 +432,13 @@ public AbfsConfiguration(final Configuration rawConfig,
     }
   }
 
+  /**
+   * Constructor for AbfsConfiguration for default service type i.e. DFS.
+   * @param rawConfig used to initialize the configuration.
+   * @param accountName the name of the azure storage account.
+   * @throws IllegalAccessException if the field is not accessible.
+   * @throws IOException if an I/O error occurs.
+   */
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
       throws IllegalAccessException, IOException {
     this(rawConfig, accountName, AbfsServiceType.DFS);
@@ -442,7 +457,7 @@ public Trilean getIsNamespaceEnabledAccount() {
    * @return the service type.
    */
   public AbfsServiceType getFsConfiguredServiceType() {
-    return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
+    return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
   }
 
   /**
@@ -451,7 +466,7 @@ public AbfsServiceType getFsConfiguredServiceType() {
    * @return the service type.
    */
   public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
-    return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null);
+    return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null);
   }
 
   /**
@@ -460,7 +475,7 @@ public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
    * @return the service type.
    */
   public AbfsServiceType getIngressServiceType() {
-    return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
+    return getCaseInsensitiveEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
   }
 
   /**
@@ -487,7 +502,7 @@ public void validateConfiguredServiceType(boolean isHNSEnabled)
     }
     if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) {
       throw new InvalidConfigurationValueException(
-          FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account");
+          FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Service Type Cannot be BLOB for HNS Account");
     } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) {
       throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
           "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
@@ -684,6 +699,28 @@ public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
         rawConfig.getEnum(name, defaultValue));
   }
 
+  /**
+   * Returns the account-specific enum value if it exists, then
+   * looks for an account-agnostic value in case-insensitive manner.
+   * @param name Account-agnostic configuration key
+   * @param defaultValue Value returned if none is configured
+   * @param <T> Enum type
+   * @return enum value if one exists, else null
+   */
+  public <T extends Enum<T>> T getCaseInsensitiveEnum(String name, T defaultValue) {
+    String configValue = getString(name, null);
+    if (configValue != null) {
+      for (T enumConstant : defaultValue.getDeclaringClass().getEnumConstants()) { // Step 3: Iterate over enum constants
+        if (enumConstant.name().equalsIgnoreCase(configValue)) {
+          return enumConstant;
+        }
+      }
+      // No match found
+      throw new IllegalArgumentException("No enum constant " + defaultValue.getDeclaringClass().getCanonicalName() + "." + configValue);
+    }
+    return defaultValue;
+  }
+
   /**
    * Returns the account-agnostic enum value if it exists, else
    * return default.
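
For readers unfamiliar with the new helper, the self-contained sketch below mimics what getCaseInsensitiveEnum does for the service-type keys: the raw string is read from the Configuration and matched against the enum constants ignoring case. It assumes only the standard Hadoop Configuration API; the local ServiceType enum stands in for AbfsServiceType, and the key string is illustrative (the commit refers to it via FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE), so this is a sketch rather than the class's actual code.

    import org.apache.hadoop.conf.Configuration;

    // Illustrative sketch only; the real logic lives in AbfsConfiguration#getCaseInsensitiveEnum above.
    public class CaseInsensitiveEnumSketch {

      // Stand-in for AbfsServiceType, to keep the example self-contained.
      enum ServiceType { DFS, BLOB }

      static <T extends Enum<T>> T lookup(Configuration conf, String key, T defaultValue) {
        String configured = conf.get(key);          // raw value, e.g. "blob", "Blob" or "BLOB"
        if (configured == null) {
          return defaultValue;                      // nothing configured: keep the default
        }
        for (T constant : defaultValue.getDeclaringClass().getEnumConstants()) {
          if (constant.name().equalsIgnoreCase(configured)) {
            return constant;                        // first case-insensitive match wins
          }
        }
        throw new IllegalArgumentException("No enum constant matching " + configured);
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key string assumed for illustration; lower case on purpose.
        conf.set("fs.azure.fns.account.service.type", "blob");
        // Prints BLOB: the lower-case value still resolves, which a plain Enum.valueOf lookup would reject.
        System.out.println(lookup(conf, "fs.azure.fns.account.service.type", ServiceType.DFS));
      }
    }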

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 29 additions & 23 deletions
@@ -124,6 +124,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
+import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
 import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
@@ -217,16 +218,16 @@ public void initialize(URI uri, Configuration configuration)
     tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
     this.setWorkingDirectory(this.getHomeDirectory());
 
-    TracingContext tracingContext = new TracingContext(clientCorrelationId,
-        fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
+    TracingContext initFSTracingContext = new TracingContext(clientCorrelationId,
+        fileSystemId, FSOperationType.INIT, tracingHeaderFormat, listener);
 
     /*
      * Validate the service type configured in the URI is valid for account type used.
      * HNS Account Cannot have Blob Endpoint URI.
      */
     try {
       abfsConfiguration.validateConfiguredServiceType(
-          tryGetIsNamespaceEnabled(new TracingContext(tracingContext)));
+          tryGetIsNamespaceEnabled(initFSTracingContext));
     } catch (InvalidConfigurationValueException ex) {
       LOG.debug("File system configured with Invalid Service Type", ex);
       throw ex;
@@ -235,34 +236,39 @@
       throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
     }
 
+    /*
+     * Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
+     * Fail initialization of filesystem if the configs are provided. CPK is of
+     * two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT.
+     */
+    try {
+      if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
+          abfsConfiguration)) && !tryGetIsNamespaceEnabled(new TracingContext(
+          initFSTracingContext))) {
+        throw new PathIOException(uri.getPath(),
+            CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE);
+      }
+    } catch (InvalidConfigurationValueException ex) {
+      LOG.debug("Non-Hierarchical Namespace Accounts Cannot Have CPK Enabled", ex);
+      throw ex;
+    } catch (AzureBlobFileSystemException ex) {
+      LOG.debug("Failed to determine account type for service type validation", ex);
+      throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
+    }
+
     // Create the file system if it does not exist.
     if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
-      if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
+      TracingContext createFSTracingContext = new TracingContext(initFSTracingContext);
+      createFSTracingContext.setOperation(CREATE_FILESYSTEM);
+      if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), createFSTracingContext) == null) {
         try {
-          this.createFileSystem(tracingContext);
+          this.createFileSystem(createFSTracingContext);
         } catch (AzureBlobFileSystemException ex) {
           checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
         }
       }
     }
 
-    /*
-     * Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
-     * Fail initialization of filesystem if the configs are provided. CPK is of
-     * two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT.
-     */
-    if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
-        abfsConfiguration))
-        && !getIsNamespaceEnabled(new TracingContext(tracingContext))) {
-      /*
-       * Close the filesystem gracefully before throwing exception. Graceful close
-       * will ensure that all resources are released properly.
-       */
-      close();
-      throw new PathIOException(uri.getPath(),
-          CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE);
-    }
-
     LOG.trace("Initiate check for delegation token manager");
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
@@ -702,7 +708,7 @@ private void incrementStatistic(AbfsStatistic statistic) {
   private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
     while (!path.isRoot()) {
       String pathToString = path.toString();
-      if (pathToString.length() != 0) {
+      if (!pathToString.isEmpty()) {
        if (pathToString.charAt(pathToString.length() - 1) == '.') {
           throw new IllegalArgumentException(
               "ABFS does not allow files or directories to end with a dot.");

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 46 additions & 32 deletions
@@ -150,6 +150,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
@@ -343,11 +344,13 @@ public void close() throws IOException {
   }
 
   byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
-    return value.getBytes(XMS_PROPERTIES_ENCODING);
+    // DFS Client works with ISO_8859_1 encoding, Blob Works with UTF-8.
+    return getClient().encodeAttribute(value);
   }
 
   String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
-    return new String(value, XMS_PROPERTIES_ENCODING);
+    // DFS Client works with ISO_8859_1 encoding, Blob Works with UTF-8.
+    return getClient().decodeAttribute(value);
   }
 
   private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
@@ -485,9 +488,8 @@ public Hashtable<String, String> getFilesystemProperties(
           .getFilesystemProperties(tracingContext);
       perfInfo.registerResult(op.getResult());
 
-      final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
-
-      parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+      // Handling difference in request headers formats between DFS and Blob Clients.
+      parsedXmsProperties = getClient().getXMSProperties(op.getResult());
       perfInfo.registerSuccess(true);
 
       return parsedXmsProperties;
@@ -533,10 +535,8 @@ public Hashtable<String, String> getPathStatus(final Path path,
       perfInfo.registerResult(op.getResult());
       contextEncryptionAdapter.destroy();
 
-      final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
-
-      parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
-
+      // Handling difference in request headers formats between DFS and Blob Clients.
+      parsedXmsProperties = getClient().getXMSProperties(op.getResult());
       perfInfo.registerSuccess(true);
 
       return parsedXmsProperties;
@@ -899,10 +899,8 @@ public AbfsInputStream openFileForRead(Path path,
     } else {
       AbfsHttpOperation op = getClient().getPathStatus(relativePath, false,
           tracingContext, null).getResult();
-      resourceType = op.getResponseHeader(
-          HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      contentLength = Long.parseLong(
-          op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+      resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE;
+      contentLength = extractContentLength(op);
       eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
       /*
        * For file created with ENCRYPTION_CONTEXT, client shall receive
@@ -983,17 +981,15 @@ public OutputStream openFileForWrite(final Path path,
           .getPathStatus(relativePath, false, tracingContext, null);
       perfInfo.registerResult(op.getResult());
 
-      final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-
-      if (parseIsDirectory(resourceType)) {
+      if (getClient().checkIsDir(op.getResult())) {
         throw new AbfsRestOperationException(
             AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
             AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-            "openFileForRead must be used with files and not directories",
+            "openFileForWrite must be used with files and not directories",
             null);
       }
 
+      final long contentLength = extractContentLength(op.getResult());
       final long offset = overwrite ? 0 : contentLength;
 
       perfInfo.registerSuccess(true);
@@ -1180,8 +1176,8 @@ public FileStatus getFileStatus(final Path path,
       contentLength = 0;
       resourceIsDir = true;
     } else {
-      contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-      resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
+      contentLength = extractContentLength(result);
+      resourceIsDir = getClient().checkIsDir(result);
     }
 
     final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
@@ -1256,10 +1252,16 @@ public String listStatus(final Path path, final String startFrom,
         startFrom);
 
     final String relativePath = getRelativePath(path);
+    AbfsClient listingClient = getClient();
 
     if (continuation == null || continuation.isEmpty()) {
       // generate continuation token if a valid startFrom is provided.
       if (startFrom != null && !startFrom.isEmpty()) {
+        /*
+         * Blob Endpoint Does not support startFrom yet. Fallback to DFS Client.
+         * startFrom remains null for all HDFS APIs. This is only for internal use.
+         */
+        listingClient = getClient(AbfsServiceType.DFS);
         continuation = getIsNamespaceEnabled(tracingContext)
             ? generateContinuationTokenForXns(startFrom)
             : generateContinuationTokenForNonXns(relativePath, startFrom);
@@ -1268,11 +1270,11 @@ public String listStatus(final Path path, final String startFrom,
 
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
-        AbfsRestOperation op = getClient().listPath(relativePath, false,
+        AbfsRestOperation op = listingClient.listPath(relativePath, false,
            abfsConfiguration.getListMaxResults(), continuation,
            tracingContext);
        perfInfo.registerResult(op.getResult());
-        continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+        continuation = listingClient.getContinuationFromResponse(op.getResult());
        ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
        if (retrievedSchema == null) {
          throw new AbfsRestOperationException(
@@ -1465,7 +1467,7 @@ public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec,
       final AbfsRestOperation op = getClient()
           .getAclStatus(relativePath, useUpn, tracingContext);
       perfInfoGet.registerResult(op.getResult());
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String eTag = extractEtagHeader(op.getResult());
 
       final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
 
@@ -1508,7 +1510,7 @@ public void removeAclEntries(final Path path, final List<AclEntry> aclSpec,
       final AbfsRestOperation op = getClient()
           .getAclStatus(relativePath, isUpnFormat, tracingContext);
       perfInfoGet.registerResult(op.getResult());
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String eTag = extractEtagHeader(op.getResult());
 
       final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
 
@@ -1546,7 +1548,7 @@ public void removeDefaultAcl(final Path path, TracingContext tracingContext)
       final AbfsRestOperation op = getClient()
           .getAclStatus(relativePath, tracingContext);
       perfInfoGet.registerResult(op.getResult());
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String eTag = extractEtagHeader(op.getResult());
       final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
       final Map<String, String> defaultAclEntries = new HashMap<>();
 
@@ -1590,7 +1592,7 @@ public void removeAcl(final Path path, TracingContext tracingContext)
       final AbfsRestOperation op = getClient()
           .getAclStatus(relativePath, tracingContext);
       perfInfoGet.registerResult(op.getResult());
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String eTag = extractEtagHeader(op.getResult());
 
       final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
       final Map<String, String> newAclEntries = new HashMap<>();
@@ -1636,7 +1638,7 @@ public void setAcl(final Path path, final List<AclEntry> aclSpec,
       final AbfsRestOperation op = getClient()
           .getAclStatus(relativePath, isUpnFormat, tracingContext);
       perfInfoGet.registerResult(op.getResult());
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String eTag = extractEtagHeader(op.getResult());
 
       final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
 
@@ -1859,12 +1861,24 @@ public String getRelativePath(final Path path) {
     return relPath;
   }
 
-  private long parseContentLength(final String contentLength) {
-    if (contentLength == null) {
-      return -1;
+  /**
+   * Extracts the content length from the HTTP operation's response headers.
+   *
+   * @param op The AbfsHttpOperation instance from which to extract the content length.
+   *           This operation contains the HTTP response headers.
+   * @return The content length as a long value. If the Content-Length header is
+   *         not present or is empty, returns 0.
+   */
+  private long extractContentLength(AbfsHttpOperation op) {
+    long contentLength;
+    String contentLengthHeader = op.getResponseHeader(
+        HttpHeaderConfigurations.CONTENT_LENGTH);
+    if (!contentLengthHeader.equals(EMPTY_STRING)) {
+      contentLength = Long.parseLong(contentLengthHeader);
+    } else {
+      contentLength = 0;
     }
-
-    return Long.parseLong(contentLength);
+    return contentLength;
   }
 
   private boolean parseIsDirectory(final String resourceType) {