Skip to content

Commit

Permalink
Send proper host + port in radars request
Browse files Browse the repository at this point in the history
  • Loading branch information
keesverruijt committed Aug 31, 2024
1 parent 77a703e commit 2e28788
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 124 deletions.
161 changes: 39 additions & 122 deletions src/web.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,37 @@
use anyhow::anyhow;
use axum::{
body::Body,
debug_handler,
extract::{
ws::{Message, WebSocket},
ConnectInfo, Path, State, WebSocketUpgrade,
},
http::StatusCode,
response::{IntoResponse, Response},
extract::{ ConnectInfo, Host, Request, State },
http::{ StatusCode, Uri },
response::{ IntoResponse, Response },
routing::get,
Json, Router,
Json,
Router,
};
use log::{debug, info, trace};
use log::{ debug, info, trace };
use miette::Result;
use serde::{Deserialize, Serialize};
use serde::{ Deserialize, Serialize };
use std::{
collections::HashMap,
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
net::{ IpAddr, Ipv4Addr, SocketAddr },
str::FromStr,
sync::{ Arc, RwLock },
};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio_graceful_shutdown::SubsystemHandle;

use crate::radar::{Legend, Radars};
use crate::radar::{ Legend, Radars };
use crate::VERSION;

#[derive(Error, Debug)]
pub enum WebError {
#[error("Socket operation failed")]
Io(#[from] io::Error),
#[error("Socket operation failed")] Io(#[from] io::Error),
}

#[derive(Clone, Debug)]
pub struct Web {
radars: Arc<RwLock<Radars>>,
url: Option<String>,
port: u16,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}
Expand All @@ -53,32 +48,21 @@ impl Web {
Web {
radars,
port,
url: None,
shutdown_tx,
}
}

pub async fn run(mut self, subsys: SubsystemHandle) -> Result<(), WebError> {
let listener = TcpListener::bind(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
self.port,
))
.await
.unwrap();
let listener = TcpListener::bind(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.port)
).await.unwrap();

let url = format!(
"http://localhost:{}/v1/api/",
listener.local_addr().unwrap().port()
);
info!("HTTP server available on {}", &url);
self.url = Some(url);
let mut shutdown_rx = self.shutdown_tx.subscribe();
let shutdown_tx = self.shutdown_tx.clone(); // Clone as self used in with_state() and with_graceful_shutdown() below

let app = Router::new()
.route("/", get(root))
.route("/v1/api/radars", get(get_radars))
.route("/v1/api/stream/:key", get(ws_handler))
.with_state(self)
.into_make_service_with_connect_info::<SocketAddr>();

