Skip to content

Commit

Permalink
feat(pegboard): add ws tunnel (#1145)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Oct 9, 2024
1 parent 799c059 commit 5f0085d
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 38 deletions.
6 changes: 6 additions & 0 deletions infra/tf/k8s_infra/traefik_tunnel.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ locals {
service_namespace = kubernetes_namespace.nomad.0.metadata[0].name
service_port = 4647
}
# Addresses a random Pegboard server.
"pegboard-server" = {
service = "rivet-pegboard-ws"
service_namespace = kubernetes_namespace.rivet_service.metadata[0].name
service_port = 80
}
}] : [],
var.prometheus_enabled ? [{
"vector" = {
Expand Down
10 changes: 2 additions & 8 deletions lib/bolt/config/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ pub enum ServiceKind {
#[serde(rename = "api-routes", rename_all = "kebab-case")]
ApiRoutes {},

#[serde(rename = "static", rename_all = "kebab-case")]
Static { router: ServiceRouter },

#[serde(rename = "database")]
Database {},

Expand Down Expand Up @@ -317,8 +314,7 @@ impl ServiceKind {
ServiceKind::Api {
router: Some(router),
..
}
| ServiceKind::Static { router } => Some(router),
} => Some(router),
_ => None,
}
}
Expand All @@ -327,7 +323,7 @@ impl ServiceKind {
/// because this will be true for any services that are internally-facing HTTP servers, such as
/// `api-job`.
pub fn has_server(&self) -> bool {
matches!(self, ServiceKind::Api { .. } | ServiceKind::Static { .. })
matches!(self, ServiceKind::Api { .. })
}

pub fn short(&self) -> &str {
Expand All @@ -339,7 +335,6 @@ impl ServiceKind {
ServiceKind::Consumer { .. } => "consumer",
ServiceKind::Api { .. } => "api",
ServiceKind::ApiRoutes { .. } => "api-routes",
ServiceKind::Static { .. } => "static",
ServiceKind::Database { .. } => "database",
ServiceKind::Cache { .. } => "cache",
ServiceKind::Package { .. } => "package",
Expand All @@ -351,7 +346,6 @@ impl ServiceKind {
ServiceKind::Headless { .. }
| ServiceKind::Oneshot { .. }
| ServiceKind::Periodic { .. }
| ServiceKind::Static { .. }
| ServiceKind::Api { .. } => ComponentClass::Executable,

ServiceKind::Operation { .. }
Expand Down
3 changes: 1 addition & 2 deletions lib/bolt/core/src/dep/k8s/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ pub async fn gen_svc(exec_ctx: &ExecServiceContext) -> Vec<serde_json::Value> {
RunContext::Service { .. } => match svc_ctx.config().kind {
ServiceKind::Headless { .. }
| ServiceKind::Consumer { .. }
| ServiceKind::Api { .. }
| ServiceKind::Static { .. } => SpecType::Deployment,
| ServiceKind::Api { .. } => SpecType::Deployment,
ServiceKind::Oneshot { .. } => SpecType::Job,
ServiceKind::Periodic { .. } => SpecType::CronJob,
ServiceKind::Operation { .. }
Expand Down
2 changes: 1 addition & 1 deletion lib/pegboard/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> Result<()> {
let client_id: Uuid = todo!();

// Build connection URL
let mut url = Url::parse("TODO:8080")?;
let mut url = Url::parse("ws://127.0.0.1:5030")?;
url.set_path("/v1");
url.query_pairs_mut()
.append_pair("client_id", &client_id.to_string());
Expand Down
4 changes: 2 additions & 2 deletions scripts/forward/traefik_tunnel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
set -euf

# We don't know the value of $PORT yet
echo 'Dashboard: http://localhost:$PORT/dashboard/'
echo 'API: http://localhost:$PORT/api/'
echo "Dashboard: http://localhost:$PORT/dashboard/"
echo "API: http://localhost:$PORT/api/"

FORWARD_NS=traefik-tunnel FORWARD_NAME=service/traefik-headless FORWARD_PORT=9000 ./scripts/forward/service.sh

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub const TUNNEL_SERVICES: &[TunnelService] = &[
name: "vector-tcp-json",
port: TUNNEL_VECTOR_TCP_JSON_PORT,
},
TunnelService {
name: "pegboard-server",
port: 5030,
},
];

/// Service that gets exposed from the Traefik tunnel.
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/pegboard/standalone/ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ serde = { version = "1.0", features = ["derive"] }
tokio-tungstenite = "0.23.1"
url = "2.2.2"

pegboard = { path = "../.." }
pegboard = { path = "../.." }
3 changes: 1 addition & 2 deletions svc/pkg/pegboard/standalone/ws/Service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ name = "pegboard-ws"
[runtime]
kind = "rust"

[headless]
singleton = true
[api]
60 changes: 38 additions & 22 deletions svc/pkg/pegboard/standalone/ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
}

async fn socket_thread(ctx: &StandaloneCtx, conns: Arc<RwLock<Connections>>) -> GlobalResult<()> {
let addr = ("TODO", 8080);
let listener = TcpListener::bind(&addr).await?;
tracing::info!("Listening on: {:?}", addr);
let addr = "127.0.0.1:80";
let listener = TcpListener::bind(addr).await?;
tracing::info!("Listening on: {}", addr);

loop {
match listener.accept().await {
Expand All @@ -62,28 +62,44 @@ async fn handle_connection(
raw_stream: TcpStream,
addr: SocketAddr,
) {
tracing::error!(?addr, "new connection");
tracing::info!(?addr, "new connection");

let ctx = ctx.clone();

tokio::spawn(async move {
let mut uri = None;
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
|req: &tokio_tungstenite::tungstenite::handshake::server::Request, res| {
// Bootleg way of reading the uri
uri = Some(req.uri().clone());

Ok(res)
},
)
.await?;
// TODO: This is an ugly way to improve error visibility
let setup_res = async move {
let mut uri = None;
let ws_stream = tokio_tungstenite::accept_hdr_async(
raw_stream,
|req: &tokio_tungstenite::tungstenite::handshake::server::Request, res| {
// Bootleg way of reading the uri
uri = Some(req.uri().clone());

tracing::info!(?addr, ?uri, "handshake");

Ok(res)
},
)
.await?;

// Parse URL
let uri = unwrap!(uri, "socket has no associated request");
let (protocol_version, client_id) = parse_url(addr, uri)?;

// Parse URI
let uri = unwrap!(uri, "socket has no associated request");
let (protocol_version, client_id) = parse_uri(uri)?;
Ok((ws_stream, protocol_version, client_id))
};

// Print error
let (ws_stream, protocol_version, client_id) = match setup_res.await {
Ok(x) => x,
Err(err) => {
tracing::error!(?addr, "{err}");
return Err(err);
}
};

// Handle result
// Handle result for cleanup
match handle_connection_inner(
&ctx,
conns.clone(),
Expand Down Expand Up @@ -218,15 +234,15 @@ async fn signal_thread(ctx: &StandaloneCtx, conns: Arc<RwLock<Connections>>) ->
}
}

fn parse_uri(uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> {
let url = url::Url::parse(&uri.to_string())?;
fn parse_url(addr: SocketAddr, uri: hyper::Uri) -> GlobalResult<(u16, Uuid)> {
let url = url::Url::parse(&format!("ws://{addr}{uri}"))?;

// Get protocol version from last path segment
let last_segment = unwrap!(
unwrap!(url.path_segments(), "invalid url").last(),
"no path segments"
);
assert!(last_segment.starts_with('v'), "invalid protocol version");
ensure!(last_segment.starts_with('v'), "invalid protocol version");
let protocol_version = last_segment[1..].parse::<u16>()?;

// Read client_id from query parameters
Expand Down

0 comments on commit 5f0085d

Please sign in to comment.