Skip to content

Commit b15ed27

Browse files
authored
HADOOP-19187: [ABFS][FNSOverBlob] AbfsClient Refactoring to Support Multiple Implementation of Clients. (#6879)
Refactor AbfsClient into DFS and Blob Client. Contributed by Anuj Modi
1 parent 33c9ecb commit b15ed27

22 files changed

+2287
-912
lines changed

hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
<suppressions>
4545
<suppress checks="ParameterNumber|MagicNumber"
4646
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
47+
<suppress checks="ParameterNumber"
48+
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
4749
<suppress checks="ParameterNumber|MagicNumber"
4850
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
4951
<suppress checks="ParameterNumber|VisibilityModifier"

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.reflect.Field;
2323

2424
import org.apache.hadoop.classification.VisibleForTesting;
25+
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
2526
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
2627
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
2728
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
@@ -74,6 +75,7 @@
7475
import org.slf4j.Logger;
7576
import org.slf4j.LoggerFactory;
7677

78+
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
7779
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
7880
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
7981
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
@@ -87,13 +89,19 @@ public class AbfsConfiguration{
8789

8890
private final Configuration rawConfig;
8991
private final String accountName;
92+
// Service type identified from URL used to initialize FileSystem.
93+
private final AbfsServiceType fsConfiguredServiceType;
9094
private final boolean isSecure;
9195
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);
9296

9397
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
9498
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
9599
private String isNamespaceEnabledAccount;
96100

101+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK,
102+
DefaultValue = DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK)
103+
private boolean isDfsToBlobFallbackEnabled;
104+
97105
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
98106
DefaultValue = -1)
99107
private int writeMaxConcurrentRequestCount;
@@ -408,11 +416,14 @@ public class AbfsConfiguration{
408416
private String clientProvidedEncryptionKey;
409417
private String clientProvidedEncryptionKeySHA;
410418

411-
public AbfsConfiguration(final Configuration rawConfig, String accountName)
412-
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
419+
public AbfsConfiguration(final Configuration rawConfig,
420+
String accountName,
421+
AbfsServiceType fsConfiguredServiceType)
422+
throws IllegalAccessException, IOException {
413423
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
414424
rawConfig, AzureBlobFileSystem.class);
415425
this.accountName = accountName;
426+
this.fsConfiguredServiceType = fsConfiguredServiceType;
416427
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);
417428

418429
Field[] fields = this.getClass().getDeclaredFields();
@@ -434,10 +445,75 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
434445
}
435446
}
436447

448+
public AbfsConfiguration(final Configuration rawConfig, String accountName)
449+
throws IllegalAccessException, IOException {
450+
this(rawConfig, accountName, AbfsServiceType.DFS);
451+
}
452+
437453
public Trilean getIsNamespaceEnabledAccount() {
438454
return Trilean.getTrilean(isNamespaceEnabledAccount);
439455
}
440456

