diff --git a/crates/shim/Cargo.toml b/crates/shim/Cargo.toml index aa606a6a..9ba49ec5 100644 --- a/crates/shim/Cargo.toml +++ b/crates/shim/Cargo.toml @@ -23,7 +23,7 @@ go-flag = "0.1.0" thiserror = "1.0" log = { version = "0.4", features = ["std"] } libc = "0.2.95" -nix = "0.25" +nix = { version = "0.28.0", features = ["mount", "socket", "ioctl", "signal", "fs", "event"] } command-fds = "0.2.1" lazy_static = "1.4.0" time = { version = "0.3.7", features = ["serde", "std"] } @@ -37,7 +37,7 @@ prctl = "1.0.0" page_size = "0.4.2" regex = "1" -containerd-shim-protos = { path = "../shim-protos", version = "0.2.0" } +containerd-shim-protos = { path = "../shim-protos", version = "0.2.0", features = ["async"] } async-trait = { version = "0.1.51", optional = true } tokio = { version = "1.17.0", features = ["full"], optional = true } diff --git a/crates/shim/src/asynchronous/cgroup_memory.rs b/crates/shim/src/asynchronous/cgroup_memory.rs new file mode 100644 index 00000000..fc379b0f --- /dev/null +++ b/crates/shim/src/asynchronous/cgroup_memory.rs @@ -0,0 +1,245 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#![cfg(target_os = "linux")] + +use cgroups_rs::hierarchies::is_cgroup2_unified_mode; +use log::warn; +use std::{ + os::unix::io::{AsRawFd, FromRawFd}, + path::Path, +}; + +use crate::{ + error::{Error, Result}, + io_error, other_error, +}; + +use crate::event::Event; +use containerd_shim_protos::events::task::TaskOOM; +use containerd_shim_protos::protobuf::MessageDyn; +use nix::sys::eventfd::{EfdFlags, EventFd}; +use tokio::sync::mpsc::Sender; +use tokio::{ + fs::{self, read_to_string, File}, + io::AsyncReadExt, + spawn, + sync::mpsc::{self, Receiver}, +}; + +pub type EventSender = Sender<(String, Box)>; + +#[cfg(target_os = "linux")] +fn run_oom_monitor(mut rx: Receiver, id: String, tx: EventSender) { + let oom_event = TaskOOM { + container_id: id, + ..Default::default() + }; + let topic = oom_event.topic(); + let oom_box = Box::new(oom_event); + spawn(async move { + while let Some(_item) = rx.recv().await { + tx.send((topic.to_string(), oom_box.clone())) + .await + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + } + }); +} + +#[cfg(target_os = "linux")] +pub async fn monitor_oom(id: &String, pid: u32, tx: EventSender) -> Result<()> { + if !is_cgroup2_unified_mode() { + let path_from_cgorup = get_path_from_cgorup(pid).await?; + let (mount_root, mount_point) = get_existing_cgroup_mem_path(path_from_cgorup).await?; + + let mem_cgroup_path = mount_point + &mount_root; + let rx = register_memory_event(id, Path::new(&mem_cgroup_path), "memory.oom_control") + .await + .map_err(other_error!(e, "register_memory_event failed:"))?; + + run_oom_monitor(rx, id.to_string(), tx); + } + Ok(()) +} + +pub async fn get_path_from_cgorup(pid: u32) -> Result { + let proc_path = format!("/proc/{}/cgroup", pid); + let path_string = read_to_string(&proc_path) + .await + .map_err(io_error!(e, "open {}.", &proc_path))?; + + let (_, path) = path_string + .lines() + .find(|line| line.contains("memory")) + .ok_or(Error::Other("Memory line not found".into()))? + .split_once(":memory:") + .ok_or(Error::Other("Failed to parse memory line".into()))?; + + Ok(path.to_string()) +} + +pub async fn get_existing_cgroup_mem_path(pid_path: String) -> Result<(String, String)> { + let (mut mount_root, mount_point) = get_path_from_mountinfo().await?; + if mount_root == "/" { + mount_root = String::from(""); + } + let mount_root = pid_path.trim_start_matches(&mount_root).to_string(); + Ok((mount_root, mount_point)) +} + +async fn get_path_from_mountinfo() -> Result<(String, String)> { + let mountinfo_path = "/proc/self/mountinfo"; + let mountinfo_string = + read_to_string(mountinfo_path) + .await + .map_err(io_error!(e, "open {}.", mountinfo_path))?; + + let line = mountinfo_string + .lines() + .find(|line| line.contains("cgroup") && line.contains("memory")) + .ok_or(Error::Other( + "Lines containers cgroup and memory not found in mountinfo".into(), + ))?; + + parse_memory_mountroot(line) +} + +fn parse_memory_mountroot(line: &str) -> Result<(String, String)> { + let mut columns = line.split_whitespace(); + let mount_root = columns.nth(3).ok_or(Error::Other( + "Invalid input information about mountinfo".into(), + ))?; + let mount_point = columns.next().ok_or(Error::Other( + "Invalid input information about mountinfo".into(), + ))?; + Ok((mount_root.to_string(), mount_point.to_string())) +} + +pub async fn register_memory_event( + key: &str, + cg_dir: &Path, + event_name: &str, +) -> Result> { + let path = cg_dir.join(event_name); + let event_file = fs::File::open(path.clone()) + .await + .map_err(other_error!(e, "Error get path:"))?; + + let eventfd = EventFd::from_value_and_flags(0, EfdFlags::EFD_CLOEXEC)?; + + let event_control_path = cg_dir.join("cgroup.event_control"); + let data = format!("{} {}", eventfd.as_raw_fd(), event_file.as_raw_fd()); + fs::write(&event_control_path, data.clone()) + .await + .map_err(other_error!(e, "Error write eventfd:"))?; + + let mut buf = [0u8; 8]; + + let (sender, receiver) = mpsc::channel(128); + let key = key.to_string(); + + tokio::spawn(async move { + let mut eventfd_file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + loop { + match eventfd_file.read(&mut buf).await { + Ok(bytes_read) if bytes_read == 0 => return, + Err(_) => return, + _ => (), + } + if !Path::new(&event_control_path).exists() { + return; + } + sender.send(key.clone()).await.unwrap(); + } + }); + + Ok(receiver) +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use crate::asynchronous::cgroup_memory::{ + get_existing_cgroup_mem_path, get_path_from_cgorup, register_memory_event, + }; + use cgroups_rs::{ + hierarchies::{self, is_cgroup2_unified_mode}, + memory::MemController, + Cgroup, CgroupPid, + }; + use tokio::{fs::remove_file, io::AsyncWriteExt, process::Command}; + + #[tokio::test] + async fn test_cgroupv1_oom_monitor() { + if !is_cgroup2_unified_mode() { + // Create a memory cgroup with limits on both memory and swap. + let path = "cgroupv1_oom_monitor"; + let cg = Cgroup::new(hierarchies::auto(), path).unwrap(); + + let mem_controller: &MemController = cg.controller_of().unwrap(); + mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M + mem_controller.set_swappiness(0).unwrap(); + + // Create a sh sub process, and let it wait for the stdinput. + let mut child_process = Command::new("sh") + .stdin(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let pid = child_process.id().unwrap(); + + // Add the sh subprocess to the cgroup. + cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap(); + + // Set oom monitor + let path_from_cgorup = get_path_from_cgorup(pid).await.unwrap(); + let (mount_root, mount_point) = get_existing_cgroup_mem_path(path_from_cgorup) + .await + .unwrap(); + + let mem_cgroup_path = mount_point + &mount_root; + let mut rx = register_memory_event( + pid.to_string().as_str(), + Path::new(&mem_cgroup_path), + "memory.oom_control", + ) + .await + .unwrap(); + + // Exec the sh subprocess to a dd command that consumes more than 10M of memory. + if let Some(mut stdin) = child_process.stdin.take() { + stdin + .write_all( + b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n", + ) + .await + .unwrap(); + stdin.flush().await.unwrap(); + } + + // Wait for the oom message. + if let Some(item) = rx.recv().await { + assert_eq!(pid.to_string(), item, "Receive error oom message"); + } + + // Clean. + child_process.wait().await.unwrap(); + cg.delete().unwrap(); + remove_file("/tmp/test_oom_monitor_file").await.unwrap(); + } + } +} diff --git a/crates/shim/src/asynchronous/mod.rs b/crates/shim/src/asynchronous/mod.rs index 984a28de..95820e84 100644 --- a/crates/shim/src/asynchronous/mod.rs +++ b/crates/shim/src/asynchronous/mod.rs @@ -66,6 +66,7 @@ use crate::{ Config, StartOpts, SOCKET_FD, TTRPC_ADDRESS, }; +pub mod cgroup_memory; pub mod console; pub mod container; pub mod monitor; diff --git a/crates/shim/src/asynchronous/task.rs b/crates/shim/src/asynchronous/task.rs index 450f292e..cbf6fc02 100644 --- a/crates/shim/src/asynchronous/task.rs +++ b/crates/shim/src/asynchronous/task.rs @@ -28,10 +28,11 @@ use containerd_shim_protos::{ ttrpc, ttrpc::r#async::TtrpcContext, }; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use oci_spec::runtime::LinuxResources; use tokio::sync::{mpsc::Sender, MappedMutexGuard, Mutex, MutexGuard}; +use crate::asynchronous::cgroup_memory::monitor_oom; use crate::{ api::{ CreateTaskRequest, CreateTaskResponse, DeleteRequest, Empty, ExecProcessRequest, @@ -166,6 +167,10 @@ where ..Default::default() }) .await; + #[cfg(target_os = "linux")] + if let Err(e) = monitor_oom(&req.id, resp.pid, self.tx.clone()).await { + error!("monitor_oom failed: {:?}.", e); + } } else { self.send_event(TaskExecStarted { container_id: req.id.to_string(), diff --git a/crates/shim/src/mount.rs b/crates/shim/src/mount.rs index 54717c64..9276cb78 100644 --- a/crates/shim/src/mount.rs +++ b/crates/shim/src/mount.rs @@ -395,7 +395,7 @@ impl From for MountExitCode { impl From for nix::errno::Errno { fn from(code: MountExitCode) -> Self { match code { - MountExitCode::NixOtherErr(errno) => nix::errno::Errno::from_i32(errno), + MountExitCode::NixOtherErr(errno) => nix::errno::Errno::from_raw(errno), _ => nix::errno::Errno::UnknownErrno, } } @@ -412,7 +412,7 @@ impl From for Result<()> { MountExitCode::Success => Ok(()), MountExitCode::NixOtherErr(errno) => Err(other!( "mount process exit unexpectedly, exit code: {}", - nix::errno::Errno::from_i32(errno) + nix::errno::Errno::from_raw(errno) )), } } diff --git a/crates/shim/src/util.rs b/crates/shim/src/util.rs index 6583524b..058ebc93 100644 --- a/crates/shim/src/util.rs +++ b/crates/shim/src/util.rs @@ -16,7 +16,7 @@ use std::{ env, - os::unix::io::RawFd, + os::{fd::IntoRawFd, unix::io::RawFd}, time::{SystemTime, UNIX_EPOCH}, }; @@ -113,7 +113,7 @@ pub fn connect(address: impl AsRef) -> Result { #[cfg(not(target_os = "linux"))] const SOCK_CLOEXEC: SockFlag = SockFlag::empty(); - let fd = socket(AddressFamily::Unix, SockType::Stream, SOCK_CLOEXEC, None)?; + let fd = socket(AddressFamily::Unix, SockType::Stream, SOCK_CLOEXEC, None)?.into_raw_fd(); // MacOS doesn't support atomic creation of a socket descriptor with `SOCK_CLOEXEC` flag, // so there is a chance of leak if fork + exec happens in between of these calls. diff --git a/rust-toolchain.toml b/rust-toolchain.toml index c6549df5..7b7e745b 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.65" +channel = "1.71" components = ["rustfmt", "clippy"]