1717 */ 
1818package  org .apache .hadoop .hdfs .server .namenode ;
1919
20- import  org .apache .commons .lang3 .tuple .Pair ;
20+ import  org .apache .commons .lang3 .tuple .Triple ;
2121import  org .apache .hadoop .hdfs .protocol .HdfsConstants ;
2222import  org .apache .hadoop .util .Preconditions ;
2323import  org .apache .hadoop .fs .FileAlreadyExistsException ;
@@ -72,18 +72,28 @@ static RenameResult renameToInt(
7272   * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves 
7373   * dstInodes[dstInodes.length-1] 
7474   */ 
75-   private  static  Pair <Optional <QuotaCounts >, Optional <QuotaCounts >> verifyQuotaForRename (
76-       FSDirectory  fsd , INodesInPath  src , INodesInPath  dst ) throws  QuotaExceededException  {
75+   private  static  Triple <Boolean , Optional <QuotaCounts >, Optional <QuotaCounts >> verifyQuotaForRename (
76+       FSDirectory  fsd , INodesInPath  src , INodesInPath  dst , boolean  overwrite )
77+       throws  QuotaExceededException  {
7778    Optional <QuotaCounts > srcDelta  = Optional .empty ();
7879    Optional <QuotaCounts > dstDelta  = Optional .empty ();
7980    if  (!fsd .getFSNamesystem ().isImageLoaded () || fsd .shouldSkipQuotaChecks ()) {
8081      // Do not check quota if edits log is still being processed 
81-       return  Pair .of (srcDelta , dstDelta );
82+       return  Triple .of (false ,  srcDelta , dstDelta );
8283    }
8384    int  i  = 0 ;
8485    while  (src .getINode (i ) == dst .getINode (i )) {
8586      i ++;
8687    }
88+ 
89+     // Verify path without valid 'DirectoryWithQuotaFeature' 
90+     // Note: In overwrite scenarios, quota calculation is still required, 
91+     // overwrite operations delete existing content, which will affects the root directory's quota. 
92+     // (i - 1) is common ancestor inode index. 
93+     if  (!overwrite  && verifyPathWithoutValidQuotaFeature (src , dst , i  - 1 )) {
94+       return  Triple .of (false , srcDelta , dstDelta );
95+     }
96+ 
8797    // src[i - 1] is the last common ancestor. 
8898    BlockStoragePolicySuite  bsps  = fsd .getBlockStoragePolicySuite ();
8999    // Assume dstParent existence check done by callers. 
@@ -108,7 +118,7 @@ private static Pair<Optional<QuotaCounts>, Optional<QuotaCounts>> verifyQuotaFor
108118      delta .subtract (counts );
109119    }
110120    FSDirectory .verifyQuota (dst , dst .length () - 1 , delta , src .getINode (i  - 1 ));
111-     return  Pair .of (srcDelta , dstDelta );
121+     return  Triple .of (true ,  srcDelta , dstDelta );
112122  }
113123
114124  /** 
@@ -127,6 +137,25 @@ static void verifyFsLimitsForRename(FSDirectory fsd, INodesInPath srcIIP,
127137    }
128138  }
129139
140+   /** 
141+    * Verify that the src and dst path does not contain valid quota feature. 
142+    * 
143+    * @param src   source path. 
144+    * @param dst   destination path. 
145+    * @param index common ancestor inode index. 
146+    * @return true if no valid quota feature, otherwise false. 
147+    */ 
148+   static  boolean  verifyPathWithoutValidQuotaFeature (INodesInPath  src , INodesInPath  dst , int  index ) {
149+     // Excluding root directory 
150+     if  (!FSDirectory .verifyWithoutQuotaFeature (src , index , 1 )) {
151+       return  false ;
152+     }
153+     if  (!FSDirectory .verifyWithoutQuotaFeature (src , src .length () - 2 , index  - 1 )) {
154+       return  false ;
155+     }
156+     return  FSDirectory .verifyWithoutQuotaFeature (dst , dst .length () - 2 , index  - 1 );
157+   }
158+ 
130159  /** 
131160   * <br> 
132161   * Note: This is to be used by {@link FSEditLogLoader} only. 
@@ -216,10 +245,10 @@ static INodesInPath unprotectedRenameTo(FSDirectory fsd,
216245    fsd .ezManager .checkMoveValidity (srcIIP , dstIIP );
217246    // Ensure dst has quota to accommodate rename 
218247    verifyFsLimitsForRename (fsd , srcIIP , dstIIP );
219-     Pair < Optional <QuotaCounts >, Optional <QuotaCounts >> countPair  =
220-         verifyQuotaForRename (fsd , srcIIP , dstIIP );
248+     Triple < Boolean ,  Optional <QuotaCounts >, Optional <QuotaCounts >> countTriple  =
249+         verifyQuotaForRename (fsd , srcIIP , dstIIP ,  false );
221250
222-     RenameOperation  tx  = new  RenameOperation (fsd , srcIIP , dstIIP , countPair );
251+     RenameOperation  tx  = new  RenameOperation (fsd , srcIIP , dstIIP , countTriple );
223252
224253    boolean  added  = false ;
225254
@@ -436,10 +465,10 @@ static RenameResult unprotectedRenameTo(FSDirectory fsd,
436465
437466    // Ensure dst has quota to accommodate rename 
438467    verifyFsLimitsForRename (fsd , srcIIP , dstIIP );
439-     Pair < Optional <QuotaCounts >, Optional <QuotaCounts >> quotaPair  =
440-         verifyQuotaForRename (fsd , srcIIP , dstIIP );
468+     Triple < Boolean ,  Optional <QuotaCounts >, Optional <QuotaCounts >> countTriple  =
469+         verifyQuotaForRename (fsd , srcIIP , dstIIP ,  overwrite );
441470
442-     RenameOperation  tx  = new  RenameOperation (fsd , srcIIP , dstIIP , quotaPair );
471+     RenameOperation  tx  = new  RenameOperation (fsd , srcIIP , dstIIP , countTriple );
443472
444473    boolean  undoRemoveSrc  = true ;
445474    tx .removeSrc ();
@@ -656,13 +685,14 @@ private static class RenameOperation {
656685    private  final  boolean  srcChildIsReference ;
657686    private  final  QuotaCounts  oldSrcCountsInSnapshot ;
658687    private  final  boolean  sameStoragePolicy ;
688+     private  final  boolean  updateQuota ;
659689    private  final  Optional <QuotaCounts > srcSubTreeCount ;
660690    private  final  Optional <QuotaCounts > dstSubTreeCount ;
661691    private  INode  srcChild ;
662692    private  INode  oldDstChild ;
663693
664694    RenameOperation (FSDirectory  fsd , INodesInPath  srcIIP , INodesInPath  dstIIP ,
665-         Pair < Optional <QuotaCounts >, Optional <QuotaCounts >> quotaPair ) {
695+         Triple < Boolean ,  Optional <QuotaCounts >, Optional <QuotaCounts >> countTriple ) {
666696      this .fsd  = fsd ;
667697      this .srcIIP  = srcIIP ;
668698      this .dstIIP  = dstIIP ;
@@ -712,9 +742,10 @@ private static class RenameOperation {
712742        withCount  = null ;
713743      }
714744      // Set quota for src and dst, ignore src is in Snapshot or is Reference 
745+       this .updateQuota  = countTriple .getLeft () || withCount  != null ;
715746      this .srcSubTreeCount  = withCount  == null  ?
716-           quotaPair . getLeft () : Optional .empty ();
717-       this .dstSubTreeCount  = quotaPair .getRight ();
747+           countTriple . getMiddle () : Optional .empty ();
748+       this .dstSubTreeCount  = countTriple .getRight ();
718749    }
719750
720751    boolean  isSameStoragePolicy () {
@@ -755,9 +786,11 @@ long removeSrc() throws IOException {
755786        throw  new  IOException (error );
756787      } else  {
757788        // update the quota count if necessary 
758-         Optional <QuotaCounts > countOp  = sameStoragePolicy  ?
759-             srcSubTreeCount  : Optional .empty ();
760-         fsd .updateCountForDelete (srcChild , srcIIP , countOp );
789+         if  (updateQuota ) {
790+           Optional <QuotaCounts > countOp  = sameStoragePolicy  ?
791+               srcSubTreeCount  : Optional .empty ();
792+           fsd .updateCountForDelete (srcChild , srcIIP , countOp );
793+         }
761794        srcIIP  = INodesInPath .replace (srcIIP , srcIIP .length () - 1 , null );
762795        return  removedNum ;
763796      }
@@ -772,9 +805,11 @@ boolean removeSrc4OldRename() {
772805        return  false ;
773806      } else  {
774807        // update the quota count if necessary 
775-         Optional <QuotaCounts > countOp  = sameStoragePolicy  ?
776-             srcSubTreeCount  : Optional .empty ();
777-         fsd .updateCountForDelete (srcChild , srcIIP , countOp );
808+         if  (updateQuota ) {
809+           Optional <QuotaCounts > countOp  = sameStoragePolicy  ?
810+               srcSubTreeCount  : Optional .empty ();
811+           fsd .updateCountForDelete (srcChild , srcIIP , countOp );
812+         }
778813        srcIIP  = INodesInPath .replace (srcIIP , srcIIP .length () - 1 , null );
779814        return  true ;
780815      }
@@ -785,7 +820,9 @@ long removeDst() {
785820      if  (removedNum  != -1 ) {
786821        oldDstChild  = dstIIP .getLastINode ();
787822        // update the quota count if necessary 
788-         fsd .updateCountForDelete (oldDstChild , dstIIP , dstSubTreeCount );
823+         if  (updateQuota ) {
824+           fsd .updateCountForDelete (oldDstChild , dstIIP , dstSubTreeCount );
825+         }
789826        dstIIP  = INodesInPath .replace (dstIIP , dstIIP .length () - 1 , null );
790827      }
791828      return  removedNum ;
@@ -803,7 +840,7 @@ INodesInPath addSourceToDestination() {
803840        toDst  = new  INodeReference .DstReference (dstParent .asDirectory (),
804841            withCount , dstIIP .getLatestSnapshotId ());
805842      }
806-       return  fsd .addLastINodeNoQuotaCheck (dstParentIIP , toDst , srcSubTreeCount );
843+       return  fsd .addLastINodeNoQuotaCheck (dstParentIIP , toDst , srcSubTreeCount ,  updateQuota );
807844    }
808845
809846    void  updateMtimeAndLease (long  timestamp ) {
@@ -837,7 +874,7 @@ void restoreSource() {
837874        // the srcChild back 
838875        Optional <QuotaCounts > countOp  = sameStoragePolicy  ?
839876            srcSubTreeCount  : Optional .empty ();
840-         fsd .addLastINodeNoQuotaCheck (srcParentIIP , srcChild , countOp );
877+         fsd .addLastINodeNoQuotaCheck (srcParentIIP , srcChild , countOp ,  updateQuota );
841878      }
842879    }
843880
@@ -847,7 +884,7 @@ void restoreDst(BlockStoragePolicySuite bsps) {
847884      if  (dstParent .isWithSnapshot ()) {
848885        dstParent .undoRename4DstParent (bsps , oldDstChild , dstIIP .getLatestSnapshotId ());
849886      } else  {
850-         fsd .addLastINodeNoQuotaCheck (dstParentIIP , oldDstChild , dstSubTreeCount );
887+         fsd .addLastINodeNoQuotaCheck (dstParentIIP , oldDstChild , dstSubTreeCount ,  updateQuota );
851888      }
852889      if  (oldDstChild  != null  && oldDstChild .isReference ()) {
853890        final  INodeReference  removedDstRef  = oldDstChild .asReference ();
0 commit comments