Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kble-based c2a-hal UART emulation #41

Merged
merged 3 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ members = [
"./hal/spi-noop",
"./hal/uart-noop",
"./hal/wdt-noop",

"./hal/uart-kble",
]

[workspace.dependencies]
Expand Down
17 changes: 17 additions & 0 deletions hal/uart-kble/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "c2a-uart-kble"
description = "kble-based UART emulation for C2A"
meltingrabbit marked this conversation as resolved.
Show resolved Hide resolved
version.workspace = true
edition = "2021"

[dependencies]
c2a-core.workspace = true
once_cell = "1"
futures = "0.3"
kble-socket = { version = "0.2.0", features = ["axum"] }
tokio = { version = "1", features = ["sync", "rt"] }
axum = { version = "0.6", default-features = false, features = ["tokio", "http1", "ws"] }
anyhow = "1"

[dev-dependencies]
c2a-core = { workspace = true, features = ["no-c2a-link"] }
79 changes: 79 additions & 0 deletions hal/uart-kble/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::{
collections::VecDeque,
ops::{Deref, DerefMut},
};

use tokio::sync::{Mutex, Notify};

pub struct Buffer {
deque: Mutex<VecDeque<u8>>,
notify: Notify,
}

fn write_internal<D>(mut deque: D, data: &[u8])
where
D: Deref<Target = VecDeque<u8>> + DerefMut,
{
let data = if deque.capacity() < data.len() {
&data[data.len() - deque.capacity()..]
} else {
data
};
if deque.capacity() < deque.len() + data.len() {
let overflow = deque.len() + data.len() - deque.capacity();
deque.drain(..overflow);
}
deque.extend(data);
}
fn read_internal<D>(mut deque: D, buf: &mut [u8]) -> usize
where
D: Deref<Target = VecDeque<u8>> + DerefMut,
{
let read = deque.len().min(buf.len());
for (s, d) in deque.drain(..).zip(buf.iter_mut()) {
*d = s;
}
read
}
impl Buffer {
pub fn with_capacity(capacity: usize) -> Self {
Self {
deque: Mutex::new(VecDeque::with_capacity(capacity)),
notify: Notify::new(),
}
}

pub fn reinitialize(&self, capacity: usize) {
let mut deque = self.deque.blocking_lock();
*deque = VecDeque::with_capacity(capacity);
}

pub fn blocking_write(&self, data: &[u8]) {
let deque = self.deque.blocking_lock();
write_internal(deque, data);
self.notify.notify_waiters();
}

pub async fn write(&self, data: &[u8]) {
let deque = self.deque.lock().await;
write_internal(deque, data);
self.notify.notify_waiters();
}

pub fn nonblocking_read(&self, buf: &mut [u8]) -> usize {
let deque = self.deque.blocking_lock();
read_internal(deque, buf)
}

pub async fn read(&self, buf: &mut [u8]) -> usize {
loop {
let deque = self.deque.lock().await;
if deque.is_empty() {
drop(deque);
self.notify.notified().await;
continue;
}
return read_internal(deque, buf);
}
}
}
85 changes: 85 additions & 0 deletions hal/uart-kble/src/kble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::{net::SocketAddr, sync::Arc, thread};

use anyhow::Result;
use axum::{
extract::{ws::WebSocket, Path, State, WebSocketUpgrade},
http::StatusCode,
response::Response,
routing::get,
Router,
};
use futures::{SinkExt, StreamExt};
use kble_socket::from_axum;
use tokio::sync::OwnedMutexGuard;

use crate::{Mux, OuterChannel};

pub struct Server {
mux: Arc<Mux>,
}

impl Server {
pub fn new(mux: Arc<Mux>) -> Self {
Self { mux }
}

pub async fn serve(self, addr: SocketAddr) -> Result<()> {
let app = Router::new()
.route("/channels/:ch", get(handle_channel))
.with_state(self.mux);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
Ok(())
}

pub fn serve_in_background(self, addr: SocketAddr) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
thread::spawn(move || {
let fut = self.serve(addr);
if let Err(e) = rt.block_on(fut) {
eprintln!("kble server has exited: {e}");
}
});
}
}

async fn handle_channel(
upgrade: WebSocketUpgrade,
State(mux): State<Arc<Mux>>,
Path(ch): Path<u8>,
) -> Result<Response, StatusCode> {
let Some(channel) = mux.try_get_outer(ch) else {
return Err(StatusCode::CONFLICT);
};
Ok(upgrade.on_upgrade(|ws| handle_ws(ws, channel)))
}

