Skip to content

Commit

Permalink
#13 authenticate websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
joepio committed Nov 21, 2021
1 parent d68b055 commit 600fde1
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 19 deletions.
2 changes: 2 additions & 0 deletions lib/src/authentication.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Check signatures in authentication headers, find the correct agent. Authorization is done in Hierarchies
use crate::{commit::check_timestamp, errors::AtomicResult, Storelike};

/// Set of values extracted from the request.
Expand Down
1 change: 1 addition & 0 deletions lib/src/plugins/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ fn handle_version_request(
fn handle_all_versions_request(
url: url::Url,
store: &impl Storelike,
// TODO: implement auth
for_agent: Option<String>,
) -> AtomicResult<Resource> {
let params = url.query_pairs();
Expand Down
1 change: 1 addition & 0 deletions server/src/actor_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct WsMessage(pub String);
pub struct Subscribe {
pub addr: Addr<crate::handlers::web_sockets::WebSocketConnection>,
pub subject: String,
pub agent: String,
}

/// A message containing a Resource, which should be sent to subscribers
Expand Down
3 changes: 2 additions & 1 deletion server/src/appstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use atomic_lib::{
Storelike,
};

/// Context for the server (not an individual request).
/// Data object available to handlers and actors.
/// Contains the store, configuration and addresses for Actix Actors.
// This struct is cloned accross all threads, so make sure the fields are thread safe.
// A good option here is to use Actors for things that can change (e.g. commit_monitor)
#[derive(Clone)]
Expand Down
35 changes: 26 additions & 9 deletions server/src/commit_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use actix::{
prelude::{Actor, Context, Handler},
Addr,
};
use atomic_lib::Db;
use atomic_lib::{Db, Storelike};
use chrono::Local;
use std::collections::{HashMap, HashSet};

/// The Commit Monitor is an Actor that manages subscriptions for subjects and sends Commits to listeners.
/// It's also responsible for checking whether the rights are present
pub struct CommitMonitor {
/// Maintains a list of all the resources that are being subscribed to, and maps these to websocket connections.
subscriptions: HashMap<String, HashSet<Addr<WebSocketConnection>>>,
Expand All @@ -34,15 +35,31 @@ impl Actor for CommitMonitor {
impl Handler<Subscribe> for CommitMonitor {
type Result = ();

// A message comes in when a client subscribes to a subject.
fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
let mut set = if let Some(set) = self.subscriptions.get(&msg.subject) {
set.clone()
} else {
HashSet::new()
};
set.insert(msg.addr);
log::info!("handle subscribe {} ", msg.subject);
self.subscriptions.insert(msg.subject, set);
// check if the agent has the rights to subscribe to this resource
if let Ok(resource) = self.store.get_resource(&msg.subject) {
if let Ok(can) =
atomic_lib::hierarchy::check_read(&self.store, &resource, msg.agent.clone())
{
if can {
let mut set = if let Some(set) = self.subscriptions.get(&msg.subject) {
set.clone()
} else {
HashSet::new()
};
set.insert(msg.addr);
log::info!("handle subscribe {} ", msg.subject);
self.subscriptions.insert(msg.subject.clone(), set);
}
log::info!(
"Not allowed {} to subscribe to {} ",
&msg.agent,
&msg.subject
);
}
// TODO: Handle errors
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions server/src/handlers/tpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ pub async fn tpf(
req: actix_web::HttpRequest,
query: web::Query<TpfQuery>,
) -> AtomicServerResult<HttpResponse> {
let mut context = data.lock().unwrap();
let store = &mut context.store;
let appstate = data.lock().unwrap();
let store = &appstate.store;

if !appstate.config.opts.public_mode {
return Err("/tpf endpoint is only available on public mode".into());
}
// This is how locally items are stored (which don't know their full subject URL) in Atomic Data
let mut builder = HttpResponse::Ok();
let content_type = get_accept(req.headers());
Expand Down
37 changes: 30 additions & 7 deletions server/src/handlers/web_sockets.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use actix_web::{web, Error, HttpRequest, HttpResponse};
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws::{self};
use std::{
sync::Mutex,
time::{Duration, Instant},
};

use crate::{actor_messages::CommitMessage, appstate::AppState, commit_monitor::CommitMonitor};
use crate::{
actor_messages::CommitMessage, appstate::AppState, commit_monitor::CommitMonitor,
errors::AtomicServerResult, helpers::get_auth_headers,
};

/// Get an HTTP request, upgrade it to a Websocket connection
pub async fn web_socket_handler(
req: HttpRequest,
stream: web::Payload,
data: web::Data<Mutex<AppState>>,
) -> Result<HttpResponse, Error> {
) -> AtomicServerResult<HttpResponse> {
log::info!("Starting websocket");
let context = data.lock().unwrap();
ws::start(
WebSocketConnection::new(context.commit_monitor.clone()),

// Authentication check. If the user has no headers, continue with the Public Agent.
let auth_header_values = get_auth_headers(req.headers(), "ws".into())?;
let for_agent = atomic_lib::authentication::get_agent_from_headers_and_check(
auth_header_values,
&context.store,
)?;

let result = ws::start(
WebSocketConnection::new(context.commit_monitor.clone(), for_agent),
&req,
stream,
)
)?;
Ok(result)
}

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
Expand All @@ -37,6 +49,9 @@ pub struct WebSocketConnection {
subscribed: std::collections::HashSet<String>,
/// The CommitMonitor Actor that receives and sends messages for Commits
commit_monitor_addr: Addr<CommitMonitor>,
/// The Agent who is connected.
/// If it's not specified, it's the Public Agent.
agent: String,
}

impl Actor for WebSocketConnection {
Expand Down Expand Up @@ -68,6 +83,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketConnecti
.do_send(crate::actor_messages::Subscribe {
addr: ctx.address(),
subject: subject.to_string(),
agent: self.agent.clone(),
});
self.subscribed.insert(subject.into());
} else {
Expand All @@ -82,6 +98,12 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketConnecti
ctx.text("ERROR: UNSUBSCRIBE without subject")
}
}
s if s.starts_with("GET ") => {
let mut parts = s.split("GET ");
if let Some(_subject) = parts.nth(1) {
ctx.text("GET not yet supported, see https://github.com/joepio/atomic-data-rust/issues/180")
}
}
other => {
log::warn!("Unmatched message: {}", other);
ctx.text(format!("Server receieved unknown message: {}", other));
Expand All @@ -99,12 +121,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketConnecti
}

impl WebSocketConnection {
fn new(commit_monitor_addr: Addr<CommitMonitor>) -> Self {
fn new(commit_monitor_addr: Addr<CommitMonitor>, agent: String) -> Self {
Self {
hb: Instant::now(),
// Maybe this should be stored only in the CommitMonitor, and not here.
subscribed: std::collections::HashSet::new(),
commit_monitor_addr,
agent,
}
}

Expand Down

0 comments on commit 600fde1

Please sign in to comment.