Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
nagworld9 committed Nov 17, 2024
1 parent 287efe1 commit adf24d0
Showing 1 changed file with 82 additions and 48 deletions.
130 changes: 82 additions & 48 deletions azurelinuxagent/ga/cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import subprocess
import threading

from orca.punctuation_settings import infinity
from orca.speech_generator import SYSTEM

from azurelinuxagent.common import conf
from azurelinuxagent.common import logger
from azurelinuxagent.ga.cgroupcontroller import AGENT_NAME_TELEMETRY, MetricsCounter
Expand Down Expand Up @@ -138,7 +135,7 @@ def initialize(self):
if self._initialized:
return
# check whether cgroup monitoring is supported on the current distro
self._cgroups_supported = self.__cgroups_supported()
self._cgroups_supported = self._check_cgroups_supported()
if not self._cgroups_supported:
# If a distro is not supported, attempt to clean up any existing drop in files in case it was
# previously supported. It is necessary to cleanup in this scenario in case the OS hits any bugs on
Expand All @@ -159,9 +156,9 @@ def initialize(self):
return

# Notes about slice setup:
# On first agent update (for machines where daemon version did not already create azure.slice), the
# agent creates azure.slice and the agent unit Slice drop-in file, but systemd does not move the agent
# unit to azure.slice until service restart. It is ok to enable cgroup usage in this case if agent is
# For machines where daemon version did not already create azure.slice, the
# agent creates azure.slice and the agent unit Slice drop-in file(without daemon-reload), but systemd does not move the agent
# unit to azure.slice until vm restart. It is ok to enable cgroup usage in this case if agent is
# running in system.slice.

self._setup_azure_slice()
Expand All @@ -170,7 +167,7 @@ def initialize(self):
self._cgroups_api.log_root_paths()

# Get agent cgroup
self._agent_cgroup = self._cgroups_api.get_process_cgroup(process_id="self", cgroup_name=AGENT_NAME_TELEMETRY)
self._agent_cgroup = self._cgroups_api.get_unit_cgroup(unit_name=agent_unit_name, cgroup_name=AGENT_NAME_TELEMETRY)

if conf.get_cgroup_disable_on_process_check_failure() and self._check_fails_if_processes_found_in_agent_cgroup_before_enable(agent_slice):
reason = "Found unexpected processes in the agent cgroup before agent enable cgroups."
Expand Down Expand Up @@ -198,7 +195,7 @@ def initialize(self):
log_cgroup_info('Agent cgroups enabled: {0}'.format(self._agent_cgroups_enabled))
self._initialized = True

def __cgroups_supported(self):
def _check_cgroups_supported(self):
distro_supported = CGroupUtil.distro_supported()
if not distro_supported:
log_cgroup_info("Cgroup monitoring is not supported on {0}".format(get_distro()), send_event=True)
Expand All @@ -210,7 +207,7 @@ def __cgroups_supported(self):

log_cgroup_info("systemd version: {0}".format(systemd.get_version()))

if not self.__check_no_legacy_cgroups():
if not self._check_no_legacy_cgroups():
return False

try:
Expand All @@ -236,7 +233,7 @@ def __cgroups_supported(self):
return True

@staticmethod
def __check_no_legacy_cgroups():
def _check_no_legacy_cgroups():
"""
Older versions of the daemon (2.2.31-2.2.40) wrote their PID to /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent. When running
under systemd this could produce invalid resource usage data. Cgroups should not be enabled under this condition.
Expand Down Expand Up @@ -268,10 +265,10 @@ def _setup_azure_slice():
This method ensures that the "azure" and "vmextensions" slices are created. Setup should create those slices
under /lib/systemd/system; but if they do not exist, __ensure_azure_slices_exist will create them.
It also creates drop-in files to set the agent's Slice and CPUAccounting if they have not been
set up in the agent's unit file.
Lastly, the method also cleans up unit files left over from previous versions of the agent.
Note: The new agent uses the systemctl command instead of drop-in files for the desired configuration, so the old drop-in files are cleaned up.
We will keep the cleanup code for a few agent versions, until we determine that all VMs have moved to the new agent version.
"""

# Older agents used to create this slice, but it was never used. Cleanup the file.
Expand Down Expand Up @@ -305,7 +302,7 @@ def _setup_azure_slice():
# New agent will setup limits for scope instead slice, so removing existing logcollector slice.
CGroupConfigurator._Impl._cleanup_unit_file(logcollector_slice)

