Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
e2ffb8c
ABFS: Added changes for expect hundred continue header with append re…
anmolasrani123 Feb 28, 2022
d110408
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
a29faa8
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
dacfde0
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
c14f458
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
7e40f07
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
ef67598
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Mar 4, 2022
93a77a7
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 30, 2022
9b43316
ABFS: Added changes for expect hundred continue header with append re…
anmolasrani123 Feb 28, 2022
899b40b
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
2af317d
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
cc9fcdb
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
9fc9c99
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
56eda26
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
091f1d4
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 30, 2022
3709619
Fix trunk conflict
anmolasrani123 May 12, 2022
fe33f93
Merge branch 'HADOOP-18146' of https://github.com/anmolanmol1234/hado…
anmolasrani123 Jul 20, 2022
94397c7
Added config details in md file
anmolasrani123 Jul 20, 2022
9c8f7d0
Merge branch 'trunk' into HADOOP-18146
anmolanmol1234 Jul 20, 2022
5f26061
Changing class modifier
anmolasrani123 Jul 20, 2022
f2ffb23
Merge branch 'HADOOP-18146' of https://github.com/anmolanmol1234/hado…
anmolasrani123 Jul 20, 2022
0f18d94
Spot bugs and checkstyle fixes
anmolasrani123 Aug 2, 2022
83fbd8c
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Aug 2, 2022
58c1123
remove unused imports
anmolasrani123 Aug 2, 2022
aab3128
Fix imports
anmolasrani123 Aug 22, 2022
d13f8bd
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Sep 19, 2022
478021a
Separate out account throttling
anmolasrani123 Sep 19, 2022
9fbb4de
Documentation added
anmolasrani123 Oct 25, 2022
7c43202
Formatting
anmolasrani123 Oct 25, 2022
75a3332
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Oct 25, 2022
3fc18b9
Addressed PR comments
anmolasrani123 Oct 28, 2022
aaca1c1
Addressed PR comments
anmolasrani123 Oct 28, 2022
135e04f
Addressed PR comments
anmolasrani123 Oct 28, 2022
eaf9dfc
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Nov 29, 2022
2a1834f
Addressed PR comments
anmolasrani123 Nov 29, 2022
3f37058
Merge branch 'HADOOP-18146' of https://github.com/anmolanmol1234/hado…
anmolasrani123 Nov 29, 2022
443e263
Fix for exception
anmolasrani123 Nov 29, 2022
e694264
Merge branch 'trunk' into HADOOP-18146
anmolanmol1234 Dec 5, 2022
7961d08
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Dec 5, 2022
05df41c
Update AbfsConfiguration.java
anmolanmol1234 Dec 5, 2022
457fda0
Changes for exception handling
anmolasrani123 Dec 6, 2022
f2e6f52
String correction
anmolasrani123 Dec 7, 2022
fd006bd
Tests for hundred continue
anmolasrani123 Dec 15, 2022
baf9ec7
Add tests for 100 continue
anmolasrani123 Dec 15, 2022
fe8deea
Add tests for hundred continue
anmolasrani123 Dec 15, 2022
cafd409
Parameters for test
anmolasrani123 Dec 19, 2022
f17c15a
Tests for expect header
anmolasrani123 Dec 19, 2022
61138e9
Update metrics fix
anmolasrani123 Dec 20, 2022
ac3e973
Metric update changes
anmolasrani123 Dec 20, 2022
36ec260
Tests for metric update verification
anmolasrani123 Dec 20, 2022
8283ef2
Update md file
anmolasrani123 Dec 20, 2022
9611999
Remove unused imports
anmolasrani123 Dec 20, 2022
93003b2
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Dec 20, 2022
7899d1c
Checkstyle fixes
anmolasrani123 Dec 20, 2022
5398f2a
Checkstyle fixes
anmolasrani123 Dec 21, 2022
796b774
PR comments addressing
anmolasrani123 Dec 22, 2022
115b0b6
PR comments
anmolasrani123 Dec 22, 2022
db89c78
remove setter for connection
anmolasrani123 Dec 23, 2022
0fb9067
Update AbfsClient.java
anmolanmol1234 Dec 26, 2022
f3e2e14
Addressing PR comments
anmolanmol1234 Mar 16, 2023
51bdece
String fix
anmolanmol1234 Mar 16, 2023
83f14fb
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Mar 16, 2023
c3268dc
Remove unused imports
anmolanmol1234 Mar 16, 2023
675687c
Import fix
anmolanmol1234 Mar 16, 2023
99a9377
Checkstyle fixes
anmolanmol1234 Mar 16, 2023
e210b04
Build fixed
anmolanmol1234 Mar 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
<suppress checks="VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ITestAbfsRestOperation.java"/>
<!-- allow tests to use _ for ordering. -->
<suppress checks="MethodName"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
private boolean isExpectHeaderEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;
Expand Down Expand Up @@ -706,6 +711,10 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

