From 5e70137acf6083feaab1e531a8a9042eab6c6cde Mon Sep 17 00:00:00 2001 From: sksat Date: Wed, 2 Aug 2023 14:20:40 +0900 Subject: [PATCH 1/3] add kble-based c2a-hal UART implementation --- Cargo.toml | 2 + hal/uart_kble/Cargo.toml | 17 +++++ hal/uart_kble/src/buffer.rs | 79 ++++++++++++++++++++ hal/uart_kble/src/kble.rs | 85 ++++++++++++++++++++++ hal/uart_kble/src/lib.rs | 140 ++++++++++++++++++++++++++++++++++++ 5 files changed, 323 insertions(+) create mode 100644 hal/uart_kble/Cargo.toml create mode 100644 hal/uart_kble/src/buffer.rs create mode 100644 hal/uart_kble/src/kble.rs create mode 100644 hal/uart_kble/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 5be40995d..6b2da3dd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ members = [ "./hal/spi-noop", "./hal/uart-noop", "./hal/wdt-noop", + + "./hal/uart_kble", ] [workspace.dependencies] diff --git a/hal/uart_kble/Cargo.toml b/hal/uart_kble/Cargo.toml new file mode 100644 index 000000000..713e593cc --- /dev/null +++ b/hal/uart_kble/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "c2a-uart-kble" +description = "kble-based UART emulation for C2A" +version.workspace = true +edition = "2021" + +[dependencies] +c2a-core.workspace = true +once_cell = "1" +futures = "0.3" +kble-socket = { version = "0.2.0", features = ["axum"] } +tokio = { version = "1", features = ["sync", "rt"] } +axum = { version = "0.6", default-features = false, features = ["tokio", "http1", "ws"] } +anyhow = "1" + +[dev-dependencies] +c2a-core = { workspace = true, features = ["no-c2a-link"] } diff --git a/hal/uart_kble/src/buffer.rs b/hal/uart_kble/src/buffer.rs new file mode 100644 index 000000000..6dd70e65b --- /dev/null +++ b/hal/uart_kble/src/buffer.rs @@ -0,0 +1,79 @@ +use std::{ + collections::VecDeque, + ops::{Deref, DerefMut}, +}; + +use tokio::sync::{Mutex, Notify}; + +pub struct Buffer { + deque: Mutex>, + notify: Notify, +} + +fn write_internal(mut deque: D, data: &[u8]) +where + D: Deref> + DerefMut, +{ + let data = if deque.capacity() < data.len() { + &data[data.len() - deque.capacity()..] + } else { + data + }; + if deque.capacity() < deque.len() + data.len() { + let overflow = deque.len() + data.len() - deque.capacity(); + deque.drain(..overflow); + } + deque.extend(data); +} +fn read_internal(mut deque: D, buf: &mut [u8]) -> usize +where + D: Deref> + DerefMut, +{ + let read = deque.len().min(buf.len()); + for (s, d) in deque.drain(..).zip(buf.iter_mut()) { + *d = s; + } + read +} +impl Buffer { + pub fn with_capacity(capacity: usize) -> Self { + Self { + deque: Mutex::new(VecDeque::with_capacity(capacity)), + notify: Notify::new(), + } + } + + pub fn reinitialize(&self, capacity: usize) { + let mut deque = self.deque.blocking_lock(); + *deque = VecDeque::with_capacity(capacity); + } + + pub fn blocking_write(&self, data: &[u8]) { + let deque = self.deque.blocking_lock(); + write_internal(deque, data); + self.notify.notify_waiters(); + } + + pub async fn write(&self, data: &[u8]) { + let deque = self.deque.lock().await; + write_internal(deque, data); + self.notify.notify_waiters(); + } + + pub fn nonblocking_read(&self, buf: &mut [u8]) -> usize { + let deque = self.deque.blocking_lock(); + read_internal(deque, buf) + } + + pub async fn read(&self, buf: &mut [u8]) -> usize { + loop { + let deque = self.deque.lock().await; + if deque.is_empty() { + drop(deque); + self.notify.notified().await; + continue; + } + return read_internal(deque, buf); + } + } +} diff --git a/hal/uart_kble/src/kble.rs b/hal/uart_kble/src/kble.rs new file mode 100644 index 000000000..ea6045924 --- /dev/null +++ b/hal/uart_kble/src/kble.rs @@ -0,0 +1,85 @@ +use std::{net::SocketAddr, sync::Arc, thread}; + +use anyhow::Result; +use axum::{ + extract::{ws::WebSocket, Path, State, WebSocketUpgrade}, + http::StatusCode, + response::Response, + routing::get, + Router, +}; +use futures::{SinkExt, StreamExt}; +use kble_socket::from_axum; +use tokio::sync::OwnedMutexGuard; + +use crate::{Mux, OuterChannel}; + +pub struct Server { + mux: Arc, +} + +impl Server { + pub fn new(mux: Arc) -> Self { + Self { mux } + } + + pub async fn serve(self, addr: SocketAddr) -> Result<()> { + let app = Router::new() + .route("/channels/:ch", get(handle_channel)) + .with_state(self.mux); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await?; + Ok(()) + } + + pub fn serve_in_background(self, addr: SocketAddr) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + thread::spawn(move || { + let fut = self.serve(addr); + if let Err(e) = rt.block_on(fut) { + eprintln!("kble server has exited: {e}"); + } + }); + } +} + +async fn handle_channel( + upgrade: WebSocketUpgrade, + State(mux): State>, + Path(ch): Path, +) -> Result { + let Some(channel) = mux.try_get_outer(ch) else { + return Err(StatusCode::CONFLICT); + }; + Ok(upgrade.on_upgrade(|ws| handle_ws(ws, channel))) +} + +async fn handle_ws(ws: WebSocket, channel: OwnedMutexGuard) { + let (mut sink, mut stream) = from_axum(ws); + let tx = channel.tx.clone(); + let rx = channel.rx.clone(); + let tx_fut = async { + loop { + let mut buf = vec![0u8; 2048]; + let len = tx.read(&mut buf).await; + buf.truncate(len); + sink.send(buf.into()).await?; + } + #[allow(unreachable_code)] + anyhow::Ok(()) + }; + let rx_fut = async { + loop { + let Some(chunk) = stream.next().await else { + break; + }; + rx.write(&chunk?).await; + } + anyhow::Ok(()) + }; + futures::future::try_join(tx_fut, rx_fut).await.ok(); +} diff --git a/hal/uart_kble/src/lib.rs b/hal/uart_kble/src/lib.rs new file mode 100644 index 000000000..17f56f08f --- /dev/null +++ b/hal/uart_kble/src/lib.rs @@ -0,0 +1,140 @@ +mod buffer; +mod kble; + +use std::env; +use std::ffi::c_int; +use std::net::{Ipv4Addr, SocketAddr}; +use std::{collections::HashMap, sync::Arc}; + +use c2a_core::hal::uart::{UART_Config, UART_ERR_CODE, UART_ERR_CODE_UART_OK}; +use kble::Server; +use once_cell::sync::Lazy; +use tokio::sync::{Mutex, OwnedMutexGuard, RwLock}; + +use buffer::Buffer; + +pub struct OuterChannel { + pub tx: Arc, + pub rx: Arc, +} + +struct InnerChannel { + tx: Arc, + rx: Arc, +} + +struct ChannelPair { + inner: InnerChannel, + outer: Arc>, +} + +impl ChannelPair { + fn with_capacity(capacity: usize) -> Self { + let tx = Arc::new(Buffer::with_capacity(capacity)); + let rx = Arc::new(Buffer::with_capacity(capacity)); + let inner = InnerChannel { + tx: tx.clone(), + rx: rx.clone(), + }; + let outer = OuterChannel { tx, rx }; + + Self { + inner, + outer: Arc::new(Mutex::new(outer)), + } + } + + fn reinitialize(&mut self, capacity: usize) { + self.inner.tx.reinitialize(capacity); + self.inner.rx.reinitialize(capacity); + } +} + +#[derive(Default)] +pub struct Mux { + channels: RwLock>, +} + +impl Mux { + fn init_channel(&self, ch: u8) { + const BUFFER_SIZE: usize = 2048; // FIXME: make configurable + let mut channels = self.channels.blocking_write(); + channels + .entry(ch) + .and_modify(|channel| channel.reinitialize(BUFFER_SIZE)) + .or_insert_with(|| ChannelPair::with_capacity(BUFFER_SIZE)); + } + + fn read(&self, ch: u8, buf: &mut [u8]) -> usize { + let channels = self.channels.blocking_read(); + let Some(pair) = channels.get(&ch) else { + // TODO: return propery error + return 0; + }; + pair.inner.rx.nonblocking_read(buf) + } + + fn write(&self, ch: u8, data: &[u8]) { + let channels = self.channels.blocking_read(); + let Some(pair) = channels.get(&ch) else { + return; + }; + pair.inner.tx.blocking_write(data) + } + + pub fn try_get_outer(&self, ch: u8) -> Option> { + let channels = self.channels.try_read().ok()?; + let Some(pair) = channels.get(&ch) else { + return None; + }; + pair.outer.clone().try_lock_owned().ok() + } +} + +static MUX: Lazy> = Lazy::new(|| { + let mux = Arc::new(Mux::default()); + let port = env::var("UART_KBLE_PORT").unwrap_or_else(|_| "9696".to_string()); + let port = port.parse().unwrap(); + Server::new(mux.clone()) + .serve_in_background(SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)); + mux +}); + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn UART_init(uart_config: *mut UART_Config) -> c_int { + let ch = unsafe { (*uart_config).ch }; + MUX.init_channel(ch); + UART_ERR_CODE_UART_OK.0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn UART_rx( + uart_config: *mut UART_Config, + buf: *mut u8, + buf_size: c_int, +) -> c_int { + let ch = unsafe { (*uart_config).ch }; + let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_size as usize) }; + let ret = MUX.read(ch, buf); + UART_ERR_CODE(ret as i32).0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn UART_tx( + uart_config: *mut UART_Config, + data: *mut u8, + data_size: c_int, +) -> c_int { + let ch = unsafe { (*uart_config).ch }; + let data = unsafe { std::slice::from_raw_parts_mut(data, data_size as usize) }; + MUX.write(ch, data); + UART_ERR_CODE_UART_OK.0 +} + +#[no_mangle] +pub extern "C" fn UART_reopen(_uart_config: *mut UART_Config, _reason: c_int) -> c_int { + UART_ERR_CODE_UART_OK.0 +} From 590a149ebc439c3517b0405ae34dfc8e547056a7 Mon Sep 17 00:00:00 2001 From: sksat Date: Wed, 2 Aug 2023 15:19:29 +0900 Subject: [PATCH 2/3] apply new directory name rule by #42 to c2a-uart-kble --- Cargo.toml | 2 +- hal/{uart_kble => uart-kble}/Cargo.toml | 0 hal/{uart_kble => uart-kble}/src/buffer.rs | 0 hal/{uart_kble => uart-kble}/src/kble.rs | 0 hal/{uart_kble => uart-kble}/src/lib.rs | 0 5 files changed, 1 insertion(+), 1 deletion(-) rename hal/{uart_kble => uart-kble}/Cargo.toml (100%) rename hal/{uart_kble => uart-kble}/src/buffer.rs (100%) rename hal/{uart_kble => uart-kble}/src/kble.rs (100%) rename hal/{uart_kble => uart-kble}/src/lib.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 6b2da3dd1..48ce722ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ members = [ "./hal/uart-noop", "./hal/wdt-noop", - "./hal/uart_kble", + "./hal/uart-kble", ] [workspace.dependencies] diff --git a/hal/uart_kble/Cargo.toml b/hal/uart-kble/Cargo.toml similarity index 100% rename from hal/uart_kble/Cargo.toml rename to hal/uart-kble/Cargo.toml diff --git a/hal/uart_kble/src/buffer.rs b/hal/uart-kble/src/buffer.rs similarity index 100% rename from hal/uart_kble/src/buffer.rs rename to hal/uart-kble/src/buffer.rs diff --git a/hal/uart_kble/src/kble.rs b/hal/uart-kble/src/kble.rs similarity index 100% rename from hal/uart_kble/src/kble.rs rename to hal/uart-kble/src/kble.rs diff --git a/hal/uart_kble/src/lib.rs b/hal/uart-kble/src/lib.rs similarity index 100% rename from hal/uart_kble/src/lib.rs rename to hal/uart-kble/src/lib.rs From 1a5199c32a6777162346c33c88009861efc32d45 Mon Sep 17 00:00:00 2001 From: sksat Date: Wed, 2 Aug 2023 15:53:17 +0900 Subject: [PATCH 3/3] add kble link included overview as doc comment --- hal/uart-kble/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hal/uart-kble/src/lib.rs b/hal/uart-kble/src/lib.rs index 17f56f08f..728113bd5 100644 --- a/hal/uart-kble/src/lib.rs +++ b/hal/uart-kble/src/lib.rs @@ -1,3 +1,6 @@ +//! **c2a-uart-kble** provides [kble](https://github.com/arkedge/kble) based c2a-hal UART +//! emulation for C2A + mod buffer; mod kble;