457+
/**
458+
* Returns the service type to be used based on the filesystem configuration.
459+
* Precedence is given to service type configured for FNS Accounts using
460+
* "fs.azure.fns.account.service.type". If not configured, then the service
461+
* type identified from url used to initialize filesystem will be used.
462+
* @return the service type.
463+
*/
464+
public AbfsServiceType getFsConfiguredServiceType() {
465+
return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
466+
}
467+
468+
/**
469+
* Returns the service type configured for FNS Accounts to override the
470+
* service type identified by URL used to initialize the filesystem.
471+
* @return the service type.
472+
*/
473+
public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
474+
return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null);
475+
}
476+
477+
/**
478+
* Returns the service type to be used for Ingress Operations irrespective of account type.
479+
* Default value is the same as the service type configured for the file system.
480+
* @return the service type.
481+
*/
482+
public AbfsServiceType getIngressServiceType() {
483+
return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
484+
}
485+
486+
/**
487+
* Returns whether there is a need to move traffic from DFS to Blob.
488+
* Needed when the service type is DFS and operations are experiencing compatibility issues.
489+
* @return true if fallback enabled.
490+
*/
491+
public boolean isDfsToBlobFallbackEnabled() {
492+
return isDfsToBlobFallbackEnabled;
493+
}
494+
495+
/**
496+
* Checks if the service type configured is valid for account type used.
497+
* HNS Enabled accounts cannot have service type as BLOB.
498+
* @param isHNSEnabled Flag to indicate if HNS is enabled for the account.
499+
* @throws InvalidConfigurationValueException if the service type is invalid.
500+
*/
501+
public void validateConfiguredServiceType(boolean isHNSEnabled)
502+
throws InvalidConfigurationValueException {
503+
// Todo: [FnsOverBlob] - Remove this check, Failing FS Init with Blob Endpoint Until FNS over Blob is ready.
504+
if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {
505+
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
506+
"Blob Endpoint Support not yet available");
507+
}
508+
if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) {
509+
throw new InvalidConfigurationValueException(
510+
FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account");
511+
} else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) {
512+
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
513+
"Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
514+
}
515+
}
516+
441517
/**
442518
* Gets the Azure Storage account name corresponding to this instance of configuration.
443519
* @return the Azure Storage account name
@@ -478,6 +554,7 @@ public String get(String key) {
478554
* Returns the account-specific value if it exists, then looks for an
479555
* account-agnostic value.
480556
* @param key Account-agnostic configuration key
557+
* @param defaultValue Value returned if none is configured
481558
* @return value if one exists, else the default value
482559
*/
483560
public String getString(String key, String defaultValue) {
@@ -522,7 +599,7 @@ public int getInt(String key, int defaultValue) {
522599
* looks for an account-agnostic value.
523600
* @param key Account-agnostic configuration key
524601
* @return value in String form if one exists, else null
525-
* @throws IOException
602+
* @throws IOException if parsing fails.
526603
*/
527604
public String getPasswordString(String key) throws IOException {
528605
char[] passchars = rawConfig.getPassword(accountConf(key));

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import org.apache.commons.lang3.StringUtils;
4747
import org.apache.hadoop.classification.VisibleForTesting;
48+
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
4849
import org.apache.hadoop.fs.impl.BackReference;
4950
import org.apache.hadoop.security.ProviderUtils;
5051
import org.apache.hadoop.util.Preconditions;
@@ -109,13 +110,16 @@
109110
import org.apache.hadoop.util.LambdaUtils;
110111
import org.apache.hadoop.util.Progressable;
111112

113+
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
112114
import static java.net.HttpURLConnection.HTTP_CONFLICT;
115+
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
113116
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
114117
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
115118
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
116119
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
117120
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
118121
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
122+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
119123
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
120124
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
121125
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
@@ -213,6 +217,23 @@ public void initialize(URI uri, Configuration configuration)
213217

214218
TracingContext tracingContext = new TracingContext(clientCorrelationId,
215219
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
220+
221+
/*
222+
* Validate the service type configured in the URI is valid for account type used.
223+
* HNS Account Cannot have Blob Endpoint URI.
224+
*/
225+
try {
226+
abfsConfiguration.validateConfiguredServiceType(
227+
tryGetIsNamespaceEnabled(new TracingContext(tracingContext)));
228+
} catch (InvalidConfigurationValueException ex) {
229+
LOG.debug("File system configured with Invalid Service Type", ex);
230+
throw ex;
231+
} catch (AzureBlobFileSystemException ex) {
232+
LOG.debug("Failed to determine account type for service type validation", ex);
233+
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
234+
}
235+
236+
// Create the file system if it does not exist.
216237
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
217238
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
218239
try {
@@ -230,10 +251,7 @@ public void initialize(URI uri, Configuration configuration)
230251
*/
231252
if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
232253
abfsConfiguration))
233-
&& !getIsNamespaceEnabled(
234-
new TracingContext(clientCorrelationId, fileSystemId,
235-
FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat,
236-
listener))) {
254+
&& !getIsNamespaceEnabled(new TracingContext(tracingContext))) {
237255
/*
238256
* Close the filesystem gracefully before throwing exception. Graceful close
239257
* will ensure that all resources are released properly.
@@ -1400,6 +1418,34 @@ private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext)
14001418
}
14011419
}
14021420

1421+
/**
1422+
* Utility function to check if the namespace is enabled on the storage account.
1423+
* If request fails with 4xx other than 400, it will be inferred as HNS.
1424+
* @param tracingContext tracing context
1425+
* @return true if namespace is enabled, false otherwise.
1426+
* @throws AzureBlobFileSystemException if any other error occurs.
1427+
*/
1428+
private boolean tryGetIsNamespaceEnabled(TracingContext tracingContext)
1429+
throws AzureBlobFileSystemException{
1430+
try {
1431+
return getIsNamespaceEnabled(tracingContext);
1432+
} catch (AbfsRestOperationException ex) {
1433+
/*
1434+
* Exception will be thrown for any non 400 error code.
1435+
* If status code is in 4xx range, it means it's an HNS account.
1436+
* If status code is in 5xx range, it means nothing can be inferred.
1437+
* In case of network errors status code will be -1.
1438+
*/
1439+
int statusCode = ex.getStatusCode();
1440+
if (statusCode > HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR) {
1441+
LOG.debug("getNamespace failed with non 400 user error", ex);
1442+
statIncrement(ERROR_IGNORED);
1443+
return true;
1444+
}
1445+
throw ex;
1446+
}
1447+
}
1448+
14031449
private boolean fileSystemExists() throws IOException {
14041450
LOG.debug(
14051451
"AzureBlobFileSystem.fileSystemExists uri: {}", uri);
@@ -1660,7 +1706,7 @@ AbfsDelegationTokenManager getDelegationTokenManager() {
16601706
@VisibleForTesting
16611707
boolean getIsNamespaceEnabled(TracingContext tracingContext)
16621708
throws AzureBlobFileSystemException {
1663-
return abfsStore.getIsNamespaceEnabled(tracingContext);
1709+
return getAbfsStore().getIsNamespaceEnabled(tracingContext);
16641710
}
16651711

16661712
/**

0 commit comments

Comments
 (0)