|
172 | 172 | import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable; |
173 | 173 | import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; |
174 | 174 | import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; |
| 175 | +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; |
175 | 176 | import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; |
176 | 177 | import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; |
| 178 | +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.X_DIRECTORY; |
177 | 179 | import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; |
178 | 180 | import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; |
179 | 181 |
|
@@ -287,6 +289,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, |
287 | 289 | private final S3AFileSystem.OperationCallbacksImpl |
288 | 290 | operationCallbacks = new OperationCallbacksImpl(); |
289 | 291 |
|
| 292 | + /** |
| 293 | + * Should directory marker use be optimized? |
| 294 | + */ |
| 295 | + private boolean optimizeDirectoryOperations; |
| 296 | + |
290 | 297 | /** Add any deprecated keys. */ |
291 | 298 | @SuppressWarnings("deprecation") |
292 | 299 | private static void addDeprecatedKeys() { |
@@ -411,7 +418,11 @@ public void initialize(URI name, Configuration originalConf) |
411 | 418 |
|
412 | 419 | // instantiate S3 Select support |
413 | 420 | selectBinding = new SelectBinding(writeHelper); |
414 | | - |
| 421 | + optimizeDirectoryOperations = conf.getBoolean( |
| 422 | + EXPERIMENTAL_OPTIMIZED_DIRECTORY_OPERATIONS, false); |
| 423 | + if (optimizeDirectoryOperations) { |
| 424 | + LOG.info("Using experimental optimized directory operations"); |
| 425 | + } |
415 | 426 | boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true); |
416 | 427 |
|
417 | 428 | if (!blockUploadEnabled) { |
@@ -1495,8 +1506,21 @@ public void finishRename(final Path sourceRenamed, final Path destCreated) |
1495 | 1506 | Path destParent = destCreated.getParent(); |
1496 | 1507 | if (!sourceRenamed.getParent().equals(destParent)) { |
1497 | 1508 | LOG.debug("source & dest parents are different; fix up dir markers"); |
1498 | | - deleteUnnecessaryFakeDirectories(destParent); |
1499 | | - maybeCreateFakeParentDirectory(sourceRenamed); |
| 1509 | + // kick off an async delete |
| 1510 | + List<CompletableFuture<Void>> ops = new ArrayList<>(2); |
| 1511 | + ops.add(submit( |
| 1512 | + unboundedThreadPool, |
| 1513 | + () -> { |
| 1514 | + deleteUnnecessaryFakeDirectories(destParent, false); |
| 1515 | + return null; |
| 1516 | + })); |
| 1517 | + ops.add(submit( |
| 1518 | + unboundedThreadPool, |
| 1519 | + () -> { |
| 1520 | + maybeCreateFakeParentDirectory(sourceRenamed); |
| 1521 | + return null; |
| 1522 | + })); |
| 1523 | + waitForCompletion(ops); |
1500 | 1524 | } |
1501 | 1525 | } |
1502 | 1526 |
|
@@ -3564,7 +3588,7 @@ void finishedWrite(String key, long length, String eTag, String versionId, |
3564 | 3588 | final CompletableFuture<?> deletion = submit( |
3565 | 3589 | unboundedThreadPool, |
3566 | 3590 | () -> { |
3567 | | - deleteUnnecessaryFakeDirectories(p.getParent()); |
| 3591 | + deleteUnnecessaryFakeDirectories(p.getParent(), isDir); |
3568 | 3592 | return null; |
3569 | 3593 | }); |
3570 | 3594 | // this is only set if there is a metastore to update and the |
@@ -3629,18 +3653,50 @@ void finishedWrite(String key, long length, String eTag, String versionId, |
3629 | 3653 | * Delete mock parent directories which are no longer needed. |
3630 | 3654 | * Retry policy: retrying; exceptions swallowed. |
3631 | 3655 | * @param path path |
| 3656 | + * @param isMkDirOperation is this for a mkdir call? |
3632 | 3657 | */ |
3633 | 3658 | @Retries.RetryExceptionsSwallowed |
3634 | | - private void deleteUnnecessaryFakeDirectories(Path path) { |
| 3659 | + private void deleteUnnecessaryFakeDirectories(Path path, |
| 3660 | + final boolean isMkDirOperation) { |
3635 | 3661 | List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>(); |
3636 | | - while (!path.isRoot()) { |
3637 | | - String key = pathToKey(path); |
3638 | | - key = (key.endsWith("/")) ? key : (key + "/"); |
3639 | | - LOG.trace("To delete unnecessary fake directory {} for {}", key, path); |
3640 | | - keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); |
3641 | | - path = path.getParent(); |
| 3662 | + boolean deleteWholeTree = false; |
| 3663 | + if (optimizeDirectoryOperations && !isMkDirOperation) { |
| 3664 | + // this is a file creation/commit |
| 3665 | + // Assume that the parent directory exists either explicitly as a marker |
| 3666 | + // on implicitly (peer entries) |
| 3667 | + // only look for the dir marker in S3 -we don't care about DDB. |
| 3668 | + try { |
| 3669 | + String key = pathToKey(path); |
| 3670 | + s3GetFileStatus(path, key, StatusProbeEnum.DIR_MARKER_ONLY, null); |
| 3671 | + // here an entry exists. |
| 3672 | + LOG.debug("Removing marker {}", key); |
| 3673 | + keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); |
| 3674 | + } catch (FileNotFoundException e) { |
| 3675 | + // no entry. Nothing to delete. |
| 3676 | + } catch (IOException e) { |
| 3677 | + instrumentation.errorIgnored(); |
| 3678 | + LOG.debug("Ignored when looking at directory marker {}", path, e); |
| 3679 | + // for now, fall back to a full delete. |
| 3680 | + // if the failure was permissions or network this will probably fail |
| 3681 | + // too... |
| 3682 | + deleteWholeTree = true; |
| 3683 | + } |
| 3684 | + } else { |
| 3685 | + deleteWholeTree = true; |
| 3686 | + } |
| 3687 | + if (deleteWholeTree) { |
| 3688 | + // traditional delete creates a delete request for |
| 3689 | + // all parents. |
| 3690 | + while (!path.isRoot()) { |
| 3691 | + String key = pathToKey(path); |
| 3692 | + key = (key.endsWith("/")) ? key : (key + "/"); |
| 3693 | + LOG.trace("To delete unnecessary fake directory {} for {}", key, path); |
| 3694 | + keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); |
| 3695 | + path = path.getParent(); |
| 3696 | + } |
3642 | 3697 | } |
3643 | 3698 | try { |
| 3699 | + // TODO: when size ==1, use DELETE instead |
3644 | 3700 | removeKeys(keysToRemove, true, null); |
3645 | 3701 | } catch(AmazonClientException | IOException e) { |
3646 | 3702 | instrumentation.errorIgnored(); |
@@ -3686,8 +3742,10 @@ public int read() throws IOException { |
3686 | 3742 | } |
3687 | 3743 | }; |
3688 | 3744 |
|
| 3745 | + final ObjectMetadata metadata = newObjectMetadata(0L); |
| 3746 | + metadata.setContentType(X_DIRECTORY); |
3689 | 3747 | PutObjectRequest putObjectRequest = newPutObjectRequest(objectName, |
3690 | | - newObjectMetadata(0L), |
| 3748 | + metadata, |
3691 | 3749 | im); |
3692 | 3750 | invoker.retry("PUT 0-byte object ", objectName, |
3693 | 3751 | true, |
@@ -3803,8 +3861,7 @@ public String toString() { |
3803 | 3861 | if (committerIntegration != null) { |
3804 | 3862 | sb.append(", magicCommitter=").append(isMagicCommitEnabled()); |
3805 | 3863 | } |
3806 | | - sb.append(", boundedExecutor=").append(boundedThreadPool); |
3807 | | - sb.append(", unboundedExecutor=").append(unboundedThreadPool); |
| 3864 | + sb.append(", optimizeDirMarkers=").append(optimizeDirectoryOperations); |
3808 | 3865 | sb.append(", credentials=").append(credentials); |
3809 | 3866 | sb.append(", delegation tokens=") |
3810 | 3867 | .append(delegationTokens.map(Objects::toString).orElse("disabled")); |
@@ -3902,25 +3959,40 @@ public boolean exists(Path f) throws IOException { |
3902 | 3959 | } |
3903 | 3960 |
|
3904 | 3961 | /** |
3905 | | - * Override superclass so as to add statistic collection. |
| 3962 | + * An optimized check which only looks for directory markers. |
3906 | 3963 | * {@inheritDoc} |
3907 | 3964 | */ |
3908 | 3965 | @Override |
3909 | 3966 | @SuppressWarnings("deprecation") |
3910 | 3967 | public boolean isDirectory(Path f) throws IOException { |
3911 | 3968 | entryPoint(INVOCATION_IS_DIRECTORY); |
3912 | | - return super.isDirectory(f); |
| 3969 | + try { |
| 3970 | + // against S3Guard, a full query; |
| 3971 | + // against S3 a HEAD + "/" then a LIST. |
| 3972 | + return innerGetFileStatus(f, false, |
| 3973 | + StatusProbeEnum.DIRECTORIES).isDirectory(); |
| 3974 | + } catch (FileNotFoundException e) { |
| 3975 | + return false; |
| 3976 | + } |
3913 | 3977 | } |
3914 | 3978 |
|
3915 | 3979 | /** |
3916 | | - * Override superclass so as to add statistic collection. |
| 3980 | + * Override superclass so as to only poll for a file. |
| 3981 | + * Warning: may leave a 404 in the S3 load balancer cache. |
3917 | 3982 | * {@inheritDoc} |
3918 | 3983 | */ |
3919 | 3984 | @Override |
3920 | 3985 | @SuppressWarnings("deprecation") |
3921 | 3986 | public boolean isFile(Path f) throws IOException { |
3922 | 3987 | entryPoint(INVOCATION_IS_FILE); |
3923 | | - return super.isFile(f); |
| 3988 | + try { |
| 3989 | + // against S3Guard, a full query; against S3 only a HEAD. |
| 3990 | + return innerGetFileStatus(f, false, |
| 3991 | + StatusProbeEnum.HEAD_ONLY).isFile(); |
| 3992 | + } catch (FileNotFoundException e) { |
| 3993 | + // no file or there is a directory there. |
| 3994 | + return false; |
| 3995 | + } |
3924 | 3996 | } |
3925 | 3997 |
|
3926 | 3998 | /** |
|
0 commit comments