Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
<suppress checks="VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ITestAbfsRestOperation.java"/>
<!-- allow tests to use _ for ordering. -->
<suppress checks="MethodName"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
private boolean isExpectHeaderEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;
Expand Down Expand Up @@ -706,6 +711,10 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

/**
 * Tells whether the expect header setting is switched on in this configuration.
 * @return the current value of the isExpectHeaderEnabled field.
 */
public boolean isExpectHeaderEnabled() {
  return isExpectHeaderEnabled;
}

/**
 * Tells whether account level throttling is switched on in this configuration.
 * @return the current value of the accountThrottlingEnabled field.
 */
public boolean accountThrottlingEnabled() {
  return this.accountThrottlingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
}
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
.enableFlush(abfsConfiguration.isFlushEnabled())
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public final class AbfsHttpConstants {
public static final String HTTP_METHOD_PATCH = "PATCH";
public static final String HTTP_METHOD_POST = "POST";
public static final String HTTP_METHOD_PUT = "PUT";
/**
 * HTTP 100 (Continue) status code.
 * Any status code lower than this value signifies an error
 * and should qualify for retry.
 */
public static final int HTTP_CONTINUE = 100;

// Abfs generic constants
public static final String SINGLE_WHITE_SPACE = " ";
Expand Down Expand Up @@ -103,6 +108,9 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_SCOPE = "default:";
public static final String PERMISSION_FORMAT = "%04d";
public static final String SUPER_USER = "$superuser";
// Value of the Expect request header. It asks the server to reply with the HTTP 100 Continue
// informational status, which indicates that everything so far is OK and that the client
// should continue with the request, or ignore the response if the request is already finished.
public static final String HUNDRED_CONTINUE = "100-continue";

public static final char CHAR_FORWARD_SLASH = '/';
public static final char CHAR_EXCLAMATION_POINT = '!';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public final class ConfigurationKeys {
* path to determine HNS status.
*/
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
/**
* Enable or disable expect hundred continue header.
* Value: {@value}.
*/
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public final class FileSystemConfigurations {

public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";

public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
public static final String EXPECT = "Expect";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,33 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {

private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";

public InvalidAbfsRestOperationException(
final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: "InvalidAbfsRestOperationException",
: ERROR_MESSAGE,
innerException);
}

/**
 * Wraps the originally caught exception and records the retry count
 * in the error message.
 * The message is the string form of the inner exception, or a generic
 * message when there is no inner exception, followed by the retry count.
 * @param innerException The inner exception which is originally caught;
 *                       may be null.
 * @param retryCount The retry count when the exception was thrown.
 */
public InvalidAbfsRestOperationException(
    final Exception innerException, int retryCount) {
  super(
      AzureServiceErrorCode.UNKNOWN.getStatusCode(),
      AzureServiceErrorCode.UNKNOWN.getErrorCode(),
      // The conditional is parenthesised on purpose: the ternary operator
      // binds looser than '+', so without the parentheses the retry count
      // was only appended when innerException was null.
      (innerException != null
          ? innerException.toString()
          : ERROR_MESSAGE) + " RetryCount: " + retryCount,
      innerException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,22 @@ public enum Mode {
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;
private boolean isExpectHeaderEnabled;

public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob,
final String leaseId) {
final String leaseId,
final boolean isExpectHeaderEnabled) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
}

public long getPosition() {
Expand All @@ -72,4 +75,12 @@ public boolean isAppendBlob() {
/**
 * Gives the lease id held by these append request parameters.
 * @return the leaseId supplied at construction time.
 */
public String getLeaseId() {
  return leaseId;
}

/**
 * Tells whether the expect header is enabled for this append request.
 * @return the current value of the isExpectHeaderEnabled flag.
 */
public boolean isExpectHeaderEnabled() {
  return this.isExpectHeaderEnabled;
}

/**
 * Overwrites the expect header flag of this append request.
 * @param expectHeaderEnabled the new value of the flag.
 */
public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
  this.isExpectHeaderEnabled = expectHeaderEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;

/**
Expand Down Expand Up @@ -656,6 +657,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
if (reqParams.isExpectHeaderEnabled()) {
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
}
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
Expand All @@ -681,36 +685,49 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
abfsUriQueryBuilder, cachedSasToken);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
/*
If the http response code indicates a user error we retry
the same append request with expect header being disabled.
When "100-continue" header is enabled but a non Http 100 response comes,
the response message might not get set correctly by the server.
So, this handling is to avoid breaking of backward compatibility
if someone has taken dependency on the exception message,
which is created using the error string present in the response header.
*/
int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
reqParams.setExpectHeaderEnabled(false);
return this.append(path, buffer, reqParams, cachedSasToken,
tracingContext);
}
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}
if (reqParams.isAppendBlob()
&& appendSuccessCheckOp(op, path,
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
AbfsRestOperationType.Append,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
Expand All @@ -720,6 +737,48 @@ && appendSuccessCheckOp(op, path,
return op;
}

/**
 * Builds the {@link AbfsRestOperation} for an append request, bound to
 * this client instance.
 * @param operationType the AbfsRestOperationType of the operation.
 * @param httpMethod the http method to use.
 * @param url the request url.
 * @param requestHeaders the list of request headers.
 * @param buffer the buffer handed to the operation.
 * @param bufferOffset the offset within the buffer.
 * @param bufferLength the number of bytes of the buffer to use.
 * @param sasTokenForReuse the sasToken.
 * @return the newly created AbfsRestOperation.
 */
@VisibleForTesting
AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
    final String httpMethod,
    final URL url,
    final List<AbfsHttpHeader> requestHeaders,
    final byte[] buffer,
    final int bufferOffset,
    final int bufferLength,
    final String sasTokenForReuse) {
  final AbfsRestOperation appendOperation = new AbfsRestOperation(
      operationType, this, httpMethod, url, requestHeaders,
      buffer, bufferOffset, bufferLength, sasTokenForReuse);
  return appendOperation;
}

/**
 * Decides whether a status code falls in the user error range, that is
 * from HTTP_BAD_REQUEST (inclusive) up to HTTP_INTERNAL_ERROR (exclusive).
 * @param responseStatusCode http response status code.
 * @return true when the code is inside that range, false otherwise.
 */
private boolean checkUserError(int responseStatusCode) {
  // Anything below the lower bound can never be a user error.
  if (responseStatusCode < HttpURLConnection.HTTP_BAD_REQUEST) {
    return false;
  }
  return responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR;
}

// For AppendBlob its possible that the append succeeded in the backend but the request failed.
// However a retry would fail with an InvalidQueryParameterValue
// (as the current offset would be unacceptable).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;

import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;

/**
* Throttles Azure Blob File System read and write operations to achieve maximum
* throughput by minimizing errors. The errors occur when the account ingress
Expand Down Expand Up @@ -60,7 +62,7 @@ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsC

// Hide default constructor
private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
//Account name is kept as empty as same instance is shared across all accounts
// Account name is kept as empty as same instance is shared across all accounts.
this.accountName = "";
this.readThrottler = setAnalyzer("read", abfsConfiguration);
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
Expand Down Expand Up @@ -114,6 +116,18 @@ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsC
return singleton;
}

/**
 * Decides whether the expected bytes should be used for the metrics:
 * this is the case when the operation was throttled and the http
 * operation still reports some expected bytes to be sent.
 * This method only computes the decision; it does not modify any metric.
 * @param isThrottledOperation whether the operation is considered throttled.
 * @param abfsHttpOperation the operation queried for its expected bytes to be sent.
 * @return true if the operation is throttled and has some bytes to transfer.
 */
private boolean updateBytesTransferred(boolean isThrottledOperation,
    AbfsHttpOperation abfsHttpOperation) {
  // Not throttled: the http operation is not consulted at all.
  if (!isThrottledOperation) {
    return false;
  }
  return abfsHttpOperation.getExpectedBytesToBeSent() > 0;
}

/**
* Updates the metrics for successful and failed read and write operations.
* @param operationType Only applicable for read and write operations.
Expand All @@ -134,9 +148,22 @@ public void updateMetrics(AbfsRestOperationType operationType,
boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);

// If status code is 503, it is considered as a throttled operation.
boolean isThrottledOperation = (status == HTTP_UNAVAILABLE);

switch (operationType) {
case Append:
contentLength = abfsHttpOperation.getBytesSent();
if (contentLength == 0) {
/*
Signifies the case where we could not update the bytesSent due to
throttling but there were some expectedBytesToBeSent.
*/
if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) {
LOG.debug("Updating metrics due to throttling for path {}", abfsHttpOperation.getConnUrl().getPath());
contentLength = abfsHttpOperation.getExpectedBytesToBeSent();
}
}
if (contentLength > 0) {
writeThrottler.addBytesTransferred(contentLength,
isFailedOperation);
Expand Down
Loading