# Cleanup the old drop-in files
# Cleanup the old drop-in files, new agent will use systemdctl set-property to enable accounting and limits
CGroupConfigurator._Impl._cleanup_unit_file(agent_drop_in_file_cpu_accounting)

CGroupConfigurator._Impl._cleanup_unit_file(agent_drop_in_file_memory_accounting)
Expand Down Expand Up @@ -348,21 +345,16 @@ def _reset_agent_cgroup_setup(self):

@staticmethod
def _enable_accounting(unit_name):
    """
    Enable CPU and Memory accounting for the given systemd unit.

    Accounting is enabled at runtime with `systemctl set-property` (no drop-in
    files and no daemon-reload needed). Failures are logged as warnings and
    never raised, so cgroup setup remains best-effort.

    :param unit_name: name of the systemd unit to enable accounting on
    """
    try:
        # Since we don't use daemon-reload and drop-in files for accounting,
        # it is enabled with systemctl set-property on the requested unit.
        # (A stale duplicate call hard-coding the agent unit was removed so
        # the unit_name parameter is actually honored.)
        accounting_properties = ["CPUAccounting=yes", "MemoryAccounting=yes"]
        systemd.set_unit_properties_run_time(unit_name, accounting_properties)
    except Exception as exception:
        log_cgroup_warning("Failed to set accounting properties for the agent: {0}".format(ustr(exception)))

@staticmethod
def __reload_systemd_config():
    """Reload the systemd configuration; the new slices will be used once the agent's service restarts."""
    daemon_reload_cmd = ["systemctl", "daemon-reload"]
    try:
        log_cgroup_info("Executing systemctl daemon-reload...", send_event=False)
        shellutil.run_command(daemon_reload_cmd)
    except Exception as error:
        log_cgroup_warning("daemon-reload failed (create azure slice): {0}".format(ustr(error)))

# W0238: Unused private member `_Impl.__create_unit_file(path, contents)` (unused-private-member)
@staticmethod
def _create_unit_file(path, contents): # pylint: disable=unused-private-member
Expand Down Expand Up @@ -405,23 +397,54 @@ def _create_all_files(files_to_create):
CGroupConfigurator._Impl._cleanup_unit_file(unit_file)
return

@staticmethod
def _calculate_current_cpu_quota(cpu_quota_per_sec_usec):
"""
Calculate the CPU percentage from CPUQuotaPerSecUSec.
Params:
cpu_quota_per_sec_usec (str): The value of CPUQuotaPerSecUSec (e.g., "1s", "500ms", "500us", or "infinity").
Returns:
str: CPU percentage, or empty if 'infinity'.
"""
try:
if cpu_quota_per_sec_usec.lower() == "infinity":
return "" # No limit on CPU usage

# Parse the value based on the suffix
if cpu_quota_per_sec_usec.endswith("s"):
# Convert seconds to microseconds
cpu_quota_us = float(cpu_quota_per_sec_usec[:-1]) * 1_000_000
elif cpu_quota_per_sec_usec.endswith("ms"):
# Convert milliseconds to microseconds
cpu_quota_us = float(cpu_quota_per_sec_usec[:-2]) * 1_000
elif cpu_quota_per_sec_usec.endswith("us"):
# Directly use the microseconds value
cpu_quota_us = float(cpu_quota_per_sec_usec[:-2])
else:
raise ValueError("Invalid format. Expected 's', 'ms', 'us', or 'infinity'.")

# Calculate CPU percentage
cpu_percentage = (cpu_quota_us / 1_000_000) * 100
return str(cpu_percentage)
except Exception as e:
log_cgroup_warning("Error parsing current CPUQuotaPerSecUSec: {0}".format(ustr(e)))
return ""

