Skip to content

Commit a4122e2

Browse files
committed
HADOOP-18657. Tune ABFS create() retry logic
Production code changes; no tests yet. Something with mockito is going to be needed here Change-Id: I430a9f0e6796461ccec8c23cd80d024258703048
1 parent 9274018 commit a4122e2

File tree

1 file changed

+33
-12
lines changed

1 file changed

+33
-12
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.concurrent.TimeUnit;
5656

5757
import org.apache.hadoop.classification.VisibleForTesting;
58+
import org.apache.hadoop.fs.azurebfs.services.AbfsErrors;
5859
import org.apache.hadoop.util.Preconditions;
5960
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
6061
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
@@ -611,7 +612,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
611612
final String umask,
612613
final boolean isAppendBlob,
613614
TracingContext tracingContext) throws AzureBlobFileSystemException {
614-
AbfsRestOperation op;
615+
AbfsRestOperation op = null;
615616

616617
try {
617618
// Trigger a create with overwrite=false first so that eTag fetch can be
@@ -621,37 +622,57 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
621622
isAppendBlob, null, tracingContext);
622623

623624
} catch (AbfsRestOperationException e) {
625+
LOG.debug("Failed to create {}", relativePath, e);
624626
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
625627
// File pre-exists, fetch eTag
628+
LOG.debug("Fetching etag of {}", relativePath);
626629
try {
627630
op = client.getPathStatus(relativePath, false, tracingContext);
628631
} catch (AbfsRestOperationException ex) {
632+
LOG.debug("Failed to to getPathStatus {}", relativePath, ex);
629633
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
630634
// Is a parallel access case, as file which was found to be
631635
// present went missing by this request.
632-
throw new ConcurrentWriteOperationDetectedException(
633-
"Parallel access to the create path detected. Failing request "
634-
+ "to honor single writer semantics");
636+
// this means the other thread deleted it and the conflict
637+
// has implicitly been resolved.
638+
LOG.debug("File at {} has been deleted; creation can continue", relativePath);
635639
} else {
636640
throw ex;
637641
}
638642
}
639643

640-
String eTag = op.getResult()
641-
.getResponseHeader(HttpHeaderConfigurations.ETAG);
644+
String eTag = op != null
645+
? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG)
646+
: null;
642647

648+
LOG.debug("Attempting to create file {} with etag of {}", relativePath, eTag);
643649
try {
644-
// overwrite only if eTag matches with the file properties fetched befpre
645-
op = client.createPath(relativePath, true, true, permission, umask,
650+
// overwrite only if eTag matches with the file properties fetched or the file
651+
// was deleted and there is no etag.
652+
// if the etag was not retrieved, overwrite is still false, so will fail
653+
// if another process has just created the file
654+
op = client.createPath(relativePath, true, eTag != null, permission, umask,
646655
isAppendBlob, eTag, tracingContext);
647656
} catch (AbfsRestOperationException ex) {
648-
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
657+
final int sc = ex.getStatusCode();
658+
LOG.debug("Failed to create file {} with etag {}; status code={}",
659+
relativePath, eTag, sc, ex);
660+
if (sc == HttpURLConnection.HTTP_PRECON_FAILED
661+
|| sc == HttpURLConnection.HTTP_CONFLICT) {
649662
// Is a parallel access case, as file with eTag was just queried
650663
// and precondition failure can happen only when another file with
651664
// different etag got created.
652-
throw new ConcurrentWriteOperationDetectedException(
653-
"Parallel access to the create path detected. Failing request "
654-
+ "to honor single writer semantics");
665+
// OR leasing is enabled on the directory and this client
666+
// does not have the lease.
667+
final ConcurrentWriteOperationDetectedException ex2 =
668+
new ConcurrentWriteOperationDetectedException(
669+
AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED
670+
+ " Path =\"" + relativePath+ "\""
671+
+ "; Status code =" + sc
672+
+ "; etag = \"" + eTag + "\""
673+
+ "; error =" + ex.getErrorMessage());
674+
ex2.initCause(ex);
675+
throw ex2;
655676
} else {
656677
throw ex;
657678
}

0 commit comments

Comments
 (0)