Skip to content

Commit

Permalink
feat(pegboard): integrate setup scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 28, 2024
1 parent 92504c4 commit c873078
Show file tree
Hide file tree
Showing 15 changed files with 1,482 additions and 181 deletions.
6 changes: 6 additions & 0 deletions lib/pegboard/container-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ fn main() -> anyhow::Result<()> {
.context("`container_dir` arg required")?;
let pegboard_container_dir = Path::new(&pegboard_container_dir);

// Write PID to file
fs::write(
pegboard_container_dir.join("pid"),
std::process::id().to_string().as_bytes(),
)?;

let root_user_enabled = var("PEGBOARD_META_root_user_enabled")? == "1";
let stakeholder = match var("PEGBOARD_META_stakeholder").ok() {
Some(x) if x == "dynamic_server" => Stakeholder::DynamicServer {
Expand Down
2 changes: 2 additions & 0 deletions lib/pegboard/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ license = "Apache-2.0"
anyhow = "1.0.79"
futures-util = { version = "0.3" }
indoc = "2.0"
lz4_flex = "0.11"
nix = { version = "0.27", default-features = false, features = ["user", "signal"] }
notify = { version = "6.1.1", default-features = false, features = [ "serde" ] }
reqwest = { version = "0.11", features = ["stream"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
tar = "0.4.41"
tokio = { version = "1.27", default-features = false, features = ["fs", "process", "macros", "rt", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io-util"] }
tracing = "0.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::{
os::unix::process::CommandExt,
path::{Path, PathBuf},
process::{Command, Stdio},
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -10,19 +8,19 @@ use anyhow::*;
use futures_util::{stream::FuturesUnordered, FutureExt, StreamExt};
use indoc::indoc;
use nix::{
sys::{
signal::{kill, Signal},
wait::{waitpid, WaitStatus},
},
unistd::{fork, pipe, read, write, ForkResult, Pid},
sys::signal::{kill, Signal},
unistd::Pid,
};
use pegboard::protocol;
use tokio::{fs, sync::Mutex};
use url::Url;
use uuid::Uuid;

use crate::{ctx::Ctx, utils};

mod oci_config;
mod seccomp;
mod setup;

/// How often to check for a PID when one is not present and a stop command was received.
const STOP_PID_INTERVAL: Duration = std::time::Duration::from_millis(250);
/// How long to wait until no longer waiting for a PID when a stop command was received.
Expand All @@ -39,35 +37,25 @@ enum ObservationState {
}

pub struct Container {
pub container_id: Uuid,
pub image_artifact_url: String,
pub container_runner_binary_url: String,
pub root_user_enabled: bool,
pub stakeholder: protocol::Stakeholder,
container_id: Uuid,

pub pid: Mutex<Option<Pid>>,
pid: Mutex<Option<Pid>>,
}

impl Container {
pub fn new(
container_id: Uuid,
image_artifact_url: String,
container_runner_binary_url: String,
root_user_enabled: bool,
stakeholder: protocol::Stakeholder,
) -> Arc<Self> {
pub fn new(container_id: Uuid) -> Arc<Self> {
Arc::new(Container {
container_id,
image_artifact_url,
container_runner_binary_url,
root_user_enabled,
stakeholder,

pid: Mutex::new(None),
})
}

pub async fn start(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
pub async fn start(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting container");

// Write container to DB
Expand All @@ -77,7 +65,7 @@ impl Container {
"
INSERT INTO containers (
container_id,
create_ts
start_ts
)
VALUES (?1, ?2)
ON CONFLICT (container_id) DO NOTHING
Expand All @@ -96,57 +84,101 @@ impl Container {
})
.await?;

let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async {
if let Err(err) = self2.run(ctx2).await {
tracing::error!(?err, "container run failed");
}
});
{
let s = self.clone();
let ctx = ctx.clone();

tokio::spawn(async move {
if let Err(err) = s.setup(&ctx, config).await {
tracing::error!(?err, "container run failed");

// Cleanup
let mut containers = ctx.containers.write().await;
containers.remove(&s.container_id);
}
});
}

Ok(())
}

async fn run(self: Arc<Self>, ctx: Arc<Ctx>) -> Result<()> {
async fn setup(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
let container_path = ctx.container_path(self.container_id);

fs::create_dir(&container_path).await?;

// Get local path of container runner
// Download container runner
let container_runner_path = ctx
.fetch_container_runner(&self.container_runner_binary_url)
.fetch_container_runner(&config.container_runner_binary_url)
.await?;

let url = Url::parse(&self.image_artifact_url)?;
let path_stub = utils::get_s3_path_stub(&url, false)?;
let path = container_path.join(path_stub);
setup::cni_bundle(self.container_id, &config, &ctx).await?;

// Download container image
utils::download_file(&self.image_artifact_url, &path).await?;
// Run CNI setup script
if let protocol::NetworkMode::Bridge = config.network_mode {
setup::cni_network(self.container_id, &config, &ctx).await?;
}

let mut env = vec![
let mut runner_env = vec![
(
"PEGBOARD_META_root_user_enabled",
self.root_user_enabled.to_string(),
config.root_user_enabled.to_string(),
),
(
"PEGBOARD_META_vector_socket_addr",
VECTOR_SOCKET_ADDR.to_string(),
),
];
env.extend(self.stakeholder.env());
runner_env.extend(config.stakeholder.env());

self.run(ctx, container_runner_path, &runner_env).await?;

Ok(())
}

async fn run(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
container_runner_path: PathBuf,
env: &[(&str, String)],
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "spawning");

// Spawn runner which spawns the container
let pid = spawn_orphaned_container_runner(container_runner_path, container_path, &env)?;
let pid = setup::spawn_orphaned_container_runner(
container_runner_path,
ctx.container_path(self.container_id),
&env,
)?;

tracing::info!(container_id=?self.container_id, ?pid, "pid received");

// Store PID
{
*self.pid.lock().await = Some(pid);
}

// Update DB
utils::query(|| async {
sqlx::query(indoc!(
"
UPDATE containers
SET
running_ts = ?2
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
})
.await?;

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Running {
Expand All @@ -161,7 +193,7 @@ impl Container {
}

// Watch container for updates
async fn observe(&self, ctx: Arc<Ctx>, pid: Pid) -> Result<()> {
async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
let exit_code_path = ctx.container_path(self.container_id).join("exit-code");
let proc_path = Path::new("/proc").join(pid.to_string());

Expand Down Expand Up @@ -231,6 +263,7 @@ impl Container {

tracing::info!(container_id=?self.container_id, ?exit_code, "received exit code");

// Update DB
utils::query(|| async {
sqlx::query(indoc!(
"
Expand Down Expand Up @@ -263,16 +296,20 @@ impl Container {
pub async fn stop(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async {
if let Err(err) = self2.stop_inner(ctx2).await {
tokio::spawn(async move {
if let Err(err) = self2.stop_inner(&ctx2).await {
tracing::error!(?err, "container stop failed");
}

// Cleanup regardless
let mut containers = ctx2.containers.write().await;
containers.remove(&self2.container_id);
});

Ok(())
}

async fn stop_inner(self: Arc<Self>, ctx: Arc<Ctx>) -> Result<()> {
async fn stop_inner(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
let now = Instant::now();

let pid = loop {
Expand Down Expand Up @@ -323,71 +360,3 @@ impl Container {
Ok(())
}
}

fn spawn_orphaned_container_runner(
container_runner_path: PathBuf,
container_path: PathBuf,
env: &[(&str, String)],
) -> Result<Pid> {
// Prepare the arguments for the container runner
let runner_args = vec![container_path.to_str().context("bad path")?];

// Pipe communication between processes
let (pipe_read, pipe_write) = pipe()?;

// NOTE: This is why we fork the process twice: https://stackoverflow.com/a/5386753
match unsafe { fork() }.context("process first fork failed")? {
ForkResult::Parent { child } => {
// Close the writing end of the pipe in the parent
nix::unistd::close(pipe_write)?;

// Ensure that the child process spawned successfully
match waitpid(child, None).context("waitpid failed")? {
WaitStatus::Exited(_, 0) => {
// Read the second child's PID from the pipe
let mut buf = [0u8; 4];
read(pipe_read, &mut buf)?;
let second_child_pid = Pid::from_raw(i32::from_le_bytes(buf));

Ok(second_child_pid)
}
WaitStatus::Exited(_, status) => {
bail!("Child process exited with status {}", status)
}
_ => bail!("Unexpected wait status for child process"),
}
}
ForkResult::Child => {
// Child process
match unsafe { fork() } {
Result::Ok(ForkResult::Parent { child }) => {
// Write the second child's PID to the pipe
let child_pid_bytes = child.as_raw().to_le_bytes();
write(pipe_write, &child_pid_bytes)?;

// Exit the intermediate child
std::process::exit(0);
}
Result::Ok(ForkResult::Child) => {
// Exit immediately on fail in order to not leak process
let err = Command::new(&container_runner_path)
.args(&runner_args)
.envs(env.iter().cloned())
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.exec();
eprintln!("exec failed: {err:?}");
std::process::exit(1);
}
Err(err) => {
// Exit immediately in order to not leak child process.
//
// The first fork doesn't need to exit on error since it
eprintln!("process second fork failed: {err:?}");
std::process::exit(1);
}
}
}
}
}
Loading

0 comments on commit c873078

Please sign in to comment.