def is_extension_resource_limits_setup_completed(self, extension_name, cpu_quota=None):
    """
    Returns True if the extension slice's effective CPU quota already matches the
    requested cpu_quota, False otherwise. Also removes the old-format slice file
    from disk as a side effect.

    NOTE(review): this body appears to interleave the old on-disk-file check
    (extension_slice_path / slice_contents / open-and-compare) with the new
    systemd runtime-property check (CPUQuotaPerSecUSec) — confirm the file-based
    branch is intentional and not diff residue.

    :param extension_name: full extension name used to derive the slice name
    :param cpu_quota: requested CPU quota percentage, or None for no limit
    """
    unit_file_install_path = systemd.get_unit_file_install_path()
    old_extension_slice_path = os.path.join(unit_file_install_path, CGroupUtil.get_extension_slice_name(extension_name, old_slice=True))
    # clean up the old slice from the disk
    if os.path.exists(old_extension_slice_path):
        CGroupConfigurator._Impl._cleanup_unit_file(old_extension_slice_path)

    extension_slice_path = os.path.join(unit_file_install_path,
                                        CGroupUtil.get_extension_slice_name(extension_name))
    extension_slice_name = CGroupUtil.get_extension_slice_name(extension_name)
    cpu_quota = str(
        cpu_quota) + "%" if cpu_quota is not None else ""  # setting an empty value resets to the default (infinity)
    slice_contents = _EXTENSION_SLICE_CONTENTS.format(extension_name=extension_name,
                                                      cpu_quota=cpu_quota)
    # Old-style check: compare the on-disk slice file with the expected contents.
    if os.path.exists(extension_slice_path):
        with open(extension_slice_path, "r") as file_:
            if file_.read() == slice_contents:
                return True
    # New-style check: compare the runtime CPUQuotaPerSecUSec property against the request.
    # NOTE(review): current_cpu_quota is a bare number string (e.g. "50.0") while
    # cpu_quota carries a "%" suffix (e.g. "50%") — verify this equality can ever hold.
    cpu_quota_per_sec_usec = systemd.get_unit_property(extension_slice_name, "CPUQuotaPerSecUSec")
    current_cpu_quota = CGroupConfigurator._Impl._calculate_current_cpu_quota(cpu_quota_per_sec_usec)
    if current_cpu_quota == cpu_quota:
        return True
    return False

def supported(self):
Expand Down Expand Up @@ -472,9 +495,9 @@ def disable(self, reason, disable_cgroups):
@staticmethod
def _set_cpu_quota(unit_name, quota):
"""
Sets the agent's CPU quota to the given percentage (100% == 1 CPU)
Sets CPU quota to the given percentage (100% == 1 CPU)
NOTE: This is done using a dropin file in the default dropin directory; any local overrides on the VM will take precedence
NOTE: This is done using systemctl set-property --runtime; any local overrides in the /etc folder on the VM will take precedence
over this setting.
"""
quota_percentage = "{0}%".format(quota)
Expand All @@ -497,6 +520,10 @@ def _reset_cpu_quota(unit_name):
@staticmethod
def _try_set_cpu_quota(unit_name, quota): # pylint: disable=unused-private-member
    """
    Best-effort: set the CPUQuota runtime property on the given unit, skipping
    the call when the unit's current quota already matches. Failures are logged
    as warnings and never raised.

    :param unit_name: name of the systemd unit to set the quota on
    :param quota: desired quota value (presumably a percentage string such as
                  "50%" coming from _set_cpu_quota — TODO confirm format)
    """
    try:
        # Skip the systemctl call if the effective quota already matches.
        # NOTE(review): _calculate_current_cpu_quota returns a bare number
        # string (e.g. "50.0") — if quota carries a "%" suffix this comparison
        # may never be True; verify against the caller.
        cpu_quota_per_sec_usec = systemd.get_unit_property(unit_name, "CPUQuotaPerSecUSec")
        current_cpu_quota = CGroupConfigurator._Impl._calculate_current_cpu_quota(cpu_quota_per_sec_usec)
        if current_cpu_quota == quota:
            return
        systemd.set_unit_property_run_time(unit_name, "CPUQuota", quota)
    except Exception as exception:
        log_cgroup_warning('Failed to set CPUQuota: {0}'.format(ustr(exception)))
