diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5842d6435798d..385d512feb730 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1366,6 +1366,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS = 10 * 60 * 1000; + /** + * Whether to clean up local nodemanager logs when log aggregation is enabled. + */ + public static final String LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP = + YARN_PREFIX + "log-aggregation.enable-local-cleanup"; + public static final boolean DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP = true; + /** * Number of seconds to retain logs on the NodeManager. 
Only applicable if Log * aggregation is disabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 59568230e47a8..5ae9a29797f3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -88,6 +88,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final ApplicationId appId; private final String applicationId; private boolean logAggregationDisabled = false; + private final boolean enableLocalCleanup; private final Configuration conf; private final DeletionService delService; private final UserGroupInformation userUgi; @@ -174,6 +175,9 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.logAggregationContext = logAggregationContext; this.context = context; this.nodeId = nodeId; + this.enableLocalCleanup = + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP); this.logAggPolicy = getLogAggPolicy(conf); this.recoveredLogInitedTime = recoveredLogInitedTime; if (logAggregationFileController == null) { @@ -328,11 +332,13 @@ private void uploadLogsForContainers(boolean appFinished) appFinished, finishedContainers.contains(container)); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; - List uploadedFilePathsInThisCycleList 
= new ArrayList<>(); - uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); - deletionTask = new FileDeletionTask(delService, - this.userUgi.getShortUserName(), null, - uploadedFilePathsInThisCycleList); + if (enableLocalCleanup) { + List uploadedFilePathsInThisCycleList = new ArrayList<>(); + uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); + deletionTask = new FileDeletionTask(delService, + this.userUgi.getShortUserName(), null, + uploadedFilePathsInThisCycleList); + } } // This container is finished, and all its logs have been uploaded, @@ -443,7 +449,9 @@ public void run() { // do post clean up of log directories on any other exception LOG.error("Error occurred while aggregating the log for the application " + appId, e); - doAppLogAggregationPostCleanUp(); + if (enableLocalCleanup) { + doAppLogAggregationPostCleanUp(); + } } finally { if (!this.appAggregationFinished.get() && !this.aborted.get()) { LOG.warn("Log aggregation did not complete for application " + appId); @@ -486,8 +494,9 @@ private void doAppLogAggregation() throws LogAggregationDFSException { try { // App is finished, upload the container logs. 
uploadLogsForContainers(true); - - doAppLogAggregationPostCleanUp(); + if (enableLocalCleanup) { + doAppLogAggregationPostCleanUp(); + } } catch (LogAggregationDFSException e) { LOG.error("Error during log aggregation", e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6268ad986c34e..d79917b06812f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -223,31 +223,64 @@ private void verifyLocalFileDeletion( // ensure filesystems were closed verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); - List dirList = new ArrayList<>(); - dirList.add(new Path(app1LogDir.toURI())); - verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher( - delSrvc, user, null, dirList))); - - String containerIdStr = container11.toString(); - File containerLogDir = new File(app1LogDir, containerIdStr); - int count = 0; - int maxAttempts = 50; - for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { - File f = new File(containerLogDir, fileType); + boolean filesShouldBeDeleted = + this.conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP); + if (filesShouldBeDeleted) { + List dirList = new ArrayList<>(); + dirList.add(new 
Path(app1LogDir.toURI())); + verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher( + delSrvc, user, null, dirList))); + + String containerIdStr = container11.toString(); + File containerLogDir = new File(app1LogDir, containerIdStr); + int count = 0; + int maxAttempts = 50; + for (String fileType : new String[]{"stdout", "stderr", "syslog"}) { + File f = new File(containerLogDir, fileType); + count = 0; + while ((f.exists()) && (count < maxAttempts)) { + count++; + Thread.sleep(100); + } + Assert.assertFalse("File [" + f + "] was not deleted", f.exists()); + } count = 0; - while ((f.exists()) && (count < maxAttempts)) { + while ((app1LogDir.exists()) && (count < maxAttempts)) { count++; Thread.sleep(100); } - Assert.assertFalse("File [" + f + "] was not deleted", f.exists()); - } - count = 0; - while ((app1LogDir.exists()) && (count < maxAttempts)) { - count++; - Thread.sleep(100); + Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted", + app1LogDir.exists()); + } else { + List dirList = new ArrayList<>(); + dirList.add(new Path(app1LogDir.toURI())); + verify(delSrvc, never()).delete(argThat(new FileDeletionMatcher( + delSrvc, user, null, dirList))); + + String containerIdStr = container11.toString(); + File containerLogDir = new File(app1LogDir, containerIdStr); + int count = 0; + int maxAttempts = 50; + for (String fileType : new String[]{"stdout", "stderr", "syslog"}) { + File f = new File(containerLogDir, fileType); + count = 0; + while ((f.exists()) && (count < maxAttempts)) { + count++; + Thread.sleep(100); + } + Assert.assertTrue("File [" + f + "] should not have been deleted", f.exists()); + } + count = 0; + while ((app1LogDir.exists()) && (count < maxAttempts)) { + count++; + Thread.sleep(100); + } + Assert.assertTrue("Directory [" + app1LogDir + "] should not have been deleted", + app1LogDir.exists()); } - Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted", - app1LogDir.exists()); + + delSrvc.stop(); Path logFilePath = 
logAggregationService .getLogAggregationFileController(conf) @@ -286,6 +319,20 @@ public void testLocalFileDeletionAfterUpload() throws Exception { verifyLocalFileDeletion(logAggregationService); } + @Test + public void testLocalFileRemainsAfterUploadOnCleanupDisable() throws Exception { + this.delSrvc = new DeletionService(createContainerExecutor()); + delSrvc = spy(delSrvc); + this.delSrvc.init(conf); + this.conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, false); + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + LogAggregationService logAggregationService = spy( + new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); + verifyLocalFileDeletion(logAggregationService); + } + @Test public void testLocalFileDeletionOnDiskFull() throws Exception { this.delSrvc = new DeletionService(createContainerExecutor());