Skip to content

Commit

Permalink
Simplify protocol reader
Browse files Browse the repository at this point in the history
  • Loading branch information
yorickdewid committed Aug 10, 2023
1 parent 58ab28d commit 6bb4e33
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 30 deletions.
15 changes: 10 additions & 5 deletions glonax-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ async fn daemonize(config: &config::ProxyConfig) -> anyhow::Result<()> {

tokio::time::sleep(std::time::Duration::from_millis(host_interval)).await;
}

// log::debug!("Host service shutdown");
});

let ecu_sender: Sender<glonax::core::Signal> = tx.clone();
Expand Down Expand Up @@ -184,6 +186,8 @@ async fn daemonize(config: &config::ProxyConfig) -> anyhow::Result<()> {
}
}
}

log::debug!("FIFO listener shutdown");
});

let ecu_interface = config.interface.clone();
Expand Down Expand Up @@ -227,14 +231,16 @@ async fn daemonize(config: &config::ProxyConfig) -> anyhow::Result<()> {
}
}
}

log::debug!("Motion listener shutdown");
});

let listener = TcpListener::bind(config.address.clone()).await?;

loop {
let (stream, addr) = listener.accept().await?;

log::info!("Accepted connection from: {}", addr);
log::debug!("Accepted connection from: {}", addr);

let (stream_reader, stream_writer) = stream.into_split();

Expand All @@ -252,22 +258,21 @@ async fn daemonize(config: &config::ProxyConfig) -> anyhow::Result<()> {
}
}

log::info!("Signal listener shutdown");
log::info!("Session signal listener shutdown");
});

tokio::spawn(async move {
let mut session_name = String::new();

let mut protocol_in = glonax::transport::Protocol::new(stream_reader);

while let Ok(message) = protocol_in.read_frame().await {
while let Ok(message) = protocol_in.read_frame2().await {
match message {
glonax::transport::Message::Start(session) => {
log::info!("Session started for: {}", session.name());
session_name = session.name().to_string();
}
glonax::transport::Message::Shutdown => {
log::info!("Session shutdown for: {}", session_name);
break;
}
glonax::transport::Message::Motion(motion) => {
Expand All @@ -281,7 +286,7 @@ async fn daemonize(config: &config::ProxyConfig) -> anyhow::Result<()> {
}
}

log::info!("Connection closed for: {}", addr);
log::info!("Session shutdown for: {}", session_name);
});
}

Expand Down
105 changes: 80 additions & 25 deletions glonax-runtime/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,32 +130,23 @@ impl<T: AsyncWrite + Unpin> Protocol<T> {
}

impl<T: AsyncRead + Unpin> Protocol<T> {
// TODO: Why not return UnexpectedEof?
// TODO: This maybe too complex
async fn read_at_least(&mut self, min_len: usize) -> std::io::Result<Option<usize>> {
let mut total_bytes_read = None;
async fn read_at_least(&mut self, min_len: usize) -> std::io::Result<()> {
while self.buffer.len() < min_len {
let bytes_read = self.inner.read_buf(&mut self.buffer).await?;
if bytes_read == 0 {
return Ok(Some(0));
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"EOF",
));
}
total_bytes_read = Some(bytes_read + total_bytes_read.unwrap_or(0));
}

Ok(total_bytes_read)
Ok(())
}

pub async fn read_frame(&mut self) -> std::io::Result<Message> {
loop {
let bytes_read = self.read_at_least(MIN_BUFFER_SIZE).await.unwrap();
if let Some(bytes_read) = bytes_read {
if bytes_read == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"EOF",
));
}
}
self.read_at_least(MIN_BUFFER_SIZE).await?;

// Find header
for i in 0..(self.buffer.len() - PROTO_HEADER.len()) {
Expand Down Expand Up @@ -184,15 +175,7 @@ impl<T: AsyncRead + Unpin> Protocol<T> {
continue;
}

let bytes_read = self.read_at_least(proto_length).await.unwrap();
if let Some(bytes_read) = bytes_read {
if bytes_read == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"EOF",
));
}
}
self.read_at_least(proto_length).await?;

if message == PROTO_MESSAGE_NULL {
return Ok(Message::Null);
Expand Down Expand Up @@ -237,4 +220,76 @@ impl<T: AsyncRead + Unpin> Protocol<T> {
}
}
}

pub async fn read_frame2(&mut self) -> std::io::Result<Message> {
loop {
let mut header_buffer = [0u8; MIN_BUFFER_SIZE];

self.inner.read_exact(&mut header_buffer).await?;

// Check header
if header_buffer[0] != PROTO_HEADER[0]
|| header_buffer[1] != PROTO_HEADER[1]
|| header_buffer[2] != PROTO_HEADER[2]
{
log::warn!("Invalid header");
continue;
}

// Check protocol version
let version = header_buffer[3];
if version != PROTO_VERSION {
log::warn!("Invalid version {}", version);
continue;
}

let message = header_buffer[4];

let proto_length = u16::from_be_bytes([header_buffer[5], header_buffer[6]]) as usize;
if proto_length > 4096 {
log::warn!("Invalid proto length {}", proto_length);
continue;
}

let payload_buffer = &mut vec![0u8; proto_length];

self.inner.read_exact(payload_buffer).await?;

if message == PROTO_MESSAGE_NULL {
return Ok(Message::Null);
} else if message == PROTO_MESSAGE_START {
let mut session_name = String::new();

for c in payload_buffer {
session_name.push(*c as char);
}

return Ok(Message::Start(frame::Start::new(session_name)));
} else if message == PROTO_MESSAGE_SHUTDOWN {
return Ok(Message::Shutdown);
} else if message == PROTO_MESSAGE_MOTION {
match Motion::try_from(&payload_buffer[..]) {
Ok(motion) => {
return Ok(Message::Motion(motion));
}
Err(_) => {
log::warn!("Invalid motion payload");
continue;
}
}
} else if message == PROTO_MESSAGE_SIGNAL {
match Signal::try_from(&payload_buffer[..]) {
Ok(signal) => {
return Ok(Message::Signal(signal));
}
Err(_) => {
log::warn!("Invalid signal payload");
continue;
}
}
} else {
log::error!("Invalid message type: {}", message);
}
}
}
}

0 comments on commit 6bb4e33

Please sign in to comment.