Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pegboard): implement cleanup, rebuild, fix queries #1150

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,25 @@ the internal location.
`std::collections::HashMap` does not implement `Hash`. To get around this, use `util::serde::HashableMap`:

```rust
use util::serde::AsHashableExt;

struct Input {
map: HashMap<..., ...>,
}

// ...

ctx
.activity(MyActivityInput {
map: input.map.as_hashable(),
map: input.map.into(),
})
.await?;

// ...

struct MyActivityInput {
map: util::serde::HashableMap<..., ...>,
}

```

## Nested options with serde
Expand Down
4 changes: 3 additions & 1 deletion lib/bolt/core/src/tasks/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ async fn ip_inner(
"root",
"-i",
ssh_key.path(),
ip
"-L",
"9090:10.0.0.84:8080",
ip,
)
.run()
}
Expand Down
18 changes: 11 additions & 7 deletions lib/pegboard/container-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ fn main() -> anyhow::Result<()> {
Some(x) if x == "dynamic_server" => Stakeholder::DynamicServer {
server_id: var("PEGBOARD_META_server_id")?,
},
Some(x) => bail!("invalid container manager: {x}"),
None => bail!("no container manager specified"),
Some(x) => bail!("invalid container stakeholder: {x}"),
None => bail!("no container stakeholder specified"),
};