/**
 * Whether the "Expect: 100-continue" header should be sent on append
 * requests, as configured via
 * {@code fs.azure.account.expect.header.enabled}.
 * @return true if the expect header is enabled.
 */
public boolean isExpectHeaderEnabled() {
return this.isExpectHeaderEnabled;
}

/**
 * Whether account-level throttling is enabled for this configuration.
 * @return the configured account-level throttling flag.
 */
public boolean accountThrottlingEnabled() {
return accountThrottlingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
}
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
.enableFlush(abfsConfiguration.isFlushEnabled())
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public final class AbfsHttpConstants {
public static final String HTTP_METHOD_PATCH = "PATCH";
public static final String HTTP_METHOD_POST = "POST";
public static final String HTTP_METHOD_PUT = "PUT";
/**
* All status codes less than http 100 signify error
* and should qualify for retry.
*/
public static final int HTTP_CONTINUE = 100;

// Abfs generic constants
public static final String SINGLE_WHITE_SPACE = " ";
Expand Down Expand Up @@ -103,6 +108,9 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_SCOPE = "default:";
public static final String PERMISSION_FORMAT = "%04d";
public static final String SUPER_USER = "$superuser";
// The HTTP 100 Continue informational status response code indicates that everything so far
// is OK and that the client should continue with the request or ignore it if it is already finished.
public static final String HUNDRED_CONTINUE = "100-continue";

public static final char CHAR_FORWARD_SLASH = '/';
public static final char CHAR_EXCLAMATION_POINT = '!';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public final class ConfigurationKeys {
* path to determine HNS status.
*/
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
/**
* Enable or disable expect hundred continue header.
* Value: {@value}.
*/
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public final class FileSystemConfigurations {

public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";

public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
public static final String EXPECT = "Expect";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,33 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {

private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";

public InvalidAbfsRestOperationException(
final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: "InvalidAbfsRestOperationException",
: ERROR_MESSAGE,
innerException);
}

/**
* Adds the retry count along with the exception.
* @param innerException The inner exception which is originally caught.
* @param retryCount The retry count when the exception was thrown.
*/
public InvalidAbfsRestOperationException(
final Exception innerException, int retryCount) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: ERROR_MESSAGE + " RetryCount: " + retryCount,
innerException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,22 @@ public enum Mode {
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;
private boolean isExpectHeaderEnabled;

/**
 * Creates the parameter set describing a single append request.
 *
 * @param position file position at which the data is appended.
 * @param offset offset within the buffer where the data starts.
 * @param length number of bytes to append.
 * @param mode append mode for the request.
 * @param isAppendBlob true if the target is an append blob.
 * @param leaseId lease id to attach to the request.
 * @param isExpectHeaderEnabled whether the "Expect: 100-continue" header
 *                              should be sent with this append.
 */
public AppendRequestParameters(final long position,
    final int offset,
    final int length,
    final Mode mode,
    final boolean isAppendBlob,
    final String leaseId,
    final boolean isExpectHeaderEnabled) {
  // NOTE(review): the scraped diff retained the stale pre-change
  // "final String leaseId) {" parameter line alongside the new one;
  // only the new seven-parameter signature is kept here.
  this.position = position;
  this.offset = offset;
  this.length = length;
  this.mode = mode;
  this.isAppendBlob = isAppendBlob;
  this.leaseId = leaseId;
  this.isExpectHeaderEnabled = isExpectHeaderEnabled;
}

public long getPosition() {
Expand All @@ -72,4 +75,12 @@ public boolean isAppendBlob() {
/**
 * Lease id to attach to the append request.
 * @return the lease id supplied at construction.
 */
public String getLeaseId() {
  return leaseId;
}

/**
 * Whether the "Expect: 100-continue" header should be added to this append.
 * @return the current expect-header flag.
 */
public boolean isExpectHeaderEnabled() {
  return this.isExpectHeaderEnabled;
}

/**
 * Enables or disables the expect-header flag for this request; used to
 * retry an append without the "Expect: 100-continue" header.
 * @param expectHeaderEnabled new value of the flag.
 */
public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
  this.isExpectHeaderEnabled = expectHeaderEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;

/**
Expand Down Expand Up @@ -656,6 +657,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
if (reqParams.isExpectHeaderEnabled()) {
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
}
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
Expand All @@ -681,36 +685,49 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
abfsUriQueryBuilder, cachedSasToken);

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
/*
If the http response code indicates a user error we retry
the same append request with expect header being disabled.
When "100-continue" header is enabled but a non Http 100 response comes,
the response message might not get set correctly by the server.
So, this handling is to avoid breaking of backward compatibility
if someone has taken dependency on the exception message,
which is created using the error string present in the response header.
*/
int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
reqParams.setExpectHeaderEnabled(false);
return this.append(path, buffer, reqParams, cachedSasToken,
tracingContext);
}
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}
if (reqParams.isAppendBlob()
&& appendSuccessCheckOp(op, path,
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
AbfsRestOperationType.Append,
HTTP_METHOD_PUT,
url,
requestHeaders,
buffer,
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
Expand All @@ -720,6 +737,48 @@ && appendSuccessCheckOp(op, path,
return op;
}

/**
 * Builds the {@link AbfsRestOperation} used to execute an append call.
 * Kept as a separate factory (visible for testing) so tests can intercept
 * operation creation.
 * @param operationType The AbfsRestOperationType.
 * @param httpMethod specifies the httpMethod.
 * @param url specifies the url.
 * @param requestHeaders This includes the list of request headers.
 * @param buffer The buffer to write into.
 * @param bufferOffset The buffer offset.
 * @param bufferLength The buffer Length.
 * @param sasTokenForReuse The sasToken.
 * @return AbfsRestOperation op.
 */
@VisibleForTesting
AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
    final String httpMethod,
    final URL url,
    final List<AbfsHttpHeader> requestHeaders,
    final byte[] buffer,
    final int bufferOffset,
    final int bufferLength,
    final String sasTokenForReuse) {
  final AbfsRestOperation appendOp = new AbfsRestOperation(
      operationType,
      this,
      httpMethod,
      url,
      requestHeaders,
      buffer,
      bufferOffset,
      bufferLength,
      sasTokenForReuse);
  return appendOp;
}

/**
 * Determines whether the given HTTP status code represents a user
 * (client-side, 4xx) error.
 * @param responseStatusCode http response status code.
 * @return true when the code lies in [400, 500).
 */
private boolean checkUserError(int responseStatusCode) {
  final boolean atLeastBadRequest =
      responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST;
  final boolean belowServerError =
      responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR;
  return atLeastBadRequest && belowServerError;
}

// For AppendBlob its possible that the append succeeded in the backend but the request failed.
// However a retry would fail with an InvalidQueryParameterValue
// (as the current offset would be unacceptable).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;

import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;

/**
* Throttles Azure Blob File System read and write operations to achieve maximum
* throughput by minimizing errors. The errors occur when the account ingress
Expand Down Expand Up @@ -60,7 +62,7 @@ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsC

// Hide default constructor
private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
//Account name is kept as empty as same instance is shared across all accounts
// Account name is kept as empty as same instance is shared across all accounts.
this.accountName = "";
this.readThrottler = setAnalyzer("read", abfsConfiguration);
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
Expand Down Expand Up @@ -114,6 +116,18 @@ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsC
return singleton;
}

/**
 * Updates the metrics for the case when the response code signifies
 * throttling but there are some expected bytes still to be sent.
 * @param isThrottledOperation true if the status code is HTTP_UNAVAILABLE.
 * @param abfsHttpOperation Used for status code and data transferred.
 * @return true if the operation is throttled and has some bytes to transfer.
 */
private boolean updateBytesTransferred(boolean isThrottledOperation,
    AbfsHttpOperation abfsHttpOperation) {
  if (!isThrottledOperation) {
    return false;
  }
  return abfsHttpOperation.getExpectedBytesToBeSent() > 0;
}

/**
* Updates the metrics for successful and failed read and write operations.
* @param operationType Only applicable for read and write operations.
Expand All @@ -134,9 +148,22 @@ public void updateMetrics(AbfsRestOperationType operationType,
boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);

// If status code is 503, it is considered as a throttled operation.
boolean isThrottledOperation = (status == HTTP_UNAVAILABLE);

switch (operationType) {
case Append:
contentLength = abfsHttpOperation.getBytesSent();
if (contentLength == 0) {
/*
Signifies the case where we could not update the bytesSent due to
throttling but there were some expectedBytesToBeSent.
*/
if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) {
LOG.debug("Updating metrics due to throttling for path {}", abfsHttpOperation.getConnUrl().getPath());
contentLength = abfsHttpOperation.getExpectedBytesToBeSent();
}
}
if (contentLength > 0) {
writeThrottler.addBytesTransferred(contentLength,
isFailedOperation);
Expand Down
Loading