This repository has been archived by the owner on Dec 21, 2021. It is now read-only.

Remove systemd units without a corresponding pod #312

Merged
3 commits merged on Sep 24, 2021
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,12 @@

## [Unreleased]

### Added
- Cleanup stage added where systemd units without corresponding pods are
removed on startup ([#312]).

[#312]: https://github.com/stackabletech/agent/pull/312

## [0.6.1] - 2021-09-14

### Changed
3 changes: 3 additions & 0 deletions docs/modules/ROOT/nav.adoc
@@ -5,5 +5,8 @@
* xref:limitations.adoc[]
* xref:services.adoc[]
* xref:jobs.adoc[]
* Stages
** xref:stages/overview.adoc[]
** xref:stages/cleanup.adoc[]
* Monitoring
** xref:monitoring/logs.adoc[]
8 changes: 8 additions & 0 deletions docs/modules/ROOT/pages/stages/cleanup.adoc
@@ -0,0 +1,8 @@
= Cleanup stage

On startup, the systemd units in the `system-stackable` slice are
compared to the pods assigned to this node. If a systemd unit is as
expected, it is kept and the Stackable Agent takes ownership of it
again in a later stage. If there is no corresponding pod, or the
systemd unit differs from the pod specification, the unit is removed
and the Stackable Agent creates a new one afterwards.
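
Which units are currently part of this slice can be inspected with
standard systemd tooling. The command below is only an illustrative
sketch and assumes a system-level installation; for an agent running
in user mode, `systemctl --user` has to be used instead:

[source,shell]
----
# Show the slice together with the service units currently running in it
systemctl status system-stackable.slice
----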
26 changes: 26 additions & 0 deletions docs/modules/ROOT/pages/stages/overview.adoc
@@ -0,0 +1,26 @@
= Overview

When the Stackable Agent starts, it runs through the following stages:

* Check the configured directories and files.
** Check that the optional files, if they exist, can be opened.
** Create the directories that require write access and do not exist
yet.
** Check that the configured directories are writable by the current
process.
* Bootstrap the cluster with TLS certificates but only if no existing
kubeconfig can be found.
* Remove all systemd units from a previous run without a corresponding
pod (see xref:stages/cleanup.adoc[]).
* Start the kubelet.

After the kubelet has started, the assigned pods run through the
following stages:

* Download the package from a registered Stackable repository.
* Unpack the package and install it.
* Create the configuration files according to the config maps.
* Create, start, and enable the systemd units.
* Monitor the systemd units and patch the pod status accordingly (see
the example below).
* Stop, disable, and remove the systemd units on termination or when the
pod is deleted.
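
The service units generated for a pod are placed in the
`system-stackable` slice, so their progress through these stages can be
followed with standard systemd tooling. The commands below are only an
illustrative sketch; `<unit-name>` stands for the actual unit name,
which is derived from the pod and its containers:

[source,shell]
----
# Show the current state of a generated service unit
systemctl status <unit-name>

# Follow the log output of the corresponding service
journalctl --unit <unit-name> --follow
----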
2 changes: 2 additions & 0 deletions src/bin/stackable-agent.rs
@@ -135,6 +135,8 @@ async fn main() -> anyhow::Result<()> {
.await
.expect("Error initializing provider.");

provider.cleanup(&krustlet_config.node_name).await;

let kubelet = Kubelet::new(provider, kubeconfig, krustlet_config).await?;
kubelet.start().await
}
245 changes: 245 additions & 0 deletions src/provider/cleanup.rs
@@ -0,0 +1,245 @@
//! Initial cleanup
//!
//! On startup the systemd units in the `system-stackable` slice are compared to the pods assigned
//! to this node. If a systemd unit is as expected then it is kept and the Stackable Agent will
//! take ownership again in the `Starting` stage. If there is no corresponding pod or the systemd
//! unit differs from the pod specification then it is removed and the Stackable Agent will create
//! a new systemd unit in the `CreatingService` stage.
//!
//! The cleanup stage is implemented as part of the [`StackableProvider`] because the expected
//! content of a systemd unit file can only be determined with the directories configured in the
//! provider.
//!
//! The cleanup code resides in a separate module because the amount of code justifies it and
//! because the log output becomes more meaningful: it is clear whether a systemd unit was removed
//! in the cleanup stage or in the normal termination process.
use std::collections::HashMap;

use anyhow::Context;
use k8s_openapi::api::core::v1::Pod as KubePod;
use kube::api::{ListParams, Meta, ObjectList};
use kube::Api;
use kubelet::pod::Pod;
use kubelet::provider::Provider;
use log::{debug, error, info, warn};
use tokio::fs::{read_to_string, remove_file};

use super::systemdmanager::systemdunit::SystemDUnit;
use super::systemdmanager::systemdunit::STACKABLE_SLICE;
use super::StackableProvider;

impl StackableProvider {
/// Removes systemd units without corresponding pods.
///
/// The systemd units in the `system-stackable` slice are compared with the pods assigned to
/// this node, and all units without a corresponding pod, or whose content differs from the pod
/// specification, are removed.
pub async fn cleanup(&self, node_name: &str) {
let systemd_manager = &self.shared.systemd_manager;

if let Err(error) = systemd_manager.reload().await {
error!(
"Skipping the cleanup stage because the systemd daemon reload failed. {}",
error
);
return;
}

let units_in_slice = match systemd_manager.slice_content(STACKABLE_SLICE).await {
Ok(units_in_slice) => units_in_slice,
Err(error) => {
debug!(
"Skipping the cleanup stage because no systemd units were found in the slice \
[{}]. {}",
STACKABLE_SLICE, error
);
return;
}
};

let pods = match self.assigned_pods(node_name).await {
Ok(pods) => pods.items,
Err(error) => {
error!(
"The assigned pods could not be retrieved. All systemd units in the slice [{}] \
will be removed. {}",
STACKABLE_SLICE, error
);
Vec::new()
}
};

let mut units_from_pods = HashMap::new();
for pod in pods {
let pod_terminating = pod.metadata.deletion_timestamp.is_some();

match self.units_from_pod(&pod).await {
Ok(units) => {
for (unit_name, content) in units {
units_from_pods.insert(unit_name, (content, pod_terminating));
}
}
Err(error) => warn!(
"Systemd units could not be generated for pod [{}/{}]. {}",
pod.namespace().unwrap_or_else(|| String::from("default")),
pod.name(),
error
),
}
}

let mut unit_removed = false;

for unit_name in &units_in_slice {
let remove_unit = match units_from_pods.get(unit_name) {
Some((expected_content, pod_terminating)) => {
match self.unit_file_content(unit_name).await {
Ok(Some(content)) if &content == expected_content && !pod_terminating => {
info!(
"The systemd unit [{}] will be kept because a corresponding pod \
exists.",
unit_name
);
false
}
Ok(Some(_)) if *pod_terminating => {
info!(
"The systemd unit [{}] will be removed because the corresponding \
pod is terminating.",
unit_name
);
true
}
Ok(Some(content)) => {
info!(
"The systemd unit [{}] will be removed because it differs from the \
corresponding pod specification.\n\
expected content:\n\
{}\n\n\
actual content:\n\
{}",
unit_name, expected_content, content
);
true
}
Ok(None) => {
info!(
"The systemd unit [{}] will be removed because its file path could \
not be determined.",
unit_name
);
true
}
Err(error) => {
warn!(
"The systemd unit [{}] will be removed because the file content \
could not be retrieved. {}",
unit_name, error
);
true
}
}
}
None => {
info!(
"The systemd unit [{}] will be removed because no corresponding pod \
exists.",
unit_name
);
true
}
};

if remove_unit {
self.remove_unit(unit_name).await;
unit_removed = true;
}
}

if unit_removed {
let _ = systemd_manager.reload().await;
}
}

/// Returns a list of all pods assigned to the given node.
async fn assigned_pods(&self, node_name: &str) -> anyhow::Result<ObjectList<KubePod>> {
let client = &self.shared.client;

let api: Api<KubePod> = Api::all(client.to_owned());
let lp = ListParams::default().fields(&format!("spec.nodeName={}", node_name));
api.list(&lp).await.with_context(|| {
format!(
"The pods assigned to this node (nodeName = [{}]) could not be retrieved.",
node_name
)
})
}

/// Creates the systemd unit files for the given pod in memory.
///
/// A mapping from systemd unit file names to the file content is returned.
async fn units_from_pod(&self, kubepod: &KubePod) -> anyhow::Result<HashMap<String, String>> {
let systemd_manager = &self.shared.systemd_manager;

let mut units = HashMap::new();
let pod = Pod::from(kubepod.to_owned());
let pod_state = self.initialize_pod_state(&pod).await?;

for container in pod.containers() {
let unit = SystemDUnit::new(
systemd_manager.is_user_mode(),
&pod_state,
&self.shared.kubeconfig_path,
&pod,
&container,
)?;
units.insert(unit.get_name(), unit.get_unit_file_content());
}

Ok(units)
}

/// Returns the content of the given systemd unit file.
async fn unit_file_content(&self, unit_name: &str) -> anyhow::Result<Option<String>> {
let systemd_manager = &self.shared.systemd_manager;

let file_path_result = systemd_manager
.fragment_path(unit_name)
.await
.with_context(|| {
format!(
"The file path of the unit [{}] could not be determined.",
unit_name
)
});

match file_path_result {
Ok(Some(file_path)) => {
let file_content = read_to_string(&file_path)
.await
.with_context(|| format!("The file [{}] could not be read.", file_path))?;
Ok(Some(file_content))
}
Ok(None) => Ok(None),
Err(error) => Err(error),
}
}

/// Stops, disables, and removes the given systemd unit.
async fn remove_unit(&self, unit_name: &str) {
let systemd_manager = &self.shared.systemd_manager;

if let Err(error) = systemd_manager.stop(unit_name).await {
warn!("{}", error);
}
if let Err(error) = systemd_manager.disable(unit_name).await {
warn!("{}", error);
}
if let Ok(Some(file_path)) = systemd_manager.fragment_path(unit_name).await {
debug!("Removing file [{}].", file_path);
if let Err(error) = remove_file(file_path).await {
warn!("{}", error);
}
}
}
}