Expand All @@ -94,7 +78,7 @@ impl Web {
) => {
return r.map_err(|e| WebError::Io(e));
}
};
}
Ok(())
}
}
Expand Down Expand Up @@ -122,7 +106,7 @@ impl RadarApi {
spokes: u16,
max_spoke_len: u16,
stream_url: String,
legend: Legend,
legend: Legend
) -> Self {
RadarApi {
id: id,
Expand All @@ -139,15 +123,32 @@ impl RadarApi {
// Signal K radar API says this returns something like:
// {"radar-0":{"id":"radar-0","name":"Navico","spokes":2048,"maxSpokeLen":1024,"streamUrl":"http://localhost:3001/v1/api/stream/radar-0"}}
//
async fn get_radars(State(state): State<Web>, _request: Body) -> Response {
async fn get_radars(
State(state): State<Web>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Host(host): Host
) -> Response {
debug!("Radar state request from {} for host '{}'", addr, host);

let host = format!(
"{}:{}",
match Uri::from_str(&host) {
Ok(uri) => uri.host().unwrap_or("localhost").to_string(),
Err(_) => "localhost".to_string(),
},
state.port + 1
);

debug!("target host = '{}'", host);

match state.radars.read() {
Ok(radars) => {
let x = &radars.info;
let mut api: HashMap<String, RadarApi> = HashMap::new();
for (_key, value) in x.iter() {
if let Some(legend) = &value.legend {
let id = format!("radar-{}", value.id);
let url = format!("{}stream/{}", state.url.as_ref().unwrap(), id);
let url = format!("http://{}/v1/api/stream/{}", host, id);
let mut name = value.brand.to_owned();
if value.model.is_some() {
name.push(' ');
Expand All @@ -163,7 +164,7 @@ async fn get_radars(State(state): State<Web>, _request: Body) -> Response {
value.spokes,
value.max_spoke_len,
url,
legend.clone(),
legend.clone()
);

api.insert(id.to_owned(), v);
Expand All @@ -184,98 +185,14 @@ impl IntoResponse for AppError {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {}", self.0),
)
.into_response()
).into_response()
}
}

// This enables using `?` on functions that return `Result<_, anyhow::Error>` to turn them into
// `Result<_, AppError>`. That way you don't need to do that manually.
impl<E> From<E> for AppError
where
E: Into<anyhow::Error>,
{
impl<E> From<E> for AppError where E: Into<anyhow::Error> {
fn from(err: E) -> Self {
Self(err.into())
}
}

/// The handler for the HTTP request (this gets called when the HTTP GET lands at the start
/// of websocket negotiation). After this completes, the actual switching from HTTP to
/// websocket protocol will occur.
/// This is the last point where we can extract TCP/IP metadata such as IP address of the client
/// as well as things from HTTP headers such as user-agent of the browser etc.
#[debug_handler]
async fn ws_handler(
State(state): State<Web>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Path(params): Path<WebSocketHandlerParameters>,
ws: WebSocketUpgrade,
) -> Response {
debug!("stream request from {} for {}", addr, params.key);

match match_radar_id(&state, &params.key) {
Ok(radar_message_rx) => {
let shutdown_rx = state.shutdown_tx.subscribe();
// finalize the upgrade process by returning upgrade callback.
// we can customize the callback by sending additional info such as address.
ws.on_upgrade(move |socket| handle_socket(socket, radar_message_rx, shutdown_rx))
}
Err(e) => e.into_response(),
}
}

fn match_radar_id(
state: &Web,
key: &str,
) -> Result<tokio::sync::broadcast::Receiver<Vec<u8>>, AppError> {
match state.radars.read() {
Ok(radars) => {
let x = &radars.info;

for (_key, value) in x.iter() {
if value.legend.is_some() {
let id = format!("radar-{}", value.id);
if id == key {
return Ok(value.radar_message_tx.subscribe());
}
}
}
}
Err(_) => return Err(AppError(anyhow!("Poisoned lock"))),
}
Err(AppError(anyhow!("No such radar {}", key)))
}
/// Actual websocket statemachine (one will be spawned per connection)
async fn handle_socket(
mut socket: WebSocket,
mut radar_message_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
debug!("Shutdown of websocket");
break;
},
r = radar_message_rx.recv() => {
match r {
Ok(message) => {
let len = message.len();
let ws_message = Message::Binary(message);
if let Err(e) = socket.send(ws_message).await {
debug!("Error on send to websocket: {}", e);
break;
}
trace!("Sent radar message {} bytes", len);
},
Err(e) => {
debug!("Error on RadarMessage channel: {}", e);
break;
}
}
}
}
}
}
4 changes: 2 additions & 2 deletions src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::{ net::{ IpAddr, Ipv4Addr, SocketAddr }, sync::{ Arc, RwLock } };

use log::{ debug, info, trace, warn };
use ratchet_rs::{
deflate::DeflateExtProvider,
CloseCode,
CloseReason,
Error,
ErrorKind,
HttpError,
NoExtProvider,
PayloadType,
ProtocolRegistry,
UpgradedServer,
Expand Down Expand Up @@ -120,7 +120,7 @@ impl WebSocket {
let upgrader = ratchet_rs::accept_with(
socket,
WebSocketConfig::default(),
NoExtProvider,
DeflateExtProvider::default(),
ProtocolRegistry::default()
).await?;

Expand Down

0 comments on commit 2e28788

Please sign in to comment.