Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pegboard): add container runner and manager #1144

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Instead, you can increment the location preemptively with `ctx.step()`:

```rust
let iter = actions.into_iter().map(|action| {
let ctx = ctx.step();
let mut ctx = ctx.step();

async move {
ctx.activity(MyActivityInput {
Expand Down
1 change: 0 additions & 1 deletion lib/chirp-workflow/core/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ where
}
}

// Closure executable impl
#[async_trait]
impl<T: Executable> Executable for Option<T> {
type Output = Option<T::Output>;
Expand Down
70 changes: 57 additions & 13 deletions lib/chirp-workflow/core/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ pub trait Signal {
///
/// Example:
/// ```rust
/// #[macros::signal("my-signal")]
/// #[signal("my_signal")]
/// struct MySignal {
/// x: i64,
/// }

/// #[macros::signal("my-signal2")]
/// #[signal("my_signal2")]
/// struct MySignal2 {
/// y: i64,
/// }
///
/// join_signal!(MyJoinSignal, [MySignal, MySignal2]);
/// join_signal!(MyJoinSignal {
/// MySignal,
/// MySignal2
/// });
///
/// // Automatically becomes:
/// enum MyJoinSignal {
Expand All @@ -29,44 +32,85 @@ pub trait Signal {
/// MySignal(sig) => println!("received MySignal {sig:?}"),
/// MySignal2(sig) => println!("received MySignal2 {sig:?}"),
/// }
/// ````
///
///
/// // Also allows aliases:
/// join_signal!(MyJoinSignal {
/// MySignal,
/// MySignal2(some_pkg::Signal),
/// }
/// ```
#[macro_export]
macro_rules! join_signal {
($vis:vis $join:ident, [$($signals:ident),* $(,)?]) => {
$vis enum $join {
$($signals($signals)),*
($vis:vis $join:ident { $($tt:tt)* }) => {
join_signal!(@ $vis $join [] [] $($tt)*);
};
(@
$vis:vis $join:ident
[$({ $names:tt } { $types:tt })*]
[$({ $just_types:tt })*]
) => {
$vis enum $join {
$( $names ($types) ),*
}

#[async_trait::async_trait]
#[async_trait::async_trait]
impl Listen for $join {
async fn listen(ctx: &chirp_workflow::prelude::ListenCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[
$(<$signals as chirp_workflow::prelude::Signal>::NAME),*
$(<$just_types as chirp_workflow::prelude::Signal>::NAME),*
]).await?;

Self::parse(&row.signal_name, row.body)
}

fn parse(name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
$(
if name == <$signals as chirp_workflow::prelude::Signal>::NAME {
if name == <$types as chirp_workflow::prelude::Signal>::NAME {
Ok(
Self::$signals(
Self::$names(
serde_json::from_value(body)
.map_err(WorkflowError::DeserializeSignalBody)?
)
)
}
)else*

else {
unreachable!(
"received signal that wasn't queried for: {}, expected {:?}",
name, &[$(<$signals as chirp_workflow::prelude::Signal>::NAME),*]
name, &[$(<$just_types as chirp_workflow::prelude::Signal>::NAME),*]
);
}
}
}
};
(@
$vis:vis $join:ident
[$({ $names:tt } { $types:tt })*]
[$({ $just_types:tt })*]
$name:ident,
$($tail:tt)*
) => {
join_signal!(@
$vis $join
[$( { $names } { $types } )* { $name } { $name }]
[$( { $just_types } )* { $name }]
$($tail)*
);
};
(@
$vis:vis $join:ident
[$({ $names:tt } { $types:tt })*]
[$({ $just_types:tt })*]
$name:ident($ty:ty),
$($tail:tt)*
) => {
join_signal!(@
$vis $join
[$( { $names } { $types } )* { $name } { $ty }]
[$( { $just_types } )* { $ty }]
$($tail)*
);
};
}
pub use join_signal;
6 changes: 6 additions & 0 deletions lib/pegboard/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[workspace]
members = [
"container-runner",
"manager",
]

5 changes: 5 additions & 0 deletions lib/pegboard/container-runner/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/target
**/target
.dockerignore
Dockerfile

18 changes: 18 additions & 0 deletions lib/pegboard/container-runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "container-runner"
version = "0.1.0"
edition = "2021"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
license = "Apache-2.0"

[dependencies]
anyhow = "1.0.79"
portpicker = "0.1.1"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
signal-hook = "0.3.17"

[dev-dependencies]
tempfile = "3.9.0"
uuid = { version = "1.6.1", features = ["v4"] }

7 changes: 7 additions & 0 deletions lib/pegboard/container-runner/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM clux/muslrust:1.80.0-stable

WORKDIR /app
COPY Cargo.toml Cargo.lock .
COPY src/ src/
RUN cargo build --release

8 changes: 8 additions & 0 deletions lib/pegboard/container-runner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# container-runner

This crate is used to run OCI bundles on the job servers themselves. This takes care of trapping signals, log
shipping, rate limiting logs, and more.

## Deployment

This gets built & deployed in `infra/tf/infra-artifacts/` then used in `TODO`.
123 changes: 123 additions & 0 deletions lib/pegboard/container-runner/src/log_shipper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::{io::Write, net::TcpStream, sync::mpsc, thread::JoinHandle};

use anyhow::*;
use serde::Serialize;
use serde_json;

use crate::utils::{var, Stakeholder};

#[derive(Copy, Clone, Debug)]
#[repr(u8)]
pub enum StreamType {
StdOut = 0,
StdErr = 1,
}

pub struct ReceivedMessage {
pub stream_type: StreamType,
pub ts: u64,
pub message: String,
}

/// Sends logs from the container to the Vector agent on the machine.
///
/// This will run until the `msg_rx` sender is dropped before shutting down.
///
/// If attempting to reconnect while the runner is shut down, this will exit immediately, dropping
/// all logs in the process. This is to ensure that if Vector becomes unreachable, we don't end up
/// with a lot of lingering runners that refuse to exit.
pub struct LogShipper {
/// Notifies of process shutdown.
pub shutdown_rx: mpsc::Receiver<()>,

/// Receiver for messages to be shipped. This holds a buffer of messages waiting to be send.
///
/// If the socket closes or creates back pressure, logs will be dropped on the main thread when
/// trying to send to this channel.
pub msg_rx: mpsc::Receiver<ReceivedMessage>,

pub stakeholder: Stakeholder,
}

impl LogShipper {
pub fn spawn(self) -> JoinHandle<()> {
std::thread::spawn(move || self.run())
}

fn run(self) {
// Retry loop
loop {
match self.run_inner() {
Result::Ok(()) => {
println!("Exiting log shipper");
break;
}
Err(err) => {
eprintln!("Log shipper error: {err:?}");

// Wait before attempting to reconnect. Wait for disconnect in this time
// period.
match self
.shutdown_rx
.recv_timeout(std::time::Duration::from_secs(15))
{
Result::Ok(_) => {
println!("Log shipper received shutdown");
break;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
eprintln!("Log shipper shutdown unexpectedly disconnected");
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// Not shut down, attempt reconnect
}
}
}
}
}
}

fn run_inner(&self) -> Result<()> {
let vector_socket_addr = var("PEGBOARD_META_vector_socket_addr")?;

println!("Connecting log shipper to Vector at {vector_socket_addr}");

let mut stream = TcpStream::connect(vector_socket_addr)?;

println!("Log shipper connected");

while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = match &self.stakeholder {
Stakeholder::DynamicServer { server_id } => VectorMessage::DynamicServers {
server_id: server_id.as_str(),
task: "main", // Backwards compatibility with logs
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
},
};

serde_json::to_writer(&mut stream, &vector_message)?;
stream.write_all(b"\n")?;
}

println!("Log shipper msg_rx disconnected");

Ok(())
}
}

/// Vector-compatible message format
#[derive(Serialize)]
#[serde(tag = "source")]
enum VectorMessage<'a> {
#[serde(rename = "dynamic_servers")]
DynamicServers {
server_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
},
}
Loading
Loading