async fn handle_ws(ws: WebSocket, channel: OwnedMutexGuard<OuterChannel>) {
let (mut sink, mut stream) = from_axum(ws);
let tx = channel.tx.clone();
let rx = channel.rx.clone();
let tx_fut = async {
loop {
let mut buf = vec![0u8; 2048];
let len = tx.read(&mut buf).await;
buf.truncate(len);
sink.send(buf.into()).await?;
}
#[allow(unreachable_code)]
anyhow::Ok(())
};
let rx_fut = async {
loop {
let Some(chunk) = stream.next().await else {
break;
};
rx.write(&chunk?).await;
}
anyhow::Ok(())
};
futures::future::try_join(tx_fut, rx_fut).await.ok();
}
143 changes: 143 additions & 0 deletions hal/uart-kble/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! **c2a-uart-kble** provides [kble](https://github.com/arkedge/kble) based c2a-hal UART
//! emulation for C2A

mod buffer;
mod kble;

use std::env;
use std::ffi::c_int;
use std::net::{Ipv4Addr, SocketAddr};
use std::{collections::HashMap, sync::Arc};

use c2a_core::hal::uart::{UART_Config, UART_ERR_CODE, UART_ERR_CODE_UART_OK};
use kble::Server;
use once_cell::sync::Lazy;
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};

use buffer::Buffer;

pub struct OuterChannel {
pub tx: Arc<Buffer>,
pub rx: Arc<Buffer>,
}

struct InnerChannel {
tx: Arc<Buffer>,
rx: Arc<Buffer>,
}

struct ChannelPair {
inner: InnerChannel,
outer: Arc<Mutex<OuterChannel>>,
}

impl ChannelPair {
fn with_capacity(capacity: usize) -> Self {
let tx = Arc::new(Buffer::with_capacity(capacity));
let rx = Arc::new(Buffer::with_capacity(capacity));
let inner = InnerChannel {
tx: tx.clone(),
rx: rx.clone(),
};
let outer = OuterChannel { tx, rx };

Self {
inner,
outer: Arc::new(Mutex::new(outer)),
}
}

fn reinitialize(&mut self, capacity: usize) {
self.inner.tx.reinitialize(capacity);
self.inner.rx.reinitialize(capacity);
}
}

#[derive(Default)]
pub struct Mux {
channels: RwLock<HashMap<u8, ChannelPair>>,
}

impl Mux {
fn init_channel(&self, ch: u8) {
const BUFFER_SIZE: usize = 2048; // FIXME: make configurable
let mut channels = self.channels.blocking_write();
channels
.entry(ch)
.and_modify(|channel| channel.reinitialize(BUFFER_SIZE))
.or_insert_with(|| ChannelPair::with_capacity(BUFFER_SIZE));
}

fn read(&self, ch: u8, buf: &mut [u8]) -> usize {
let channels = self.channels.blocking_read();
let Some(pair) = channels.get(&ch) else {
// TODO: return propery error
return 0;
};
pair.inner.rx.nonblocking_read(buf)
}

fn write(&self, ch: u8, data: &[u8]) {
let channels = self.channels.blocking_read();
let Some(pair) = channels.get(&ch) else {
return;
};
pair.inner.tx.blocking_write(data)
}

pub fn try_get_outer(&self, ch: u8) -> Option<OwnedMutexGuard<OuterChannel>> {
let channels = self.channels.try_read().ok()?;
let Some(pair) = channels.get(&ch) else {
return None;
};
pair.outer.clone().try_lock_owned().ok()
}
}

static MUX: Lazy<Arc<Mux>> = Lazy::new(|| {
let mux = Arc::new(Mux::default());
let port = env::var("UART_KBLE_PORT").unwrap_or_else(|_| "9696".to_string());
let port = port.parse().unwrap();
Server::new(mux.clone())
.serve_in_background(SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port));
mux
});

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn UART_init(uart_config: *mut UART_Config) -> c_int {
let ch = unsafe { (*uart_config).ch };
MUX.init_channel(ch);
UART_ERR_CODE_UART_OK.0
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn UART_rx(
uart_config: *mut UART_Config,
buf: *mut u8,
buf_size: c_int,
) -> c_int {
let ch = unsafe { (*uart_config).ch };
let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_size as usize) };
let ret = MUX.read(ch, buf);
UART_ERR_CODE(ret as i32).0
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn UART_tx(
uart_config: *mut UART_Config,
data: *mut u8,
data_size: c_int,
) -> c_int {
let ch = unsafe { (*uart_config).ch };
let data = unsafe { std::slice::from_raw_parts_mut(data, data_size as usize) };
MUX.write(ch, data);
UART_ERR_CODE_UART_OK.0
}

#[no_mangle]
pub extern "C" fn UART_reopen(_uart_config: *mut UART_Config, _reason: c_int) -> c_int {
UART_ERR_CODE_UART_OK.0
}