Expand All @@ -514,7 +541,7 @@ def _check_fails_if_processes_found_in_agent_cgroup_before_enable(self, agent_sl
return False
try:
log_cgroup_info("Checking for unexpected processes in the agent's cgroup before enabling cgroups")
self._check_processes_in_agent_cgroup()
self._check_processes_in_agent_cgroup(check_before_enable=True)
except CGroupsException as exception:
log_cgroup_warning(ustr(exception))
return True
Expand Down Expand Up @@ -554,15 +581,18 @@ def check_cgroups(self, cgroup_metrics):
finally:
self._check_cgroups_lock.release()

def _check_processes_in_agent_cgroup(self):
def _check_processes_in_agent_cgroup(self, check_before_enable=False):
"""
Verifies that the agent's cgroup includes only the current process, its parent, commands started using shellutil and instances of systemd-run
(those processes correspond, respectively, to the extension handler, the daemon, commands started by the extension handler, and the systemd-run
commands used to start extensions on their own cgroup).
Other processes started by the agent (e.g. extensions) and processes not started by the agent (e.g. services installed by extensions) are reported
as unexpected, since they should belong to their own cgroup.
Raises a CGroupsException if the check fails
Raises a CGroupsException only when an unexpected process was also seen in the previous check.
Note: This check was added as a conservative approach before the cgroups feature was stable. It now produces noise due to race conditions: we may identify an extra process before systemd moves it to its new cgroup, or a process that is about to exit.
So the behavior is changed to report an unexpected process only when it was also seen in the previous check. Later we will remove this check entirely if no issues are reported.
"""
current_unexpected = {}
agent_cgroup_proc_names = []
Expand Down Expand Up @@ -600,12 +630,15 @@ def _check_processes_in_agent_cgroup(self):
# If so, consider it as valid process in agent cgroup.
if current == 0 and not (self._is_process_descendant_of_the_agent(process) or self._is_zombie_process(process)):
current_unexpected[process] = self._format_process(process)
for process in current_unexpected:
if process in self._unexpected_processes:
report.append(current_unexpected[process])
if len(report) >= 5: # collect just a small sample
break
self._unexpected_processes = current_unexpected
if check_before_enable:
report = [current_unexpected[process] for process in current_unexpected]
else:
for process in current_unexpected:
if process in self._unexpected_processes:
report.append(current_unexpected[process])
if len(report) >= 5: # collect just a small sample
break
self._unexpected_processes = current_unexpected
except Exception as exception:
log_cgroup_warning("Error checking the processes in the agent's cgroup: {0}".format(ustr(exception)))

Expand Down Expand Up @@ -837,6 +870,7 @@ def setup_extension_slice(self, extension_name, cpu_quota):
"""
if self.enabled():
try:
# clean up the old slice from the disk, new agent use systemdctl set-property
unit_file_install_path = systemd.get_unit_file_install_path()
extension_slice = CGroupUtil.get_extension_slice_name(extension_name)
extension_slice_path = os.path.join(unit_file_install_path, extension_slice)
Expand Down Expand Up @@ -873,15 +907,15 @@ def remove_extension_slice(self, extension_name):
def set_extension_services_cpu_memory_quota(self, services_list):
"""
Each extension service will have name, systemd path and it's quotas.
This method ensures that drop-in files are created under service.d folder if quotas given.
ex: /lib/systemd/system/extension.service.d/11-CPUAccounting.conf
This method ensures the limits are set with systemctl at runtime
TODO: set memory quotas
"""
if self.enabled() and services_list is not None:
for service in services_list:
service_name = service.get('name', None)
unit_file_path = systemd.get_unit_file_install_path()
if service_name is not None and unit_file_path is not None:
# remove drop files from disk, new agent use systemdctl set-property
files_to_remove = []
drop_in_path = os.path.join(unit_file_path, "{0}.d".format(service_name))
drop_in_file_cpu_accounting = os.path.join(drop_in_path,
Expand All @@ -908,7 +942,7 @@ def _reset_extension_services_cpu_quota(self, services_list):
"""
Removes any CPUQuota on the extension service
NOTE: This resets the quota on the extension service's default dropin file; any local overrides on the VM will take precedence
NOTE: This resets the quota on the extension service's default; any local overrides on the VM will take precedence
over this setting.
"""
if self.enabled() and services_list is not None:
Expand All @@ -927,7 +961,7 @@ def _reset_extension_services_cpu_quota(self, services_list):

def remove_extension_services_drop_in_files(self, services_list):
"""
Remove the dropin files from service .d folder for the given service
Remove the dropin files from service .d folder for the given service and reset the quotas
"""
if services_list is not None:
for service in services_list:
Expand Down

0 comments on commit adf24d0

Please sign in to comment.