Skip to content

Commit

Permalink
feat(pegboard): make protocol more robust, implement db sync (#1148)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Oct 9, 2024
1 parent cd85660 commit 166991e
Show file tree
Hide file tree
Showing 13 changed files with 542 additions and 180 deletions.
4 changes: 2 additions & 2 deletions docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ the internal location.
## Hashmaps in activity inputs/outputs

`std::collections::HashMap` does not implement `Hash`. To get around this, use `util::HashableMap`:
`std::collections::HashMap` does not implement `Hash`. To get around this, use `util::serde::HashableMap`:

```rust
use util::AsHashableExt;
use util::serde::AsHashableExt;

ctx
.activity(MyActivityInput {
Expand Down
2 changes: 1 addition & 1 deletion lib/pegboard/manager/src/container/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn cni_bundle(
}

if let protocol::ImageCompression::Lz4 = config.image.compression {
// Give tmp name
// Rename to tmp name
let tmp_path = container_path.join("tmp.tar");
fs::rename(&docker_image_path, &tmp_path).await?;

Expand Down
103 changes: 76 additions & 27 deletions lib/pegboard/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,56 @@ impl Ctx {
Ok(())
}

async fn write_event(&self, event: &protocol::Event) -> Result<()> {
async fn write_event(&self, event: &protocol::Event) -> Result<i64> {
// Write event to db
let event_json = serde_json::to_vec(event)?;
utils::query(|| async {
sqlx::query(indoc!(
let (index,) = utils::query(|| async {
sqlx::query_as::<_, (i64,)>(indoc!(
"
INSERT INTO events (
data,
create_ts
)
VALUES (?1, ?2)
WITH
last_idx AS (
UPDATE state
SET last_event_idx = last_event_idx + 1
RETURNING last_event_idx - 1
),
insert_event AS (
INSERT INTO events (
index,
payload,
create_ts
)
SELECT last_idx.last_event_idx, ?1, ?2
FROM last_idx
LIMIT 1
RETURNING 1
)
SELECT last_event_idx FROM last_idx
",
))
.bind(&event_json)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.fetch_one(&mut *self.sql().await?)
.await
})
.await?;

Ok(())
Ok(index)
}

pub async fn event(&self, event: protocol::Event) -> Result<()> {
self.write_event(&event).await?;
self.send_packet(protocol::ToServer::Events(vec![event]))
let index = self.write_event(&event).await?;

let wrapped_event = protocol::EventWrapper {
index,
inner: protocol::Raw::serialize(&event)?,
};

self.send_packet(protocol::ToServer::Events(vec![wrapped_event]))
.await
}

// Rebuilds state from DB
pub async fn rebuild(self: &Arc<Self>) -> Result<()> {
pub async fn rebuild(self: &Arc<Self>, last_event_idx: i64) -> Result<()> {
todo!();
}

Expand All @@ -119,6 +138,23 @@ impl Ctx {
Ok(())
});

// Send init packet
{
let (last_command_idx,) = utils::query(|| async {
sqlx::query_as::<_, (i64,)>(indoc!(
"
SELECT last_command_idx FROM state
",
))
.fetch_one(&mut *self.sql().await?)
.await
})
.await?;

self.send_packet(protocol::ToServer::Init { last_command_idx })
.await?;
}

// Receive messages from socket
while let Some(msg) = rx.next().await {
match msg? {
Expand All @@ -144,12 +180,16 @@ impl Ctx {

async fn process_packet(self: &Arc<Self>, packet: protocol::ToClient) -> Result<()> {
match packet {
protocol::ToClient::Init { api_endpoint, .. } => {
protocol::ToClient::Init {
last_event_idx,
api_endpoint,
} => {
{
let mut guard = self.api_endpoint.write().await;
*guard = Some(api_endpoint);
}
self.rebuild().await?;

self.rebuild(last_event_idx).await?;
}
protocol::ToClient::Commands(commands) => {
for command in commands {
Expand All @@ -162,11 +202,8 @@ impl Ctx {
Ok(())
}

async fn process_command(self: &Arc<Self>, command: protocol::Command) -> Result<()> {
// TODO: This is deserialized then serialized again
let command_json = serde_json::to_vec(&command)?;

match command {
async fn process_command(self: &Arc<Self>, command: protocol::CommandWrapper) -> Result<()> {
match command.inner.deserialize()? {
protocol::Command::StartContainer {
container_id,
config,
Expand All @@ -178,7 +215,7 @@ impl Ctx {
.or_insert_with(|| Container::new(container_id));

// Spawn container
container.start(&self, config).await?;
container.start(&self, *config).await?;
}
protocol::Command::StopContainer { container_id } => {
if let Some(container) = self.containers.read().await.get(&container_id) {
Expand All @@ -196,14 +233,26 @@ impl Ctx {
utils::query(|| async {
sqlx::query(indoc!(
"
INSERT INTO commands (
data,
ack_ts
)
VALUES (?1, ?2)
WITH
update_last_idx AS (
UPDATE state
SET last_command_idx = ?2
RETURNING 1
),
insert_event AS (
INSERT INTO commands (
index,
payload,
create_ts
)
VALUES(?1, ?2, ?3)
RETURNING 1
)
SELECT 1
",
))
.bind(&command_json)
.bind(command.index)
.bind(&command.inner)
.bind(utils::now())
.execute(&mut *self.sql().await?)
.await
Expand Down
20 changes: 15 additions & 5 deletions lib/pegboard/manager/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,28 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
sqlx::query(indoc!(
"
CREATE TABLE IF NOT EXISTS state (
last_command_idx INTEGER NOT NULL,
last_event_idx INTEGER NOT NULL,
last_command_idx INTEGER NOT NULL
)
",
))
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
INSERT INTO state
VALUES (0, 0)
",
))
.execute(&mut *conn)
.await?;

sqlx::query(indoc!(
"
CREATE TABLE IF NOT EXISTS events (
data BLOB NOT NULL,
index INTEGER NOT NULL,
index INTEGER NOT NULL UNIQUE,
payload BLOB NOT NULL,
create_ts INTEGER NOT NULL
)
",
Expand All @@ -131,8 +141,8 @@ pub async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
sqlx::query(indoc!(
"
CREATE TABLE IF NOT EXISTS commands (
data BLOB NOT NULL,
index INTEGER NOT NULL,
index INTEGER NOT NULL UNIQUE,
payload BLOB NOT NULL,
ack_ts INTEGER NOT NULL
)
",
Expand Down
18 changes: 15 additions & 3 deletions lib/util/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,24 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
thiserror = "1.0"
tokio = { version = "1.29", default-features = false, features = [
"time",
"sync",
"fs",
"macros",
"parking_lot",
"sync",
"test-util",
"macros",
"time",
] }
types-proto = { path = "../../types-proto/core" }
uuid = { version = "1", features = ["v4", "serde"] }

[dependencies.sqlx]
git = "https://github.com/rivet-gg/sqlx"
rev = "08d6e61aa0572e7ec557abbedb72cebb96e1ac5b"
default-features = false
features = [
"runtime-tokio",
"postgres",
"uuid",
"json",
"ipnetwork"
]
75 changes: 0 additions & 75 deletions lib/util/core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use std::{
collections::HashMap,
fmt,
hash::{Hash, Hasher},
ops::Deref,
};

use indexmap::IndexMap;
use rand::Rng;
pub use rivet_util_env as env;
pub use rivet_util_macros as macros;
use ::serde::{Deserialize, Serialize};
use tokio::time::{Duration, Instant};

pub mod billing;
Expand Down Expand Up @@ -171,72 +162,6 @@ impl Default for Backoff {
}
}

/// Used in workflow activity inputs/outputs. Using this over BTreeMap is preferred because this does not
/// reorder keys, providing faster insert and lookup.
#[derive(Serialize, Deserialize)]
pub struct HashableMap<K: Eq + Hash, V: Hash>(IndexMap<K, V>);

impl<K: Eq + Hash, V: Hash> Deref for HashableMap<K, V> {
type Target = IndexMap<K, V>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<K: Eq + Ord + Hash, V: Hash> Hash for HashableMap<K, V> {
fn hash<H: Hasher>(&self, state: &mut H) {
let mut kv = Vec::from_iter(&self.0);
kv.sort_unstable_by(|a, b| a.0.cmp(b.0));
kv.hash(state);
}
}

impl<K: Eq + Hash + fmt::Debug, V: Hash + fmt::Debug> fmt::Debug for HashableMap<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_map().entries(self.iter()).finish()
}
}

impl<K: Eq + Hash + Clone, V: Hash + Clone> Clone for HashableMap<K, V> {
fn clone(&self) -> Self {
HashableMap(self.0.clone())
}

fn clone_from(&mut self, other: &Self) {
self.0.clone_from(&other.0);
}
}

pub trait AsHashableExt<K: Eq + Hash, V: Hash> {
/// Converts the iterable to a `HashableMap` via cloning.
fn as_hashable(&self) -> HashableMap<K, V>;
}

impl<K: Eq + Clone + Hash, V: Clone + Hash> AsHashableExt<K, V> for HashMap<K, V> {
fn as_hashable(&self) -> HashableMap<K, V> {
HashableMap(self.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
}
}

impl<K: Eq + Clone + Hash, V: Clone + Hash> Into<HashableMap<K, V>> for HashMap<K, V> {
fn into(self) -> HashableMap<K, V> {
HashableMap(self.into_iter().collect())
}
}

impl<K: Eq + Clone + Hash, V: Clone + Hash> Into<HashableMap<K, V>> for &HashMap<K, V> {
fn into(self) -> HashableMap<K, V> {
HashableMap(self.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
}
}

impl<K: Eq + Hash, V: Hash> FromIterator<(K, V)> for HashableMap<K, V> {
fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
HashableMap(iter.into_iter().collect())
}
}

#[cfg(test)]
mod tests {
use std::time::Instant;
Expand Down
Loading

0 comments on commit 166991e

Please sign in to comment.