-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmod.rs
689 lines (606 loc) · 27.7 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
//! Web interface to view and control running services
//!
//! Uses the Iron web framework, Handlebars templates, and Twitter Boostrap.
extern crate iron;
extern crate handlebars_iron as hbs;
extern crate staticfile;
extern crate mount;
extern crate router;
extern crate urlencoded;
extern crate url;
extern crate hyper;
extern crate rustc_serialize as serialize;
extern crate websocket as ws;
extern crate uuid;
extern crate time;
use std::ops::Deref;
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock, mpsc};
use std::thread;
use std::thread::JoinHandle;
use std::collections::{HashMap, BTreeMap};
use std::io::{self, Read};
use super::comms::{Controllable, CmdFrom, Block};
use super::stb::ParkState;
use self::iron::prelude::*;
use self::iron::status;
use self::iron::middleware::{Handler, AfterMiddleware};
use self::iron::headers::Connection;
use self::hbs::{Template, HandlebarsEngine};
#[cfg(feature = "watch")] use self::hbs::Watchable;
use self::serialize::json::{ToJson, Json};
use self::staticfile::Static;
use self::mount::Mount;
use self::router::Router;
use self::urlencoded::{UrlEncodedQuery, UrlEncodedBody};
use self::url::percent_encoding::percent_decode;
use self::hyper::server::Listening;
use self::ws::Sender as WebsocketSender;
use self::ws::Receiver as WebsocketReceiver;
use self::uuid::Uuid;
static HTTP_PORT: u16 = 3000;
static WS_PORT: u16 = 3001;
lazy_static! {
static ref RPC_SENDERS: Mutex<HashMap<usize, mpsc::Sender<String>>> = Mutex::new(HashMap::new());
static ref WS_SENDERS: Mutex<Vec<ws::server::sender::Sender<ws::stream::WebSocketStream>>> = Mutex::new(Vec::new());
}
/// Service descriptor
///
/// Unlike the one in main.rs, this descriptor only needs to contain things that are useful for
/// display in the interface. However, they should probably be unified (TODO). The "web descriptor"
/// could be just a subfield of the main.rs service descriptor, and then those could get passed in
/// here (somehow).
struct Service {
name: String,
shortname: String,
extra: String
}
impl Service {
/// Create a new service descriptor with the given name
fn new(s: &str, t: &str, e: &str) -> Service {
Service { name: s.to_string(), shortname: t.to_string(), extra: e.to_string() }
}
}
/// Descriptor of a data collection flow
struct Flow {
/// Name of the flow
name: &'static str,
/// States in the flow
states: Vec<FlowState>,
/// Is this the active flow?
active: bool,
stamp: Option<time::Timespec>,
id: Option<uuid::Uuid>,
}
/// One state in a data collection flow
struct FlowState {
/// Name of the flow state
name: &'static str,
/// State of parking lot that allows this state (if applicable)
park: Option<ParkState>,
/// Commands to run for this state
script: Vec<FlowCmd>,
/// Has this state been completed?
done: bool,
}
/// Different actions that a flow can perform at each state
enum FlowCmd {
Message(&'static str),
Str { prompt: &'static str, data: Option<String> },
Int { prompt: &'static str, limits: (i32, i32), data: Option<i32> },
Start(&'static str),
Stop(&'static str),
Send(&'static str),
StopSensors,
}
impl Flow {
fn new(name: &'static str, states: Vec<FlowState>) -> Flow {
Flow {
name: name,
states: states,
active: false,
stamp: None,
id: None,
}
}
fn run(&mut self, park: ParkState, tx: &mpsc::Sender<CmdFrom>, wsid: usize) {
// are we just starting the flow now?
if !self.active {
println!("Beginning flow {}!", self.name);
// need a timestamp and ID
self.stamp = Some(time::get_time());
self.id = Some(uuid::Uuid::new_v4());
self.active = true;
}
// find the next eligible state
let state = self.states.iter_mut().find(|s| !s.done && s.park.as_ref().map_or(true, |p| *p == park)).unwrap();
println!("Executing state {}", state.name);
state.run(tx, wsid);
println!("Finished executing state {}", state.name);
}
}
impl FlowState {
fn new(name: &'static str, park: Option<ParkState>, script: Vec<FlowCmd>) -> FlowState {
FlowState {
name: name,
park: park,
script: script,
done: false,
}
}
fn run(&mut self, tx: &mpsc::Sender<CmdFrom>, wsid: usize) {
assert!(!self.done);
for c in self.script.iter_mut() {
c.run(&tx, wsid);
}
self.done = true;
}
}
fn ws_send(wsid: usize, msg: String) {
let mut locked_senders = WS_SENDERS.lock().unwrap();
locked_senders[wsid].send_message(ws::Message::Text(msg)).unwrap();
}
fn ws_rpc<T, F: Fn(String) -> Result<T,String>>(wsid: usize, prompt: String, validator: F) -> T {
let mut locked_senders = WS_SENDERS.lock().unwrap();
let mut locked_rpcs = RPC_SENDERS.lock().unwrap();
let mut go = |prompt: &str| -> String {
let (tx, rx) = mpsc::channel();
locked_rpcs.insert(wsid, tx);
locked_senders[wsid].send_message(ws::Message::Text(prompt.to_string())).unwrap();
rx.recv().unwrap()
};
let mut answer = go(&prompt);
loop {
match validator(answer) {
Ok(ret) => return ret,
Err(admonish) => {
answer = go(&format!("{} {}", admonish, prompt));
}
}
}
}
impl FlowCmd {
fn str(prompt: &'static str) -> FlowCmd {
FlowCmd::Str { prompt: prompt, data: None}
}
fn int(prompt: &'static str, limits: (i32, i32)) -> FlowCmd {
FlowCmd::Int { prompt: prompt, limits: limits, data: None}
}
fn run(&mut self, tx: &mpsc::Sender<CmdFrom>, wsid: usize) {
match *self {
FlowCmd::Message(msg) => ws_send(wsid, format!("{}", msg)),
FlowCmd::Str { prompt, ref mut data } => {
assert!(data.is_none());
*data = Some(ws_rpc(wsid,
format!("Please enter {}: <form><input type=\"text\" name=\"{}\"/></form>",
prompt, prompt),
|x| {
if x.is_empty() {
Err("That's an empty string!".to_string())
} else {
Ok(x)
}
}));
},
FlowCmd::Int { prompt, limits: (low, high), ref mut data } => {
assert!(data.is_none());
*data = Some(ws_rpc(wsid,
format!("Please select {} ({}-{} scale): <form><input type=\"text\" name=\"{}\"/></form>",
prompt, low, high, prompt),
|x| {
match x.parse() {
Ok(i) if i >= low && i <= high => {
Ok(i)
},
Ok(_) => {
Err("Out of range!".to_string())
},
Err(_) => {
Err("Not an integer!".to_string())
},
}
}));
},
FlowCmd::Start(service) => {
assert!(rpc!(tx, CmdFrom::Start, service.to_string()).unwrap());
},
FlowCmd::Stop(service) => {
assert!(rpc!(tx, CmdFrom::Stop, service.to_string()).unwrap());
},
FlowCmd::Send(string) => {
tx.send(CmdFrom::Data(string.to_string())).unwrap();
},
FlowCmd::StopSensors => {
for svc in ["bluefox", "structure", "biotac", "optoforce", "stb"].into_iter() {
assert!(rpc!(tx, CmdFrom::Start, svc.to_string()).unwrap());
}
}
}
}
}
macro_rules! jsonize {
($map:ident, $selph:ident, $var:ident) => {{
$map.insert(stringify!($var).to_string(), $selph.$var.to_json())
}};
($map:ident, $selph:ident; $($var:ident),+) => {{
$(jsonize!($map, $selph, $var));+
}}
}
impl ToJson for Service {
fn to_json(&self) -> Json {
let mut m: BTreeMap<String, Json> = BTreeMap::new();
jsonize!(m, self; name, shortname, extra);
m.to_json()
}
}
impl ToJson for Flow {
fn to_json(&self) -> Json {
let mut m: BTreeMap<String, Json> = BTreeMap::new();
jsonize!(m, self; name, states);
m.to_json()
}
}
impl ToJson for FlowState {
fn to_json(&self) -> Json {
let mut m: BTreeMap<String, Json> = BTreeMap::new();
jsonize!(m, self; name, done);
m.to_json()
}
}
/// Make a path relative to the current file's directory
fn relpath(path: &str) -> String {
String::from(Path::new(file!()).parent().unwrap().join(path).to_str().unwrap())
}
/// Handler for the main page of the web interface
fn index(flows: Arc<RwLock<Vec<Flow>>>) -> Box<Handler> {
Box::new(move |req: &mut Request| -> IronResult<Response> {
let mut data = BTreeMap::<String, Json>::new();
data.insert("services".to_string(), vec![ Service::new("Structure Sensor", "structure" , "<img class=\"structure latest\" src=\"img/structure_latest.png\" /><div class=\"structure framenum\">NaN</div>"),
Service::new("mvBlueFOX3" , "bluefox" , "<img class=\"bluefox latest\" src=\"img/bluefox_latest.png\" /><div class=\"bluefox framenum\">NaN</div>"),
Service::new("OptoForce" , "optoforce" , ""),
Service::new("SynTouch BioTac" , "biotac" , ""),
Service::new("STB" , "stb" , ""),
].to_json());
data.insert("flows".to_string(), flows.read().unwrap().to_json());
data.insert("server".to_string(), format!("{}:{}", req.url.host, WS_PORT).to_json());
let mut resp = Response::new();
resp.set_mut(Template::new("index", data)).set_mut(status::Ok);
Ok(resp)
})
}
macro_rules! params {
($req:expr => [URL $($url:ident),*] [GET $($get:ident),*] [POST $($post:ident),*]) => {
let ($($url,)*) = {
if let Some(params) = $req.extensions.get::<Router>() {
($({
println!("DBG extracting :{} from URL", stringify!($url));
let p = params.find(stringify!($url)).unwrap();
println!("DBG :{} = {}", stringify!($url), p);
String::from_utf8(percent_decode(p.as_bytes())).unwrap()
},)*)
} else {
($({
println!("DBG making up default value for :{}", stringify!($url));
Default::default()
},)*)
}
};
let ($($get,)*) = {
if let Some(params) = $req.extensions.get::<UrlEncodedQuery>() {
($({
println!("DBG extracting GET {}", stringify!($get));
let p = params[stringify!($get)][0].clone();
println!("DBG GET {} = {}", stringify!($get), p);
p
},)*)
} else {
($({
println!("DBG making up default value for GET {}", stringify!($get));
Default::default()
},)*)
}
};
let ($($post,)*) = {
if let Some(params) = $req.extensions.get::<UrlEncodedBody>() {
($({
println!("DBG extracting POST {}", stringify!($post));
let p = params[stringify!($post)][0].clone();
println!("DBG POST {} = {}", stringify!($post), p);
p
},)*)
} else {
($({
println!("DBG making up default value for POST {}", stringify!($post));
Default::default()
},)*)
}
};
}
}
/// Handler for starting/stopping a service
fn control(tx: mpsc::Sender<CmdFrom>) -> Box<Handler> {
let mtx = Mutex::new(tx);
Box::new(move |req: &mut Request| -> IronResult<Response> {
params!(req => [URL service, action]
[GET]
[POST]);
Ok(match &*action {
"start" =>
if rpc!(mtx.lock().unwrap(), CmdFrom::Start, service.to_string()).unwrap() {
Response::with((status::Ok, format!("Started {}", service)))
} else {
Response::with((status::InternalServerError, format!("Failed to start {}", service)))
},
"stop" =>
if rpc!(mtx.lock().unwrap(), CmdFrom::Stop, service.to_string()).unwrap() {
Response::with((status::Ok, format!("Stopped {}", service)))
} else {
Response::with((status::InternalServerError, format!("Failed to stop {}", service)))
},
"kick" =>
match mtx.lock().unwrap().send(CmdFrom::Data(format!("kick {}", service))) {
Ok(_) => Response::with((status::Ok, format!("Kicked {}", service))),
Err(_) => Response::with((status::InternalServerError, format!("Failed to kick {}", service)))
},
_ => Response::with((status::BadRequest, format!("What does {} mean?", action)))
})
})
}
/// Handler for starting/continuing a flow
fn flow(tx: mpsc::Sender<CmdFrom>, flows: Arc<RwLock<Vec<Flow>>>) -> Box<Handler> {
let mtx = Mutex::new(tx);
Box::new(move |req: &mut Request| -> IronResult<Response> {
params!(req => [URL flow, action]
[GET]
[POST wsid]);
let wsid = wsid.parse().unwrap();
Ok(match &*action {
"start" => {
let mut locked_flows = flows.write().unwrap();
if let Some(found) = locked_flows.iter_mut().find(|f| f.name == flow) {
if found.active {
Response::with((status::BadRequest, format!("Already in the middle of \"{}\" flow", flow)))
} else {
found.run(ParkState::metermaid(), mtx.lock().unwrap().deref(), wsid);
Response::with((status::Ok, format!("Started \"{}\" flow", flow)))
}
} else {
Response::with((status::BadRequest, format!("Could not find \"{}\" flow", flow)))
}
},
"continue" => unimplemented!(),
_ => Response::with((status::BadRequest, format!("What does {} mean?", action)))
})
})
}
/// Controllable struct for the web server
pub struct Web {
/// Private handle to the HTTP server
listening: Listening,
/// Private handle to the websocket server thread
websocket: JoinHandle<()>,
/// Private channel for sending events to WebSocket clients
wstx: mpsc::Sender<ws::Message>,
}
struct Catchall;
impl Catchall {
fn new() -> Catchall { Catchall }
}
impl AfterMiddleware for Catchall {
fn catch(&self, req: &mut Request, err: IronError) -> IronResult<Response> {
match err.response.status {
Some(status::NotFound) => Ok(err.response),
_ => Err(err)
}
}
}
struct Drain;
impl Drain {
fn new() -> Drain { Drain }
fn drain(req: &mut Request, resp: &mut Response) {
const LIMIT: u64 = 1024 * 1024;
io::copy(&mut req.body.by_ref().take(LIMIT), &mut io::sink());
let mut buf = [0];
if let Ok(n) = req.body.read(&mut buf) {
if n > 0 {
resp.headers.set(Connection::close());
}
}
}
}
impl AfterMiddleware for Drain {
fn after(&self, req: &mut Request, mut resp: Response) -> IronResult<Response> {
Drain::drain(req, &mut resp);
Ok(resp)
}
fn catch(&self, req: &mut Request, mut err: IronError) -> IronResult<Response> {
Drain::drain(req, &mut err.response);
Err(err)
}
}
#[cfg(feature = "watch")]
fn maybe_watch(chain: &mut Chain) {
let watcher = Arc::new(HandlebarsEngine::new(&relpath("templates"), ".hbs"));
watcher.watch();
chain.link_after(watcher);
}
#[cfg(not(feature = "watch"))]
fn maybe_watch(chain: &mut Chain) {
chain.link_after(HandlebarsEngine::new(&relpath("templates"), ".hbs"));
}
guilty!{
impl Controllable for Web {
const NAME: &'static str = "web",
const BLOCK: Block = Block::Infinite,
fn setup(tx: mpsc::Sender<CmdFrom>, _: Option<String>) -> Web {
let flows = vec! {
Flow::new("Test flow",
vec! {
FlowState::new("One", None, vec! {
FlowCmd::Message("Please insert Tab A into Slot B"),
}),
}),
Flow::new("Episode",
vec! {
// TODO shutdown button
FlowState::new("Begin", None, vec! {
FlowCmd::StopSensors,
FlowCmd::Message("Starting new episode!"),
}),
// TODO generate unique ID and overall timestamp (+ timestamp for each step)
FlowState::new("Camera aiming", Some(ParkState::None), vec! {
FlowCmd::StopSensors,
FlowCmd::Start("bluefox"), FlowCmd::Start("structure"),
FlowCmd::Message("Use the Refresh button to get the cameras aimed well"),
}),
FlowState::new("Camera capture", None, vec! {
FlowCmd::Send("bluefox disk start"),
FlowCmd::Send("structure disk start"),
FlowCmd::Message("Now recording! Pan the rig around to get images from various angles"),
}),
FlowState::new("Camera finish", None, vec! {
FlowCmd::Send("bluefox disk stop"),
FlowCmd::Send("structure disk stop"),
FlowCmd::Message("Writing to disk, please wait..."),
FlowCmd::Stop("bluefox"), FlowCmd::Stop("structure"),
FlowCmd::Message("Done!"),
}),
FlowState::new("BioTac capture", Some(ParkState::BioTac), vec! {
FlowCmd::Start("biotac"), FlowCmd::Start("stb"),
FlowCmd::Message("Recording from BioTac!"),
}),
FlowState::new("BioTac finish", None, vec! {
FlowCmd::Message("Writing to disk, please wait..."),
FlowCmd::Stop("biotac"), FlowCmd::Stop("stb"),
FlowCmd::Message("Done!"),
}),
FlowState::new("OptoForce capture", Some(ParkState::OptoForce), vec! {
FlowCmd::Start("optoforce"), FlowCmd::Start("stb"),
FlowCmd::Message("Recording from OptoForce!"),
}),
FlowState::new("OptoForce finish", None, vec! {
FlowCmd::Message("Writing to disk, please wait..."),
FlowCmd::Stop("optoforce"), FlowCmd::Stop("stb"),
FlowCmd::Message("Done!"),
}),
FlowState::new("Rigid stick capture", Some(ParkState::Stick), vec! {
FlowCmd::Start("stb"),
FlowCmd::Message("Recording from rigid stick!"),
}),
FlowState::new("Rigid stick finish", None, vec! {
FlowCmd::Message("Writing to disk, please wait..."),
FlowCmd::Stop("stb"),
FlowCmd::Message("Done!"),
}),
FlowState::new("Wrap up", Some(ParkState::None), vec! {
FlowCmd::str("episode name"),
// TODO more questions here?
FlowCmd::int("hardness", (1, 5)),
FlowCmd::int("roughness", (1, 5)),
FlowCmd::int("slipperiness", (1, 5)),
FlowCmd::int("warmness", (1, 5)),
FlowCmd::Message("Done!"),
}),
}),
};
let (wstx, wsrx) = mpsc::channel();
let ctx = tx.clone();
let thread = thread::spawn(move || {
let ws = ws::Server::bind(("0.0.0.0", WS_PORT)).unwrap();
let mut ws_relays = Vec::new();
let marshal = thread::spawn(move || {
// relay messages from above to all WS threads
while let Ok(msg) = wsrx.recv() {
WS_SENDERS.lock().unwrap().iter_mut().map(|ref mut s| {
s.send_message(ws::Message::clone(&msg)).unwrap(); // FIXME why is the typechecker so confused here
}).count();
}
println!("web: shutting down websocket servers");
// kill all WS threads now
WS_SENDERS.lock().unwrap().iter_mut().map(|ref mut s| {
s.send_message(ws::Message::Close(None)).unwrap();
}).count();
});
for connection in ws {
let request = connection.unwrap().read_request().unwrap(); // Get the request
let headers = request.headers.clone(); // Keep the headers so we can check them
request.validate().unwrap(); // Validate the request
let mut response = request.accept(); // Form a response
if let Some(&ws::header::WebSocketProtocol(ref protocols)) = headers.get() {
if protocols.contains(&("rust-websocket".to_string())) {
// We have a protocol we want to use
response.headers.set(ws::header::WebSocketProtocol(vec!["rust-websocket".to_string()]));
}
}
let mut client = response.send().unwrap(); // Send the response
let ip = client.get_mut_sender()
.get_mut()
.peer_addr()
.unwrap();
println!("Websocket connection from {}", ip);
let mut locked_senders = WS_SENDERS.lock().unwrap();
let wsid = locked_senders.len();
let message = ws::Message::Text(format!("hello {}", wsid));
client.send_message(message).unwrap();
let (sender, mut receiver) = client.split();
locked_senders.push(sender);
let cctx = ctx.clone();
ws_relays.push(thread::spawn(move || {
for message in receiver.incoming_messages() {
let message = message.unwrap();
match message {
ws::Message::Close(_) => {
println!("Websocket client {} disconnected", ip);
return;
},
ws::Message::Text(text) => {
if text.starts_with("RPC") {
let space = text.find(' ').unwrap();
let id = text[3..space].parse::<usize>().unwrap();
let msg = text[space..].to_string();
let mut locked_senders = WS_SENDERS.lock().unwrap();
let locked_rpcs = RPC_SENDERS.lock().unwrap();
if let Some(rpc) = locked_rpcs.get(&id) {
rpc.send(msg).unwrap();
} else {
locked_senders[id].send_message(ws::Message::Text("RPC ERROR: nobody listening".to_string())).unwrap();
}
} else {
cctx.send(CmdFrom::Data(text)).unwrap();
}
},
_ => ()
}
}
}));
}
});
let shared_flows = Arc::new(RwLock::new(flows));
let mut router = Router::new();
router.get("/", index(shared_flows.clone()));
router.post("/control/:service/:action", control(tx.clone()));
router.post("/flow/:flow/:action", flow(tx.clone(), shared_flows.clone()));
let mut mount = Mount::new();
for p in ["css", "fonts", "js"].iter() {
mount.mount(&format!("/{}/", p),
Static::new(Path::new(&relpath("bootstrap")).join(p)));
}
mount.mount("/", router);
let mut chain = Chain::new(mount);
maybe_watch(&mut chain);
chain.link_after(Catchall::new());
chain.link_after(Drain::new());
let listening = Iron::new(chain).http(("0.0.0.0", HTTP_PORT)).unwrap();
Web { listening: listening, websocket: thread, wstx: wstx }
}
fn step(&mut self, data: Option<String>) {
if let Some(d) = data {
self.wstx.send(ws::Message::Text(d)).unwrap();
}
}
fn teardown(&mut self) {
self.listening.close().unwrap(); // FIXME this does not do anything (known bug in hyper)
// TODO send a message to the websocket thread telling it to shut down
}
}
}