diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9c62827cb1743..1df5939a0d829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2215,6 +2215,39 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = "^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$"; + /** + * Whether or not to use precreated pool of local users in secure mode. + */ + public static final String NM_SECURE_MODE_USE_POOL_USER = NM_PREFIX + + "linux-container-executor.secure-mode.use-pool-user"; + + public static final boolean DEFAULT_NM_SECURE_MODE_USE_POOL_USER = false; + + /** + * The number of pool local users. If set to -1, we'll take the value from: + * NM_PREFIX + "resource.cpu-vcores" + */ + public static final String NM_SECURE_MODE_POOL_USER_COUNT = NM_PREFIX + + "linux-container-executor.secure-mode.pool-user-count"; + + public static final int DEFAULT_NM_SECURE_MODE_POOL_USER_COUNT = -1; + + /** + * The prefix of the local pool users can be used by Yarn Secure Container. + * The number of local pool users to use is specified by: + * + * For example, if prefix is "user" and pool-user-count configured to 20, + * then local user names are: + * user0 + * user1 + * ... + * user19 + */ + public static final String NM_SECURE_MODE_POOL_USER_PREFIX = NM_PREFIX + + "linux-container-executor.secure-mode.pool-user-prefix"; + + public static final String DEFAULT_NM_SECURE_MODE_POOL_USER_PREFIX = "user"; + /** The type of resource enforcement to use with the * linux container executor. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 06a32be9d5bcb..cc6b8b602f165 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -115,6 +115,7 @@ public class LinuxContainerExecutor extends ContainerExecutor { private boolean containerSchedPriorityIsSet = false; private int containerSchedPriorityAdjustment = 0; private boolean containerLimitUsers; + private SecureModeLocalUserAllocator secureModeLocalUserAllocator; private ResourceHandler resourceHandlerChain; private LinuxContainerRuntime linuxContainerRuntime; private Context nmContext; @@ -214,6 +215,12 @@ public void setConf(Configuration conf) { LOG.warn("{}: impersonation without authentication enabled", YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS); } + boolean secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() && + conf.getBoolean(YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER); + if (secureModeUseLocalUser) { + secureModeLocalUserAllocator = SecureModeLocalUserAllocator.getInstance(conf); + } } private LCEResourcesHandler getResourcesHandler(Configuration conf) { @@ -242,8 +249,14 @@ void verifyUsernamePattern(String user) { } String getRunAsUser(String user) { - if (UserGroupInformation.isSecurityEnabled() || - !containerLimitUsers) { + if (UserGroupInformation.isSecurityEnabled()) { + if (secureModeLocalUserAllocator != null) { + return secureModeLocalUserAllocator.getRunAsLocalUser(user); + } + else { + return user; + } + } else if (!containerLimitUsers) { return user; } else { return nonsecureLocalUser; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/SecureModeLocalUserAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/SecureModeLocalUserAllocator.java new file mode 100644 index 0000000000000..3de16e2db50c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/SecureModeLocalUserAllocator.java @@ -0,0 +1,355 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class LocalUserInfo { + String localUser; + int localUserIndex; + int appCount; + int fileOpCount; + int logHandlingCount; + + public LocalUserInfo(String user, int userIndex) { + localUser = user; + localUserIndex = userIndex; + appCount = 0; + fileOpCount = 0; + logHandlingCount = 0; + } +} + +/** + * Allocate local user to an appUser from a pool of precreated local users. + * Maintains the appUser to local user mapping, until: + * a) all applications of the appUser is finished; + * b) all FileDeletionTask for that appUser is executed; + * c) all log aggregation/handling requests for appUser's applications are done + * + * If DeletionService is set, during deallocation, we will check if appcache + * folder for the app user exists, if yes queue a FileDeletionTask. + * + * For now allocation is only maintained in memory so it does not support + * node manager recovery mode. + */ +public class SecureModeLocalUserAllocator { + public static final String NONEXISTUSER = "nonexistuser"; + private static final Logger LOG = + LoggerFactory.getLogger(SecureModeLocalUserAllocator.class); + private static SecureModeLocalUserAllocator instance; + private Map appUserToLocalUser; + private ArrayList allocated; + private int localUserCount; + private String localUserPrefix; + private DeletionService delService; + private String[] nmLocalDirs; + private static FileContext lfs = getLfs(); + + private static FileContext getLfs() { + try { + return FileContext.getLocalFSFileContext(); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); + } + } + + SecureModeLocalUserAllocator(Configuration conf) { + if (conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) { + String errMsg = "Invalidate configuration combination: " + + YarnConfiguration.NM_RECOVERY_ENABLED + "=true, " + + YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER + "=true"; + throw new RuntimeException(errMsg); + } + localUserPrefix = conf.get( + YarnConfiguration.NM_SECURE_MODE_POOL_USER_PREFIX, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_POOL_USER_PREFIX); + localUserCount = conf.getInt( + YarnConfiguration.NM_SECURE_MODE_POOL_USER_COUNT, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_POOL_USER_COUNT); + if (localUserCount == -1) { + localUserCount = conf.getInt(YarnConfiguration.NM_VCORES, + YarnConfiguration.DEFAULT_NM_VCORES); + } + allocated = new ArrayList(localUserCount); + appUserToLocalUser = new HashMap(localUserCount); + for (int i=0; i= localUserCount) { + result = -1; + } + return result; + } + + /** + * check if the appUser mapping exists, if not then allocate a local user. + * return true if appUser mapping exists or created, + * return false if not able to allocate local user. + */ + private boolean checkAndAllocateAppUser(String appUser) { + if (appUserToLocalUser.containsKey(appUser)) { + // If appUser exists, don't need to allocate again + return true; + } + + LOG.info("Allocating local user for appUser " + appUser); + // check if the appUser is one of the local user, if yes just use it + int index = localUserIndex(appUser); + if (index == -1) { + // otherwise find the first empty slot in the pool of local users + for (int i=0; i/appcache + // usercache//filecche + for (String localDir : nmLocalDirs) { + Path usersDir = new Path(localDir, ContainerLocalizer.USERCACHE); + Path userDir = new Path(usersDir, appUser); + Path userAppCacheDir = new Path(userDir, ContainerLocalizer.APPCACHE); + Path userFileCacheDir = new Path(userDir, ContainerLocalizer.FILECACHE); + ArrayList toDelete = new ArrayList(); + toDelete.add(userAppCacheDir); + toDelete.add(userFileCacheDir); + + for (Path dir : toDelete) { + FileStatus status = null; + try { + status = lfs.getFileStatus(userAppCacheDir); + } + catch(FileNotFoundException fs) { + } + catch(IOException ie) { + String msg = "Could not get file status for local dir " + dir; + LOG.warn(msg, ie); + throw new YarnRuntimeException(msg, ie); + } + if (status != null) { + FileDeletionTask delTask = new FileDeletionTask(delService, localUser, + dir, null); + delService.delete(delTask); + } + } + } + } + LOG.info("Deallocated local user index " + localUserInfo.localUserIndex + + " for appUser " + appUser); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 1806af68351ca..05fac0346ca32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.SecureModeLocalUserAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -86,6 +88,7 @@ public class ApplicationImpl implements Application { private final ReadLock readLock; private final WriteLock writeLock; private final Context context; + private final SecureModeLocalUserAllocator secureModeLocalUserAllocator; private static final Logger LOG = LoggerFactory.getLogger(ApplicationImpl.class); @@ -127,6 +130,15 @@ public ApplicationImpl(Dispatcher dispatcher, String user, context.getNMTimelinePublisher().createTimelineClient(appId); } } + boolean secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() && + conf.getBoolean(YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER); + if (secureModeUseLocalUser) { + secureModeLocalUserAllocator = SecureModeLocalUserAllocator.getInstance(conf); + } + else { + secureModeLocalUserAllocator = null; + } this.context = context; this.appStateStore = context.getNMStateStore(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -336,6 +348,9 @@ static class AppInitTransition implements SingleArcTransition { @Override public void transition(ApplicationImpl app, ApplicationEvent event) { + if (app.secureModeLocalUserAllocator != null) { + app.secureModeLocalUserAllocator.allocate(app.user, app.appId.toString()); + } ApplicationInitEvent initEvent = (ApplicationInitEvent)event; app.applicationACLs = initEvent.getApplicationACLs(); app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); @@ -607,13 +622,16 @@ private void updateCollectorStatus(ApplicationImpl app) { @Override public void transition(ApplicationImpl app, ApplicationEvent event) { - // Inform the logService app.dispatcher.getEventHandler().handle( new LogHandlerAppFinishedEvent(app.appId)); app.context.getNMTokenSecretManager().appFinished(app.getAppId()); updateCollectorStatus(app); + + if (app.secureModeLocalUserAllocator != null) { + app.secureModeLocalUserAllocator.deallocate(app.user, app.appId.toString()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java index a8aab7272ec76..7fbcc7a393e6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java @@ -19,8 +19,11 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.SecureModeLocalUserAllocator; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import java.io.IOException; @@ -34,6 +37,8 @@ public class FileDeletionTask extends DeletionTask implements Runnable { private final Path subDir; private final List baseDirs; private static final FileContext lfs = getLfs(); + private final boolean secureModeUseLocalUser; + private final SecureModeLocalUserAllocator secureModeLocalUserAllocator; private static FileContext getLfs() { try { @@ -70,6 +75,20 @@ public FileDeletionTask(int taskId, DeletionService deletionService, super(taskId, deletionService, user, DeletionTaskType.FILE); this.subDir = subDir; this.baseDirs = baseDirs; + secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() && + deletionService.getConfig().getBoolean( + YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER); + if (secureModeUseLocalUser) { + secureModeLocalUserAllocator = SecureModeLocalUserAllocator.getInstance( + deletionService.getConfig()); + if (user != null) { + secureModeLocalUserAllocator.incrementFileOpCount(user); + } + } + else { + secureModeLocalUserAllocator = null; + } } /** @@ -139,6 +158,9 @@ public void run() { error = true; LOG.warn("Failed to delete as user " + getUser(), e); } + if (secureModeUseLocalUser) { + secureModeLocalUserAllocator.decrementFileOpCount(getUser()); + } } if (error) { setSuccess(!error); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index a951c0bb0fc3f..0f89adf4cec0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.SecureModeLocalUserAllocator; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; @@ -174,6 +175,7 @@ public class ResourceLocalizationService extends CompositeService private NMStateStoreService stateStore; @VisibleForTesting final NodeManagerMetrics metrics; + private final boolean disablePrivateVis; @VisibleForTesting LocalResourcesTracker publicRsrc; @@ -212,6 +214,26 @@ public ResourceLocalizationService(Dispatcher dispatcher, this.delService = delService; this.dirsHandler = dirsHandler; + // If an app user application require private resources, when local user + // pooling is enabled, we will treat the private resources as application + // resources. This means for each application we will download (localize) + // the resources to application folder, and will delete it after the + // application is finished. + boolean secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() && + context.getConf().getBoolean( + YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER); + if (secureModeUseLocalUser) { + this.disablePrivateVis = true; + LOG.info("When " + YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER + + " is true, treat PRIVATE visibility as APPLICATION"); + SecureModeLocalUserAllocator.getInstance(context.getConf()) + .setDeletionService(delService); + } + else { + this.disablePrivateVis = false; + } + this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") @@ -680,7 +702,9 @@ LocalResourcesTracker getLocalResourcesTracker( case PUBLIC: return publicRsrc; case PRIVATE: - return privateRsrc.get(user); + if (!disablePrivateVis) { + return privateRsrc.get(user); + } case APPLICATION: return appRsrc.get(appId.toString()); } @@ -1233,9 +1257,11 @@ private Path getPathForLocalization(LocalResource rsrc, context.getContainerId().getApplicationAttemptId().getApplicationId(); LocalResourceVisibility vis = rsrc.getVisibility(); String cacheDirectory = null; - if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only + if (!disablePrivateVis && vis == LocalResourceVisibility.PRIVATE) { + // PRIVATE ONLY cacheDirectory = getUserFileCachePath(user); - } else {// APPLICATION ONLY + } else { + // APPLICATION ONLY cacheDirectory = getAppFileCachePath(user, appId.toString()); } Path dirPath = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 245dc103e933a..497fb7729de0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.SecureModeLocalUserAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; @@ -104,6 +105,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final Context context; private final NodeId nodeId; private final LogAggregationFileControllerContext logControllerContext; + private final boolean secureModeUseLocalUser; + private final SecureModeLocalUserAllocator secureModeLocalUserAllocator; // These variables are only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); @@ -211,6 +214,16 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, logAggregationInRolling, rollingMonitorInterval, this.appId, this.appAcls, this.nodeId, this.userUgi); + secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() && + conf.getBoolean(YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER); + if (secureModeUseLocalUser) { + secureModeLocalUserAllocator = + SecureModeLocalUserAllocator.getInstance(conf); + } + else { + secureModeLocalUserAllocator = null; + } } private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { @@ -461,6 +474,10 @@ private void sendLogAggregationReportInternal( @Override public void run() { + if (this.secureModeUseLocalUser) { + this.secureModeLocalUserAllocator.incrementLogHandlingCount( + this.userUgi.getShortUserName()); + } try { doAppLogAggregation(); } catch (LogAggregationDFSException e) { @@ -482,6 +499,10 @@ public void run() { ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); } this.appAggregationFinished.set(true); + if (this.secureModeUseLocalUser) { + this.secureModeLocalUserAllocator.decrementLogHandlingCount( + this.userUgi.getShortUserName()); + } } } @@ -647,9 +668,13 @@ public Set doContainerLogAggregation( + ". Current good log dirs are " + StringUtils.join(",", dirsHandler.getLogDirsForRead())); final LogKey logKey = new LogKey(containerId); + String user = userUgi.getShortUserName(); + if (secureModeUseLocalUser) { + user = secureModeLocalUserAllocator.getRunAsLocalUser(user); + } final LogValue logValue = new LogValue(dirsHandler.getLogDirsForRead(), containerId, - userUgi.getShortUserName(), logAggregationContext, + user, logAggregationContext, this.uploadedFileMeta, retentionContext, appFinished, containerFinished); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 9898f8f797921..c9c09ac078a67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.SecureModeLocalUserAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; @@ -74,6 +76,7 @@ public class NonAggregatingLogHandler extends AbstractService implements private final NMStateStoreService stateStore; private long deleteDelaySeconds; private ScheduledThreadPoolExecutor sched; + private SecureModeLocalUserAllocator secureModeLocalUserAllocator; public NonAggregatingLogHandler(Dispatcher dispatcher, DeletionService delService, LocalDirsHandlerService dirsHandler, @@ -93,6 +96,13 @@ protected void serviceInit(Configuration conf) throws Exception { conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); sched = createScheduledThreadPoolExecutor(conf); + boolean secureModeUseLocalUser = UserGroupInformation.isSecurityEnabled() && + conf.getBoolean(YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, + YarnConfiguration.DEFAULT_NM_SECURE_MODE_USE_POOL_USER); + if (secureModeUseLocalUser) { + secureModeLocalUserAllocator = + SecureModeLocalUserAllocator.getInstance(conf); + } super.serviceInit(conf); recover(); } @@ -159,6 +169,10 @@ public void handle(LogHandlerEvent event) { this.dispatcher.getEventHandler().handle( new ApplicationEvent(appStartedEvent.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)); + if (this.secureModeLocalUserAllocator != null) { + this.secureModeLocalUserAllocator.incrementLogHandlingCount( + appStartedEvent.getUser()); + } break; case CONTAINER_FINISHED: // Ignore @@ -200,6 +214,9 @@ public void handle(LogHandlerEvent event) { // or after calling sched.shutdownNow(). logDeleter.run(); } + if (this.secureModeLocalUserAllocator != null) { + this.secureModeLocalUserAllocator.decrementLogHandlingCount(user); + } break; default: ; // Ignore diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestSecureModeLocalUserAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestSecureModeLocalUserAllocator.java new file mode 100644 index 0000000000000..28bf6d5a5d33f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestSecureModeLocalUserAllocator.java @@ -0,0 +1,101 @@ +package org.apache.hadoop.yarn.server.nodemanager; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSecureModeLocalUserAllocator { + private static Configuration conf; + + @BeforeClass + public static void beforeAllTestMethods() { + conf = new Configuration(); + conf.setBoolean(YarnConfiguration.NM_SECURE_MODE_USE_POOL_USER, true); + conf.set(YarnConfiguration.NM_SECURE_MODE_POOL_USER_PREFIX, "smlu"); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false); + conf.setInt(YarnConfiguration.NM_VCORES, 3); + } + + @Test + public void testSingleUserRefCounting() { + SecureModeLocalUserAllocator allocator = new SecureModeLocalUserAllocator( + conf); + // non existing mapping won't hurt + allocator.deallocate("user0", "app0"); + allocator.decrementFileOpCount("user0"); + allocator.decrementLogHandlingCount("user0"); + allocator.incrementLogHandlingCount("user0"); + Assert.assertEquals(SecureModeLocalUserAllocator.NONEXISTUSER, + allocator.getRunAsLocalUser("user0")); + + // as long as not all ref counts are 0, the mapping stays in memory + allocator.allocate("user0", "app0"); + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("user0")); + allocator.incrementFileOpCount("user0"); + allocator.deallocate("user0", "app0"); + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("user0")); + allocator.incrementLogHandlingCount("user0"); + allocator.decrementFileOpCount("user0"); + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("user0")); + allocator.decrementLogHandlingCount("user0"); + Assert.assertEquals(SecureModeLocalUserAllocator.NONEXISTUSER, + allocator.getRunAsLocalUser("user0")); + } + + @Test + public void testMultiUserRefCounting() { + SecureModeLocalUserAllocator allocator = new SecureModeLocalUserAllocator( + conf); + allocator.allocate("user0", "app0"); + allocator.allocate("user1", "app1"); + allocator.allocate("user2", "app2"); + // no available local pool users to allocate, but it should not fail + allocator.allocate("user3", "app3"); + + allocator.incrementFileOpCount("user0"); + allocator.incrementLogHandlingCount("user1"); + + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("user0")); + Assert.assertEquals("smlu1", allocator.getRunAsLocalUser("user1")); + Assert.assertEquals("smlu2", allocator.getRunAsLocalUser("user2")); + Assert.assertEquals(SecureModeLocalUserAllocator.NONEXISTUSER, + allocator.getRunAsLocalUser("user3")); + + allocator.deallocate("user0", "app0"); + allocator.deallocate("user1", "app1"); + allocator.deallocate("user2", "app2"); + + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("user0")); + Assert.assertEquals("smlu1", allocator.getRunAsLocalUser("user1")); + Assert.assertEquals(SecureModeLocalUserAllocator.NONEXISTUSER, + allocator.getRunAsLocalUser("user2")); + + allocator.decrementFileOpCount("user0"); + allocator.decrementLogHandlingCount("user1"); + Assert.assertEquals(SecureModeLocalUserAllocator.NONEXISTUSER, + allocator.getRunAsLocalUser("user0")); + Assert.assertEquals(SecureModeLocalUserAllocator.NONEXISTUSER, + allocator.getRunAsLocalUser("user1")); + } + + @Test + public void testIncrementOpCount() { + SecureModeLocalUserAllocator allocator = new SecureModeLocalUserAllocator( + conf); + // make sure calling incrementOpCount() with local pool user allocates + // the same local pool user. + allocator.incrementFileOpCount("smlu0"); + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("smlu0")); + allocator.incrementFileOpCount("smlu0"); + allocator.decrementFileOpCount("smlu0"); + allocator.incrementFileOpCount("smlu2"); + Assert.assertEquals("smlu2", allocator.getRunAsLocalUser("smlu2")); + Assert.assertEquals("smlu0", allocator.getRunAsLocalUser("smlu0")); + + // make sure new app allocation won't use these allocated local pool user + allocator.allocate("user0", "app0"); + Assert.assertEquals("smlu1", allocator.getRunAsLocalUser("user0")); + } +}