let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
Expand Down Expand Up @@ -112,8 +112,13 @@ fn run_container(
pegboard_container_dir: &Path,
root_user_enabled: bool,
) -> anyhow::Result<i32> {
let container_id = fs::read_to_string(pegboard_container_dir.join("container-id"))
.context("failed to read container-id")?;
// Extract container id from dir
let container_id = pegboard_container_dir
.iter()
.last()
.context("empty `pegboard_container_dir`")?
.to_string_lossy()
.to_string();
let oci_bundle_path = pegboard_container_dir.join("oci-bundle");
let oci_bundle_config_json = oci_bundle_path.join("config.json");

Expand Down Expand Up @@ -167,14 +172,13 @@ fn run_container(
// This will wait for the child to exit and then exit itself so we have time to ship all of the
// required logs
let mut signals = Signals::new(&[SIGTERM])?;
let runc_container_id = container_id.clone();
thread::spawn(move || {
for _ in signals.forever() {
println!("Received SIGTERM, forwarding to runc container {runc_container_id}");
println!("Received SIGTERM, forwarding to runc container {container_id}");
let status = Command::new("runc")
.arg("kill")
.arg("--all")
.arg(&runc_container_id)
.arg(&container_id)
.arg("SIGTERM")
.status();
println!("runc kill status: {:?}", status);
Expand Down
2 changes: 0 additions & 2 deletions lib/pegboard/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ license = "Apache-2.0"
anyhow = "1.0.79"
futures-util = { version = "0.3" }
indoc = "2.0"
lz4_flex = "0.11"
nix = { version = "0.27", default-features = false, features = ["user", "signal"] }
notify = { version = "6.1.1", default-features = false, features = [ "serde" ] }
reqwest = { version = "0.11", features = ["stream"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
tar = "0.4.41"
tokio = { version = "1.27", default-features = false, features = ["fs", "process", "macros", "rt", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io-util"] }
tracing = "0.1"
Expand Down
134 changes: 74 additions & 60 deletions lib/pegboard/manager/src/container/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
time::Duration,
};

use anyhow::*;
Expand All @@ -23,8 +23,8 @@ mod setup;

/// How often to check for a PID when one is not present and a stop command was received.
const STOP_PID_INTERVAL: Duration = std::time::Duration::from_millis(250);
/// How long to wait until no longer waiting for a PID when a stop command was received.
const STOP_PID_TIMEOUT: Duration = std::time::Duration::from_secs(30);
/// How many times to check for a PID when a stop command was received.
const STOP_PID_RETRIES: usize = 32;
/// How often to check that a PID is still running when observing container state.
const PID_POLL_INTERVAL: Duration = std::time::Duration::from_millis(1000);
const VECTOR_SOCKET_ADDR: &str = "127.0.0.1:5021";
Expand All @@ -38,40 +38,50 @@ enum ObservationState {

pub struct Container {
container_id: Uuid,
config: protocol::ContainerConfig,

pid: Mutex<Option<Pid>>,
}

impl Container {
pub fn new(container_id: Uuid) -> Arc<Self> {
pub fn new(container_id: Uuid, config: protocol::ContainerConfig) -> Arc<Self> {
Arc::new(Container {
container_id,
config,

pid: Mutex::new(None),
})
}

pub async fn start(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting container");
pub fn with_pid(container_id: Uuid, config: protocol::ContainerConfig, pid: Pid) -> Arc<Self> {
Arc::new(Container {
container_id,
config,

pid: Mutex::new(Some(pid)),
})
}

pub async fn start(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "starting");

// Write container to DB
let config_json = serde_json::to_vec(&self.config)?;
utils::query(|| async {
// NOTE: On conflict here in case this query runs but the command is not acknowledged
sqlx::query(indoc!(
"
INSERT INTO containers (
container_id,
config,
start_ts
)
VALUES (?1, ?2)
VALUES (?1, ?2, ?3)
ON CONFLICT (container_id) DO NOTHING
",
))
.bind(self.container_id)
.bind(&config_json)
.bind(utils::now())
.execute(&mut *ctx.sql().await?)
.await
Expand All @@ -84,75 +94,76 @@ impl Container {
})
.await?;

{
let s = self.clone();
let ctx = ctx.clone();
// Lifecycle
let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
use std::result::Result::{Err, Ok};

tokio::spawn(async move {
if let Err(err) = s.setup(&ctx, config).await {
tracing::error!(?err, "container run failed");
match self2.setup(&ctx2).await {
Ok(container_runner_path) => match self2.run(&ctx2, container_runner_path).await {
Ok(pid) => {
if let Err(err) = self2.observe(&ctx2, pid).await {
tracing::error!(container_id=?self2.container_id, ?err, "observe failed");
}
}
Err(err) => {
tracing::error!(container_id=?self2.container_id, ?err, "run failed")
}
},
Err(err) => tracing::error!(container_id=?self2.container_id, ?err, "setup failed"),
}

// Cleanup
let mut containers = ctx.containers.write().await;
containers.remove(&s.container_id);
}
});
}
// Cleanup afterwards
self2.cleanup(&ctx2).await
});

Ok(())
}

async fn setup(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
config: protocol::ContainerConfig,
) -> Result<()> {
async fn setup(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<PathBuf> {
tracing::info!(container_id=?self.container_id, "setting up");

let container_path = ctx.container_path(self.container_id);

// Create container working dir
fs::create_dir(&container_path).await?;

// Download container runner
let container_runner_path = ctx
.fetch_container_runner(&config.container_runner_binary_url)
.fetch_container_runner(&self.config.container_runner_binary_url)
.await?;

setup::cni_bundle(self.container_id, &config, &ctx).await?;
self.setup_oci_bundle(&ctx).await?;

// Run CNI setup script
if let protocol::NetworkMode::Bridge = config.network_mode {
setup::cni_network(self.container_id, &config, &ctx).await?;
if let protocol::NetworkMode::Bridge = self.config.network_mode {
self.setup_cni_network(&ctx).await?;
}

Ok(container_runner_path)
}

async fn run(self: &Arc<Self>, ctx: &Arc<Ctx>, container_runner_path: PathBuf) -> Result<Pid> {
tracing::info!(container_id=?self.container_id, "spawning");

let mut runner_env = vec![
(
"PEGBOARD_META_root_user_enabled",
config.root_user_enabled.to_string(),
self.config.root_user_enabled.to_string(),
),
(
"PEGBOARD_META_vector_socket_addr",
VECTOR_SOCKET_ADDR.to_string(),
),
];
runner_env.extend(config.stakeholder.env());

self.run(ctx, container_runner_path, &runner_env).await?;

Ok(())
}

async fn run(
self: &Arc<Self>,
ctx: &Arc<Ctx>,
container_runner_path: PathBuf,
env: &[(&str, String)],
) -> Result<()> {
tracing::info!(container_id=?self.container_id, "spawning");
runner_env.extend(self.config.stakeholder.env());

// Spawn runner which spawns the container
let pid = setup::spawn_orphaned_container_runner(
container_runner_path,
ctx.container_path(self.container_id),
&env,
&runner_env,
)?;

tracing::info!(container_id=?self.container_id, ?pid, "pid received");
Expand All @@ -168,12 +179,14 @@ impl Container {
"
UPDATE containers
SET
running_ts = ?2
running_ts = ?2 AND
pid = ?3
WHERE container_id = ?1
",
))
.bind(self.container_id)
.bind(utils::now())
.bind(pid.as_raw())
.execute(&mut *ctx.sql().await?)
.await
})
Expand All @@ -187,13 +200,13 @@ impl Container {
})
.await?;

self.observe(ctx, pid).await?;

Ok(())
Ok(pid)
}

// Watch container for updates
async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
pub(crate) async fn observe(&self, ctx: &Arc<Ctx>, pid: Pid) -> Result<()> {
tracing::info!(container_id=?self.container_id, ?pid, "observing");

let exit_code_path = ctx.container_path(self.container_id).join("exit-code");
let proc_path = Path::new("/proc").join(pid.to_string());

Expand Down Expand Up @@ -288,29 +301,29 @@ impl Container {
})
.await?;

tracing::info!(container_id=?self.container_id, "container complete");
tracing::info!(container_id=?self.container_id, "complete");

Ok(())
}

pub async fn stop(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
tracing::info!(container_id=?self.container_id, "stopping");

let self2 = self.clone();
let ctx2 = ctx.clone();
tokio::spawn(async move {
if let Err(err) = self2.stop_inner(&ctx2).await {
tracing::error!(?err, "container stop failed");
}

// Cleanup regardless
let mut containers = ctx2.containers.write().await;
containers.remove(&self2.container_id);
self2.cleanup(&ctx2).await
});

Ok(())
}

async fn stop_inner(self: &Arc<Self>, ctx: &Arc<Ctx>) -> Result<()> {
let now = Instant::now();
let mut i = 0;

let pid = loop {
if let Some(pid) = *self.pid.lock().await {
Expand All @@ -319,14 +332,15 @@ impl Container {

tracing::warn!(container_id=?self.container_id, "waiting for pid to stop workflow");

if now.elapsed() > STOP_PID_TIMEOUT {
if i > STOP_PID_RETRIES {
tracing::error!(
container_id=?self.container_id,
"timed out waiting for container to get PID, considering container stopped",
);

break None;
}
i += 1;

tokio::time::sleep(STOP_PID_INTERVAL).await;
};
Expand All @@ -353,7 +367,7 @@ impl Container {

ctx.event(protocol::Event::ContainerStateUpdate {
container_id: self.container_id,
state: protocol::ContainerState::Stopping,
state: protocol::ContainerState::Stopped,
})
.await?;

Expand Down
Loading
Loading