Skip to content

Commit

Permalink
fix bugs, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
Donald Ball committed Jan 30, 2023
1 parent 7f9eb09 commit c6c15bc
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 38 deletions.
8 changes: 7 additions & 1 deletion rust/speed/fly.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
app = "protohackers-dball"
# fly.toml file generated for aged-dust-7628 on 2023-01-29T16:40:32-05:00

app = "aged-dust-7628"
kill_signal = "SIGINT"
kill_timeout = 5

[[services]]
internal_port = 9000
protocol = "tcp"

[services.concurrency]
hard_limit = 200
soft_limit = 150

[[services.ports]]
port = 9000
15 changes: 9 additions & 6 deletions rust/speed/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl<'a> Connection<'a> {
unimplemented!("This is only the server.")
}
}
self.writer.flush().await?;
Ok(())
}

Expand All @@ -59,21 +60,23 @@ impl<'a> Connection<'a> {
0x20 => {
let len: usize = self.reader.read_u8().await?.into();
let mut buf: Vec<u8> = Vec::with_capacity(len);
let n = self.reader.read(&mut buf[..]).await?;
if n != len {
return Err(io::Error::from(io::ErrorKind::InvalidData));
}
buf.resize(len, 0);
self.reader.read_exact(&mut buf[..]).await?;
match String::from_utf8(buf) {
Ok(plate) => {
let timestamp = self.reader.read_u32().await?;
Ok(Message::Plate(plate, timestamp))
}
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
}
}
0x40 => {
let deciseconds = self.reader.read_u32().await?;
let duration = Duration::from_micros((deciseconds * 10).into());
let duration = if deciseconds == 0 {
None
} else {
Some(Duration::from_millis((deciseconds * 100).into()))
};
Ok(Message::WantHeartbeat(duration))
}
0x80 => {
Expand Down
29 changes: 20 additions & 9 deletions rust/speed/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{
// TODO if we did, would that affect the dispatch speed in reads, writes, etc.?
// TODO how can we encode the direction, and/or the state sequence constraints on the message types
// TODO how can we construct de/serialization code for these declaratively?
#[derive(Debug)]
pub enum Message {
// server -> client
Error(String),
Expand All @@ -22,7 +23,7 @@ pub enum Message {
// server -> dispatcher
Ticket(Ticket),
// client -> server
WantHeartbeat(Duration),
WantHeartbeat(Option<Duration>),
// server -> client
Heartbeat,
// (client->camera) -> server
Expand Down Expand Up @@ -128,11 +129,13 @@ impl Region {
let by_timestamp = by_road.entry(camera.road).or_default();
if by_timestamp.insert(timestamp, camera.mile).is_none() {
if let Some((then, there)) = by_timestamp.range(0..timestamp).last() {
eprintln!("looking back {} {}", then, there);
if let Some(ticket) = Self::compute_ticket(camera, plate, timestamp, *then, *there)
{
return Some(ticket);
}
} else if let Some((then, there)) = by_timestamp.range(timestamp + 1..).next() {
eprintln!("looking fore {} {}", then, there);
if let Some(ticket) = Self::compute_ticket(camera, plate, timestamp, *then, *there)
{
return Some(ticket);
Expand All @@ -150,14 +153,22 @@ impl Region {
then: Timestamp,
there: Mile,
) -> Option<Ticket> {
let distance = camera.mile - there;
let elapsed = now - then;
let fspeed: f64 = f64::from(distance) / f64::from(elapsed);
if fspeed.abs() > camera.limit.into() {
let (mile1, mile2, timestamp1, timestamp2) = if fspeed > 0.0 {
(camera.mile, there, now, then)
} else {
eprintln!("COMPUTE_TICKET plate={}", plate);
let miles = f64::from(camera.mile) - f64::from(there);
eprintln!("MILES from={} to={} delta={}", camera.mile, there, miles);
let hours = (f64::from(now) - f64::from(then)) / 3600.0;
eprintln!("TIME from={} to={} hours={}", now, then, hours);
let velocity: f64 = miles / hours;
let speed = velocity.abs();
eprintln!(
"SPEEDS vel={} speed={} limit={}",
velocity, speed, camera.limit,
);
if speed > camera.limit.into() {
let (mile1, mile2, timestamp1, timestamp2) = if then < now {
(there, camera.mile, then, now)
} else {
(camera.mile, there, now, then)
};
return Some(Ticket {
plate,
Expand All @@ -166,7 +177,7 @@ impl Region {
timestamp1,
mile2,
timestamp2,
speed: (fspeed.abs() * 100.0).round() as u16,
speed: (speed * 100.0).round() as u16,
});
}
None
Expand Down
2 changes: 1 addition & 1 deletion rust/speed/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod connection;
pub mod domain;
pub mod server;

#[tokio::main]
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let mut server = Server::new();
server.run().await.unwrap();
Expand Down
56 changes: 35 additions & 21 deletions rust/speed/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{io, time::Duration};
use std::{
io::{self, ErrorKind},
time::Duration,
};

use tokio::{
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -27,15 +30,10 @@ impl Server {
let listener = TcpListener::bind("0.0.0.0:9000").await?;
loop {
tokio::select! {
_ = async {
loop {
let (socket, _) = listener.accept().await?;
let tx = tx.clone();
tokio::spawn(async move { handle(socket, tx); });
}
// "Help the rust type inferencer out" ?
Ok::<_, io::Error>(())
} => {},
Ok((socket, _)) = listener.accept() => {
let tx = tx.clone();
tokio::spawn(async move { handle(socket, tx).await; });
}
Some(cmd) = rx.recv() => {
match cmd {
ServerCommand::RecordPlate(camera, plate, timestamp) => {
Expand Down Expand Up @@ -66,18 +64,18 @@ async fn send_error(mut conn: Connection<'_>, msg: &str) -> Result<(), io::Error
// TODO is this goofy or good? Feels like it'll call on every select poll
// even in the none case, but really, if it's none, we don't even want to
// participate in the select.
async fn maybe_tick(interval: &mut Option<Interval>) -> Option<()> {
async fn maybe_tick(interval: &mut Option<Option<Interval>>) -> Option<()> {
match interval {
Some(interval) => {
Some(Some(interval)) => {
interval.tick().await;
Some(())
}
None => None,
_ => None,
}
}

async fn handle(mut socket: TcpStream, tx: mpsc::Sender<ServerCommand>) -> Result<(), io::Error> {
let mut heartbeat = None;
let mut heartbeat: Option<Option<Interval>> = None;
let mut conn = Connection::new(&mut socket);
loop {
tokio::select! {
Expand All @@ -87,14 +85,19 @@ async fn handle(mut socket: TcpStream, tx: mpsc::Sender<ServerCommand>) -> Resul
if heartbeat.is_some() {
return send_error(conn, "already beating").await;
}
heartbeat = Some(time::interval(duration));
if let Some(duration) = duration {
heartbeat = Some(Some(time::interval(duration)));
} else {
heartbeat = Some(None);
}
}
Ok(Message::IAmCamera(camera)) => {
return handle_camera(conn, tx, camera, heartbeat).await;
}
Ok(Message::IAmDispatcher(dispatcher)) => {
return handle_dispatcher(conn, tx, dispatcher, heartbeat).await;
}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => {},
_ => {
return send_error(conn, "invalid message").await;
}
Expand All @@ -111,7 +114,7 @@ async fn handle_camera(
mut conn: Connection<'_>,
tx: mpsc::Sender<ServerCommand>,
camera: Camera,
mut heartbeat: Option<Interval>,
mut heartbeat: Option<Option<Interval>>,
) -> Result<(), io::Error> {
loop {
tokio::select! {
Expand All @@ -121,15 +124,21 @@ async fn handle_camera(
if heartbeat.is_some() {
return send_error(conn, "already beating").await;
}
heartbeat = Some(time::interval(duration));
if let Some(duration) = duration {
heartbeat = Some(Some(time::interval(duration)));
} else {
heartbeat = Some(None);
}
}
Ok(Message::Plate(plate, timestamp)) => {
let cmd = ServerCommand::RecordPlate(camera, plate, timestamp);
if tx.send(cmd).await.is_err() {
eprintln!("dropped plate record");
}
}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => {},
_ => {
eprintln!("invalid camera message {:?}", msg);
return send_error(conn, "invalid camera message").await;
}
}
Expand All @@ -152,9 +161,9 @@ async fn handle_dispatcher(
mut conn: Connection<'_>,
cmd_tx: mpsc::Sender<ServerCommand>,
dispatcher: Dispatcher,
mut heartbeat: Option<Interval>,
mut heartbeat: Option<Option<Interval>>,
) -> Result<(), io::Error> {
let mut issue = time::interval(Duration::from_millis(100));
let mut issue = time::interval(Duration::from_millis(1000));
let mut ticketer: Option<oneshot::Receiver<Option<Ticket>>> = None;
loop {
tokio::select! {
Expand All @@ -177,10 +186,15 @@ async fn handle_dispatcher(
if heartbeat.is_some() {
return send_error(conn, "already beating").await;
}
heartbeat = Some(time::interval(duration));
if let Some(duration) = duration {
heartbeat = Some(Some(time::interval(duration)));
} else {
heartbeat = Some(None);
}
}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => {},
_ => {
return send_error(conn, "invalid camera message").await;
return send_error(conn, "invalid dispatcher message").await;
}
}
}
Expand Down

0 comments on commit c6c15bc

Please sign in to comment.