Skip to content

Commit

Permalink
feat(pegboard): add container runner and manager
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 28, 2024
1 parent 458de12 commit 91cd21c
Show file tree
Hide file tree
Showing 33 changed files with 1,549 additions and 165 deletions.
2 changes: 1 addition & 1 deletion docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Instead, you can increment the location preemptively with `ctx.step()`:

```rust
let iter = actions.into_iter().map(|action| {
let ctx = ctx.step();
let mut ctx = ctx.step();

async move {
ctx.activity(MyActivityInput {
Expand Down
1 change: 0 additions & 1 deletion lib/chirp-workflow/core/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ where
}
}

// Closure executable impl
#[async_trait]
impl<T: Executable> Executable for Option<T> {
type Output = Option<T::Output>;
Expand Down
70 changes: 57 additions & 13 deletions lib/chirp-workflow/core/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ pub trait Signal {
///
/// Example:
/// ```rust
/// #[macros::signal("my-signal")]
/// #[signal("my_signal")]
/// struct MySignal {
/// x: i64,
/// }
/// #[macros::signal("my-signal2")]
/// #[signal("my_signal2")]
/// struct MySignal2 {
/// y: i64,
/// }
///
/// join_signal!(MyJoinSignal, [MySignal, MySignal2]);
/// join_signal!(MyJoinSignal {
/// MySignal,
/// MySignal2
/// });
///
/// // Automatically becomes:
/// enum MyJoinSignal {
Expand All @@ -29,44 +32,85 @@ pub trait Signal {
/// MySignal(sig) => println!("received MySignal {sig:?}"),
/// MySignal2(sig) => println!("received MySignal2 {sig:?}"),
/// }
/// ````
///
///
/// // Also allows aliases:
/// join_signal!(MyJoinSignal {
/// MySignal,
/// MySignal2(some_pkg::Signal),
/// }
/// ```
#[macro_export]
macro_rules! join_signal {
($vis:vis $join:ident, [$($signals:ident),* $(,)?]) => {
$vis enum $join {
$($signals($signals)),*
($vis:vis $join:ident { $($tt:tt)* }) => {
join_signal!(@ $vis $join [] [] $($tt)*);
};
(@
$vis:vis $join:ident
[$({ $names:tt } { $types:tt })*]
[$({ $just_types:tt })*]
) => {
$vis enum $join {
$( $names ($types) ),*
}

#[async_trait::async_trait]
#[async_trait::async_trait]
impl Listen for $join {
async fn listen(ctx: &chirp_workflow::prelude::ListenCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[
$(<$signals as chirp_workflow::prelude::Signal>::NAME),*
$(<$just_types as chirp_workflow::prelude::Signal>::NAME),*
]).await?;

Self::parse(&row.signal_name, row.body)
}

fn parse(name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
$(
if name == <$signals as chirp_workflow::prelude::Signal>::NAME {
if name == <$types as chirp_workflow::prelude::Signal>::NAME {
Ok(
Self::$signals(
Self::$names(
serde_json::from_value(body)
.map_err(WorkflowError::DeserializeSignalBody)?
)
)
}
)else*

else {
unreachable!(
"received signal that wasn't queried for: {}, expected {:?}",
name, &[$(<$signals as chirp_workflow::prelude::Signal>::NAME),*]
name, &[$(<$just_types as chirp_workflow::prelude::Signal>::NAME),*]
);
}
}
}
};
(@
$vis:vis $join:ident
[$({ $names:tt } { $types:tt })*]
[$({ $just_types:tt })*]
$name:ident,
$($tail:tt)*
) => {
join_signal!(@
$vis $join
[$( { $names } { $types } )* { $name } { $name }]
[$( { $just_types } )* { $name }]
$($tail)*
);
};
(@
$vis:vis $join:ident
[$({ $names:tt } { $types:tt })*]
[$({ $just_types:tt })*]
$name:ident($ty:ty),
$($tail:tt)*
) => {
join_signal!(@
$vis $join
[$( { $names } { $types } )* { $name } { $ty }]
[$( { $just_types } )* { $ty }]
$($tail)*
);
};
}
pub use join_signal;
6 changes: 6 additions & 0 deletions lib/pegboard/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[workspace]
members = [
"container-runner",
"manager",
]

5 changes: 5 additions & 0 deletions lib/pegboard/container-runner/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/target
**/target
.dockerignore
Dockerfile

18 changes: 18 additions & 0 deletions lib/pegboard/container-runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "container-runner"
version = "0.1.0"
edition = "2021"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
anyhow = "1.0.79"
portpicker = "0.1.1"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
signal-hook = "0.3.17"

[dev-dependencies]
tempfile = "3.9.0"
uuid = { version = "1.6.1", features = ["v4"] }

7 changes: 7 additions & 0 deletions lib/pegboard/container-runner/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM clux/muslrust:1.80.0-stable

WORKDIR /app
COPY Cargo.toml Cargo.lock .
COPY src/ src/
RUN cargo build --release

8 changes: 8 additions & 0 deletions lib/pegboard/container-runner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# container-runner

This crate is used to run OCI bundles on the job servers themselves. This takes care of trapping signals, log
shipping, rate limiting logs, and more.

## Deployment

This gets built & deployed in `infra/tf/infra-artifacts/` then used in `TODO`.
123 changes: 123 additions & 0 deletions lib/pegboard/container-runner/src/log_shipper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::{io::Write, net::TcpStream, sync::mpsc, thread::JoinHandle};

use anyhow::*;
use serde::Serialize;
use serde_json;

use crate::utils::{var, Stakeholder};

#[derive(Copy, Clone, Debug)]
#[repr(u8)]
pub enum StreamType {
StdOut = 0,
StdErr = 1,
}

pub struct ReceivedMessage {
pub stream_type: StreamType,
pub ts: u64,
pub message: String,
}

/// Sends logs from the container to the Vector agent on the machine.
///
/// This will run until the `msg_rx` sender is dropped before shutting down.
///
/// If attempting to reconnect while the runner is shut down, this will exit immediately, dropping
/// all logs in the process. This is to ensure that if Vector becomes unreachable, we don't end up
/// with a lot of lingering runners that refuse to exit.
pub struct LogShipper {
/// Notifies of process shutdown.
pub shutdown_rx: mpsc::Receiver<()>,

/// Receiver for messages to be shipped. This holds a buffer of messages waiting to be send.
///
/// If the socket closes or creates back pressure, logs will be dropped on the main thread when
/// trying to send to this channel.
pub msg_rx: mpsc::Receiver<ReceivedMessage>,

pub stakeholder: Stakeholder,
}

impl LogShipper {
pub fn spawn(self) -> JoinHandle<()> {
std::thread::spawn(move || self.run())
}

fn run(self) {
// Retry loop
loop {
match self.run_inner() {
Result::Ok(()) => {
println!("Exiting log shipper");
break;
}
Err(err) => {
eprintln!("Log shipper error: {err:?}");

// Wait before attempting to reconnect. Wait for disconnect in this time
// period.
match self
.shutdown_rx
.recv_timeout(std::time::Duration::from_secs(15))
{
Result::Ok(_) => {
println!("Log shipper received shutdown");
break;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
eprintln!("Log shipper shutdown unexpectedly disconnected");
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// Not shut down, attempt reconnect
}
}
}
}
}
}

fn run_inner(&self) -> Result<()> {
let vector_socket_addr = var("PEGBOARD_META_vector_socket_addr")?;

println!("Connecting log shipper to Vector at {vector_socket_addr}");

let mut stream = TcpStream::connect(vector_socket_addr)?;

println!("Log shipper connected");

while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = match &self.stakeholder {
Stakeholder::DynamicServer { server_id } => VectorMessage::DynamicServers {
server_id: server_id.as_str(),
task: "main", // Backwards compatibility with logs
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
},
};

serde_json::to_writer(&mut stream, &vector_message)?;
stream.write_all(b"\n")?;
}

println!("Log shipper msg_rx disconnected");

Ok(())
}
}

/// Vector-compatible message format
#[derive(Serialize)]
#[serde(tag = "source")]
enum VectorMessage<'a> {
#[serde(rename = "dynamic_servers")]
DynamicServers {
server_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
},
}
Loading

0 comments on commit 91cd21c

Please sign in to comment.