Skip to content

Commit

Permalink
Port to futures channel instead of std one.
Browse files Browse the repository at this point in the history
Fixes #133.
  • Loading branch information
LPGhatguy committed Mar 12, 2019
1 parent 3b6238f commit ad93631
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]
* Fixed `cargo init` giving unexpected results by upgrading to `rbx_dom_weak` 1.1.0
* Fixed API not responding when the Rojo plugin is connected ([#133](https://github.com/LPGhatguy/rojo/issues/133))
* Updated default place file:
* Improved default properties to be closer to Studio's built-in 'Baseplate' template
* Added a baseplate to the project file (Thanks, [@AmaranthineCodices](https://github.com/AmaranthineCodices/)!)
Expand Down
12 changes: 6 additions & 6 deletions server/src/message_queue.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{
collections::HashMap,
sync::{
mpsc,
atomic::{AtomicUsize, Ordering},
RwLock,
Mutex,
},
};

use futures::sync::mpsc;

/// A unique identifier, not guaranteed to be generated in any order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ListenerId(usize);
Expand All @@ -21,8 +22,7 @@ pub fn get_listener_id() -> ListenerId {

/// A message queue with persistent history that can be subscribed to.
///
/// Definitely non-optimal, but a simple design that works well for the
/// synchronous web server Rojo uses, Rouille.
/// Definitely non-optimal. This would ideally be a lockless mpmc queue.
#[derive(Default)]
pub struct MessageQueue<T> {
messages: RwLock<Vec<T>>,
Expand All @@ -38,15 +38,15 @@ impl<T: Clone> MessageQueue<T> {
}

pub fn push_messages(&self, new_messages: &[T]) {
let message_listeners = self.message_listeners.lock().unwrap();
let mut message_listeners = self.message_listeners.lock().unwrap();

{
let mut messages = self.messages.write().unwrap();
messages.extend_from_slice(new_messages);
}

for listener in message_listeners.values() {
listener.send(()).unwrap();
for listener in message_listeners.values_mut() {
listener.try_send(()).unwrap();
}
}

Expand Down
57 changes: 30 additions & 27 deletions server/src/web/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
sync::{mpsc, Arc},
sync::Arc,
};

use futures::{future, Future};
use futures::{future, Future, stream::Stream, sync::mpsc};
use hyper::{
service::Service,
header,
Expand Down Expand Up @@ -114,14 +114,16 @@ impl Service for ApiService {
fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
let response = match (request.method(), request.uri().path()) {
(&Method::GET, "/api/rojo") => self.handle_api_rojo(),
(&Method::GET, path) if path.starts_with("/api/subscribe/") => self.handle_api_subscribe(request),
(&Method::GET, path) if path.starts_with("/api/read/") => self.handle_api_read(request),
(&Method::GET, path) if path.starts_with("/api/subscribe/") => {
return self.handle_api_subscribe(request);
}
_ => {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
},
}
};

Box::new(future::ok(response))
Expand Down Expand Up @@ -152,16 +154,16 @@ impl ApiService {

/// Retrieve any messages past the given cursor index, and if
/// there weren't any, subscribe to receive any new messages.
fn handle_api_subscribe(&self, request: Request<Body>) -> Response<Body> {
fn handle_api_subscribe(&self, request: Request<Body>) -> <ApiService as Service>::Future {
let argument = &request.uri().path()["/api/subscribe/".len()..];
let cursor: u32 = match argument.parse() {
Ok(v) => v,
Err(err) => {
return Response::builder()
return Box::new(future::ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(err.to_string()))
.unwrap();
.unwrap()));
},
};

Expand All @@ -172,37 +174,38 @@ impl ApiService {
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);

if !new_messages.is_empty() {
return response_json(&SubscribeResponse {
return Box::new(future::ok(response_json(&SubscribeResponse {
session_id: self.live_session.session_id(),
messages: Cow::Borrowed(&new_messages),
message_cursor: new_cursor,
})
})));
}
}

// TOOD: Switch to futures mpsc instead to not block this task
let (tx, rx) = mpsc::channel();
let (tx, rx) = mpsc::channel(1024);
let sender_id = message_queue.subscribe(tx);
let session_id = self.live_session.session_id();

match rx.recv() {
Ok(_) => (),
Err(_) => return Response::builder()
.status(500)
.body(Body::from("error!"))
.unwrap(),
}
let result = rx.into_future()
.and_then(move |_| {
message_queue.unsubscribe(sender_id);

message_queue.unsubscribe(sender_id);
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);

{
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);

return response_json(&SubscribeResponse {
session_id: self.live_session.session_id(),
messages: Cow::Owned(new_messages),
message_cursor: new_cursor,
Box::new(future::ok(response_json(SubscribeResponse {
session_id: session_id,
messages: Cow::Owned(new_messages),
message_cursor: new_cursor,
})))
})
}
.or_else(|e| {
Box::new(future::ok(Response::builder()
.status(500)
.body(Body::from(format!("Internal Error: {:?}", e)))
.unwrap()))
});

Box::new(result)
}

fn handle_api_read(&self, request: Request<Body>) -> Response<Body> {
Expand Down

0 comments on commit ad93631

Please sign in to comment.