2121import java .io .File ;
2222import java .io .FileNotFoundException ;
2323import java .io .IOException ;
24+ import java .io .UncheckedIOException ;
25+ import java .nio .file .Files ;
2426import java .util .ArrayList ;
2527import java .util .Arrays ;
2628import java .util .Collections ;
2729import java .util .HashMap ;
2830import java .util .HashSet ;
2931import java .util .List ;
3032import java .util .Map ;
33+ import java .util .Objects ;
3134import java .util .Set ;
3235import java .util .concurrent .ConcurrentHashMap ;
3336import java .util .concurrent .locks .ReentrantReadWriteLock ;
3437import java .util .concurrent .locks .ReentrantReadWriteLock .ReadLock ;
3538import java .util .concurrent .locks .ReentrantReadWriteLock .WriteLock ;
39+ import java .util .stream .Collectors ;
40+ import java .util .stream .Stream ;
41+
3642import org .slf4j .Logger ;
3743import org .slf4j .LoggerFactory ;
3844
3945import org .apache .hadoop .classification .InterfaceStability ;
46+ import org .apache .hadoop .conf .Configuration ;
4047import org .apache .hadoop .fs .FileAlreadyExistsException ;
4148import org .apache .hadoop .fs .FileContext ;
4249import org .apache .hadoop .fs .Path ;
4350import org .apache .hadoop .fs .permission .FsPermission ;
51+ import org .apache .hadoop .util .DiskChecker ;
4452import org .apache .hadoop .util .DiskValidator ;
4553import org .apache .hadoop .util .DiskValidatorFactory ;
46- import org .apache .hadoop .conf .Configuration ;
4754import org .apache .hadoop .yarn .conf .YarnConfiguration ;
4855import org .apache .hadoop .yarn .exceptions .YarnRuntimeException ;
4956
@@ -62,6 +69,7 @@ public class DirectoryCollection {
6269
6370 private boolean diskUtilizationThresholdEnabled ;
6471 private boolean diskFreeSpaceThresholdEnabled ;
72+ private boolean subAccessibilityValidationEnabled ;
6573 /**
6674 * The enum defines disk failure type.
6775 */
@@ -242,16 +250,15 @@ public DirectoryCollection(String[] dirs,
242250 throw new YarnRuntimeException (e );
243251 }
244252
245- diskUtilizationThresholdEnabled = conf .
246- getBoolean (YarnConfiguration .
247- NM_DISK_UTILIZATION_THRESHOLD_ENABLED ,
248- YarnConfiguration .
249- DEFAULT_NM_DISK_UTILIZATION_THRESHOLD_ENABLED );
250- diskFreeSpaceThresholdEnabled = conf .
251- getBoolean (YarnConfiguration .
252- NM_DISK_FREE_SPACE_THRESHOLD_ENABLED ,
253- YarnConfiguration .
254- DEFAULT_NM_DISK_FREE_SPACE_THRESHOLD_ENABLED );
253+ diskUtilizationThresholdEnabled = conf .getBoolean (
254+ YarnConfiguration .NM_DISK_UTILIZATION_THRESHOLD_ENABLED ,
255+ YarnConfiguration .DEFAULT_NM_DISK_UTILIZATION_THRESHOLD_ENABLED );
256+ diskFreeSpaceThresholdEnabled = conf .getBoolean (
257+ YarnConfiguration .NM_DISK_FREE_SPACE_THRESHOLD_ENABLED ,
258+ YarnConfiguration .DEFAULT_NM_DISK_FREE_SPACE_THRESHOLD_ENABLED );
259+ subAccessibilityValidationEnabled = conf .getBoolean (
260+ YarnConfiguration .NM_WORKING_DIR_CONTENT_ACCESSIBILITY_VALIDATION_ENABLED ,
261+ YarnConfiguration .DEFAULT_NM_WORKING_DIR_CONTENT_ACCESSIBILITY_VALIDATION_ENABLED );
255262
256263 localDirs = new ArrayList <>(Arrays .asList (dirs ));
257264 errorDirs = new ArrayList <>();
@@ -448,8 +455,7 @@ boolean checkDirs() {
448455
449456 // move testDirs out of any lock as it could wait for very long time in
450457 // case of busy IO
451- Map <String , DiskErrorInformation > dirsFailedCheck = testDirs (allLocalDirs ,
452- preCheckGoodDirs );
458+ Map <String , DiskErrorInformation > dirsFailedCheck = testDirs (allLocalDirs , preCheckGoodDirs );
453459
454460 this .writeLock .lock ();
455461 try {
@@ -521,60 +527,89 @@ boolean checkDirs() {
521527 }
522528 }
523529
524- Map <String , DiskErrorInformation > testDirs (List <String > dirs ,
525- Set <String > goodDirs ) {
526- HashMap <String , DiskErrorInformation > ret =
527- new HashMap <String , DiskErrorInformation >();
528- for (final String dir : dirs ) {
529- String msg ;
530- try {
531- File testDir = new File (dir );
532- diskValidator .checkStatus (testDir );
533- float diskUtilizationPercentageCutoff = goodDirs .contains (dir ) ?
534- diskUtilizationPercentageCutoffHigh : diskUtilizationPercentageCutoffLow ;
535- long diskFreeSpaceCutoff = goodDirs .contains (dir ) ?
536- diskFreeSpaceCutoffLow : diskFreeSpaceCutoffHigh ;
537-
538- if (diskUtilizationThresholdEnabled
539- && isDiskUsageOverPercentageLimit (testDir ,
540- diskUtilizationPercentageCutoff )) {
541- msg =
542- "used space above threshold of "
543- + diskUtilizationPercentageCutoff
544- + "%" ;
545- ret .put (dir ,
546- new DiskErrorInformation (DiskErrorCause .DISK_FULL , msg ));
547- continue ;
548- } else if (diskFreeSpaceThresholdEnabled
549- && isDiskFreeSpaceUnderLimit (testDir , diskFreeSpaceCutoff )) {
550- msg =
551- "free space below limit of " + diskFreeSpaceCutoff
552- + "MB" ;
553- ret .put (dir ,
554- new DiskErrorInformation (DiskErrorCause .DISK_FULL , msg ));
555- continue ;
556- }
557- } catch (IOException ie ) {
558- ret .put (dir ,
559- new DiskErrorInformation (DiskErrorCause .OTHER , ie .getMessage ()));
560- }
530+ Map <String , DiskErrorInformation > testDirs (List <String > dirs , Set <String > goodDirs ) {
531+ final Map <String , DiskErrorInformation > ret = new HashMap <>(0 );
532+ for (String dir : dirs ) {
533+ LOG .debug ("Start testing dir accessibility: {}" , dir );
534+ File testDir = new File (dir );
535+ boolean goodDir = goodDirs .contains (dir );
536+ Stream .of (
537+ validateDisk (testDir ),
538+ validateUsageOverPercentageLimit (testDir , goodDir ),
539+ validateDiskFreeSpaceUnderLimit (testDir , goodDir ),
540+ validateSubsAccessibility (testDir )
541+ )
542+ .filter (Objects ::nonNull )
543+ .findFirst ()
544+ .ifPresent (diskErrorInformation -> ret .put (dir , diskErrorInformation ));
561545 }
562546 return ret ;
563547 }
564548
565- private boolean isDiskUsageOverPercentageLimit (File dir ,
566- float diskUtilizationPercentageCutoff ) {
567- float freePercentage =
568- 100 * (dir .getUsableSpace () / (float ) dir .getTotalSpace ());
549+ private DiskErrorInformation validateDisk (File dir ) {
550+ try {
551+ diskValidator .checkStatus (dir );
552+ LOG .debug ("Dir {} pass throw the disk validation" , dir );
553+ return null ;
554+ } catch (IOException | UncheckedIOException | SecurityException e ) {
555+ return new DiskErrorInformation (DiskErrorCause .OTHER , e .getMessage ());
556+ }
557+ }
558+
559+ private DiskErrorInformation validateUsageOverPercentageLimit (File dir , boolean isGoodDir ) {
560+ if (!diskUtilizationThresholdEnabled ) {
561+ return null ;
562+ }
563+ float diskUtilizationPercentageCutoff = isGoodDir
564+ ? diskUtilizationPercentageCutoffHigh
565+ : diskUtilizationPercentageCutoffLow ;
566+ float freePercentage = 100 * (dir .getUsableSpace () / (float ) dir .getTotalSpace ());
569567 float usedPercentage = 100.0F - freePercentage ;
570- return (usedPercentage > diskUtilizationPercentageCutoff
571- || usedPercentage >= 100.0F );
568+ if (usedPercentage > diskUtilizationPercentageCutoff || usedPercentage >= 100.0F ) {
569+ return new DiskErrorInformation (DiskErrorCause .DISK_FULL ,
570+ "used space above threshold of " + diskUtilizationPercentageCutoff + "%" );
571+ } else {
572+ LOG .debug ("Dir {} pass throw the usage over percentage validation" , dir );
573+ return null ;
574+ }
572575 }
573576
574- private boolean isDiskFreeSpaceUnderLimit (File dir ,
575- long freeSpaceCutoff ) {
577+ private DiskErrorInformation validateDiskFreeSpaceUnderLimit (File dir , boolean isGoodDir ) {
578+ if (!diskFreeSpaceThresholdEnabled ) {
579+ return null ;
580+ }
581+ long freeSpaceCutoff = isGoodDir ? diskFreeSpaceCutoffLow : diskFreeSpaceCutoffHigh ;
576582 long freeSpace = dir .getUsableSpace () / (1024 * 1024 );
577- return freeSpace < freeSpaceCutoff ;
583+ if (freeSpace < freeSpaceCutoff ) {
584+ return new DiskErrorInformation (DiskErrorCause .DISK_FULL ,
585+ "free space below limit of " + freeSpaceCutoff + "MB" );
586+ } else {
587+ LOG .debug ("Dir {} pass throw the free space validation" , dir );
588+ return null ;
589+ }
590+ }
591+
592+ private DiskErrorInformation validateSubsAccessibility (File dir ) {
593+ if (!subAccessibilityValidationEnabled ) {
594+ return null ;
595+ }
596+ try (Stream <java .nio .file .Path > walk = Files .walk (dir .toPath ())) {
597+ List <File > subs = walk
598+ .map (java .nio .file .Path ::toFile )
599+ .collect (Collectors .toList ());
600+ for (File sub : subs ) {
601+ if (sub .isDirectory ()) {
602+ DiskChecker .checkDir (sub );
603+ } else if (!Files .isReadable (sub .toPath ())) {
604+ return new DiskErrorInformation (DiskErrorCause .OTHER , "Can not read " + sub );
605+ } else {
606+ LOG .debug ("{} under {} is accessible" , sub , dir );
607+ }
608+ }
609+ } catch (IOException | UncheckedIOException | SecurityException e ) {
610+ return new DiskErrorInformation (DiskErrorCause .OTHER , e .getMessage ());
611+ }
612+ return null ;
578613 }
579614
580615 private void createDir (FileContext localFs , Path dir , FsPermission perm )
0 commit comments