Skip to content

Commit

Permalink
YARN-11692. Support mixed cgroup v1/v2 controller structure
Browse files Browse the repository at this point in the history
Change-Id: I10a024b3fa88aaeb5dfcd9173f53200b00ccc7df
  • Loading branch information
p-szucs committed May 13, 2024
1 parent 43e8ca4 commit d945e4e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,54 @@ public class ResourceHandlerModule {
* as resource metrics functionality. We need to ensure that the same
* instance is used for both.
*/
private static volatile CGroupsHandler cGroupsV1Handler;
private static volatile CGroupsHandler cGroupsV2Handler;
private static volatile TrafficControlBandwidthHandlerImpl
trafficControlBandwidthHandler;
private static volatile NetworkPacketTaggingHandlerImpl
networkPacketTaggingHandlerImpl;
private static volatile CGroupsHandler cGroupsHandler;
private static volatile CGroupsBlkioResourceHandlerImpl
cGroupsBlkioResourceHandler;
private static volatile MemoryResourceHandler
cGroupsMemoryResourceHandler;
private static volatile CpuResourceHandler
cGroupsCpuResourceHandler;

/**
* Returns an initialized, thread-safe CGroupsHandler instance.
*/
private static CGroupsHandler getInitializedCGroupsHandler(Configuration conf)
private static void initializeCGroupsHandlers(Configuration conf) throws ResourceHandlerException {
initializeCGroupsV1Handler(conf);
if (cgroupsV2Enabled) {
initializeCGroupsV2Handler(conf);
}
}

private static void initializeCGroupsV1Handler(Configuration conf)
throws ResourceHandlerException {
if (cGroupsHandler == null) {
if (cGroupsV1Handler == null) {
synchronized (CGroupsHandler.class) {
if (cGroupsHandler == null) {
cGroupsHandler = cgroupsV2Enabled
? new CGroupsV2HandlerImpl(conf, PrivilegedOperationExecutor.getInstance(conf))
: new CGroupsHandlerImpl(conf, PrivilegedOperationExecutor.getInstance(conf));
LOG.debug("Value of CGroupsHandler is: {}", cGroupsHandler);
if (cGroupsV1Handler == null) {
cGroupsV1Handler = new CGroupsHandlerImpl(
conf, PrivilegedOperationExecutor.getInstance(conf));
LOG.debug("Value of CGroupsV1Handler is: {}", cGroupsV1Handler);
}
}
}
}

return cGroupsHandler;
private static void initializeCGroupsV2Handler(Configuration conf)
throws ResourceHandlerException {
if (cGroupsV2Handler == null) {
synchronized (CGroupsHandler.class) {
if (cGroupsV2Handler == null) {
cGroupsV2Handler = new CGroupsV2HandlerImpl(
conf, PrivilegedOperationExecutor.getInstance(conf));
LOG.debug("Value of CGroupsV2Handler is: {}", cGroupsV2Handler);
}
}
}
}

private static boolean isMountedInCGroupsV2(CGroupsHandler.CGroupController controller) {
return (cGroupsV2Handler != null && cGroupsV2Handler.getControllerPath(controller) != null);
}

/**
Expand All @@ -101,18 +120,18 @@ private static CGroupsHandler getInitializedCGroupsHandler(Configuration conf)
*/

public static CGroupsHandler getCGroupsHandler() {
return cGroupsHandler;
return cgroupsV2Enabled ? cGroupsV2Handler : cGroupsV1Handler;
}

/**
* Returns relative root for cgroups. Returns null if cGroupsHandler is
* not initialized, or if the path is empty.
*/
public static String getCgroupsRelativeRoot() {
if (cGroupsHandler == null) {
if (getCGroupsHandler() == null) {
return null;
}
String cGroupPath = cGroupsHandler.getRelativePathForCGroup("");
String cGroupPath = getCGroupsHandler().getRelativePathForCGroup("");
if (cGroupPath == null || cGroupPath.isEmpty()) {
return null;
}
Expand Down Expand Up @@ -153,9 +172,13 @@ private static CpuResourceHandler initCGroupsCpuResourceHandler(
synchronized (CpuResourceHandler.class) {
if (cGroupsCpuResourceHandler == null) {
LOG.debug("Creating new cgroups cpu handler");
cGroupsCpuResourceHandler = cgroupsV2Enabled
? new CGroupsV2CpuResourceHandlerImpl(getInitializedCGroupsHandler(conf))
: new CGroupsCpuResourceHandlerImpl(getInitializedCGroupsHandler(conf));

initializeCGroupsHandlers(conf);
if (isMountedInCGroupsV2(CGroupsHandler.CGroupController.CPU)) {
cGroupsCpuResourceHandler = new CGroupsV2CpuResourceHandlerImpl(cGroupsV2Handler);
} else {
cGroupsCpuResourceHandler = new CGroupsCpuResourceHandlerImpl(cGroupsV1Handler);
}
return cGroupsCpuResourceHandler;
}
}
Expand All @@ -173,9 +196,11 @@ private static CpuResourceHandler initCGroupsCpuResourceHandler(
synchronized (OutboundBandwidthResourceHandler.class) {
if (trafficControlBandwidthHandler == null) {
LOG.info("Creating new traffic control bandwidth handler.");

initializeCGroupsHandlers(conf);
trafficControlBandwidthHandler = new
TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
.getInstance(conf), getInitializedCGroupsHandler(conf),
.getInstance(conf), cGroupsV1Handler,
new TrafficController(conf, PrivilegedOperationExecutor
.getInstance(conf)));
}
Expand Down Expand Up @@ -208,10 +233,11 @@ public static ResourceHandler getNetworkTaggingHandler(Configuration conf)
synchronized (OutboundBandwidthResourceHandler.class) {
if (networkPacketTaggingHandlerImpl == null) {
LOG.info("Creating new network-tagging-handler.");

initializeCGroupsHandlers(conf);
networkPacketTaggingHandlerImpl =
new NetworkPacketTaggingHandlerImpl(
PrivilegedOperationExecutor.getInstance(conf),
getInitializedCGroupsHandler(conf));
PrivilegedOperationExecutor.getInstance(conf), cGroupsV1Handler);
}
}
}
Expand Down Expand Up @@ -239,9 +265,10 @@ private static CGroupsBlkioResourceHandlerImpl getCgroupsBlkioResourceHandler(
synchronized (DiskResourceHandler.class) {
if (cGroupsBlkioResourceHandler == null) {
LOG.debug("Creating new cgroups blkio handler");

initializeCGroupsHandlers(conf);
cGroupsBlkioResourceHandler =
new CGroupsBlkioResourceHandlerImpl(
getInitializedCGroupsHandler(conf));
new CGroupsBlkioResourceHandlerImpl(cGroupsV1Handler);
}
}
}
Expand All @@ -263,9 +290,13 @@ public static MemoryResourceHandler initMemoryResourceHandler(
if (cGroupsMemoryResourceHandler == null) {
synchronized (MemoryResourceHandler.class) {
if (cGroupsMemoryResourceHandler == null) {
cGroupsMemoryResourceHandler = cgroupsV2Enabled
? new CGroupsV2MemoryResourceHandlerImpl(getInitializedCGroupsHandler(conf))
: new CGroupsMemoryResourceHandlerImpl(getInitializedCGroupsHandler(conf));

initializeCGroupsHandlers(conf);
if (isMountedInCGroupsV2(CGroupsHandler.CGroupController.MEMORY)) {
cGroupsMemoryResourceHandler = new CGroupsV2MemoryResourceHandlerImpl(cGroupsV2Handler);
} else {
cGroupsMemoryResourceHandler = new CGroupsMemoryResourceHandlerImpl(cGroupsV1Handler);
}
}
}
}
Expand Down Expand Up @@ -327,9 +358,10 @@ private static void addHandlersFromConfiguredResourcePlugins(
}

for (ResourcePlugin plugin : pluginMap.values()) {
initializeCGroupsHandlers(conf);
addHandlerIfNotNull(handlerList,
plugin.createResourceHandler(nmContext,
getInitializedCGroupsHandler(conf),
cGroupsV1Handler,
PrivilegedOperationExecutor.getInstance(conf)));
}
}
Expand Down Expand Up @@ -361,8 +393,9 @@ static void nullifyResourceHandlerChain() throws ResourceHandlerException {
}

@VisibleForTesting
static void resetCgroupsHandler() {
cGroupsHandler = null;
static void resetCGroupsHandlers() {
cGroupsV1Handler = null;
cGroupsV2Handler = null;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void setup() throws Exception {
networkEnabledConf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
true);
ResourceHandlerModule.nullifyResourceHandlerChain();
ResourceHandlerModule.resetCgroupsHandler();
ResourceHandlerModule.resetCGroupsHandlers();
ResourceHandlerModule.resetCpuResourceHandler();
ResourceHandlerModule.resetMemoryResourceHandler();
}
Expand Down Expand Up @@ -116,57 +116,25 @@ public void testDiskResourceHandler() throws Exception {
}

@Test
public void testCpuResourceHandlerClassForCgroupV1() throws ResourceHandlerException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_V2_ENABLED, false);

initResourceHandlerChain(conf);

Assert.assertTrue(ResourceHandlerModule.getCpuResourceHandler()
instanceof CGroupsCpuResourceHandlerImpl);
Assert.assertTrue(ResourceHandlerModule.getCGroupsHandler()
instanceof CGroupsHandlerImpl);
}

@Test
public void testCpuResourceHandlerClassForCgroupV2() throws ResourceHandlerException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_V2_ENABLED, true);

initResourceHandlerChain(conf);

Assert.assertTrue(ResourceHandlerModule.getCpuResourceHandler()
instanceof CGroupsV2CpuResourceHandlerImpl);
Assert.assertTrue(ResourceHandlerModule.getCGroupsHandler()
instanceof CGroupsV2HandlerImpl);
}

@Test
public void testMemoryResourceHandlerClassForCgroupV1() throws ResourceHandlerException {
public void testCgroupsHandlerClassForCgroupV1() throws ResourceHandlerException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_V2_ENABLED, false);

initResourceHandlerChain(conf);

Assert.assertTrue(ResourceHandlerModule.getMemoryResourceHandler()
instanceof CGroupsMemoryResourceHandlerImpl);
Assert.assertTrue(ResourceHandlerModule.getCGroupsHandler()
instanceof CGroupsHandlerImpl);
}

@Test
public void testMemoryResourceHandlerClassForCgroupV2() throws ResourceHandlerException {
public void testCgroupsHandlerClassForCgroupV2() throws ResourceHandlerException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_V2_ENABLED, true);

initResourceHandlerChain(conf);

Assert.assertTrue(ResourceHandlerModule.getMemoryResourceHandler()
instanceof CGroupsV2MemoryResourceHandlerImpl);
Assert.assertTrue(ResourceHandlerModule.getCGroupsHandler()
instanceof CGroupsV2HandlerImpl);
}
Expand Down

0 comments on commit d945e4e

Please sign in to comment.