Skip to content

Commit

Permalink
Add AsyncChannelSource and AsyncChannelSink (#12)
Browse files Browse the repository at this point in the history
feat(blocks): AsyncChannel Source and sink
  • Loading branch information
ratzrattillo authored Mar 22, 2024
1 parent 7c688e2 commit 0d86666
Show file tree
Hide file tree
Showing 24 changed files with 298 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
config.toml
/target
/Cargo.lock
15 changes: 9 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,25 @@ members = [
[dependencies]
futuresdr = { git = "https://github.com/FutureSDR/FutureSDR", branch = "main" }
#futuresdr = { path = "../FutureSDR" }
async-trait = "0.1.68"
crossbeam-channel = { version = "0.5.8", optional = true }
async-channel = { version = "2.2.0", optional = true }
async-trait = "0.1.78"
crossbeam-channel = { version = "0.5.12", optional = true }
bimap = { version = "0.6.3", optional = true }
sigmf = { version = "0.1.0", path = "crates/sigmf" }
async-fs = "2.1.0"
serde = "1.0.193"
async-fs = "2.1.1"
serde = "1.0.197"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
tokio-test = "0.4.4"
rand = { version = "0.8.5" }
quickcheck_macros = "1"
serde_json = "1.0.108"
serde_json = "1.0.114"

[features]
default = []
crossbeam = ["dep:crossbeam-channel"]
async-channel = ["dep:async-channel"]
cw = ["dep:bimap"]

[[bench]]
Expand Down
2 changes: 1 addition & 1 deletion benches/channel/crossbeam_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn crossbeam_sink_boxed_slice_u32(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(n_samp as u64));

group.bench_function(format!("mock-u32-crossbeam-sink"), |b| {
group.bench_function("mock-u32-crossbeam-sink", |b| {
b.iter(|| {
let block = CrossbeamSink::new_typed(tx.clone());
let mut mocker = Mocker::new(block);
Expand Down
2 changes: 1 addition & 1 deletion benches/channel/crossbeam_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn crossbeam_source_boxed_slice_u32(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(n_samp as u64));

group.bench_function(format!("mock-u32-crossbeam-source"), |b| {
group.bench_function("mock-u32-crossbeam-source", |b| {
b.iter(|| {
let block = CrossbeamSource::new_typed(rx.clone());
let mut mocker = Mocker::new(block);
Expand Down
2 changes: 1 addition & 1 deletion benches/cw/baseband_to_cw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn bench_baseband_to_cw(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(baseband.len() as u64));

group.bench_function(format!("mock-baseband-to-cw"), |b| {
group.bench_function("mock-baseband-to-cw", |b| {
b.iter(|| {
let block = BaseBandToCW::new_typed(100, samples_per_dot);
let mut mocker = Mocker::new(block);
Expand Down
2 changes: 1 addition & 1 deletion benches/cw/cw_to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn bench_cw_to_char(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(cw.len() as u64));

group.bench_function(format!("mock-cw-to-char"), |b| {
group.bench_function("mock-cw-to-char", |b| {
b.iter(|| {
let block = CWToChar::new_typed(get_alphabet());
let mut mocker = Mocker::new(block);
Expand Down
4 changes: 2 additions & 2 deletions benches/cw/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn bench_char_to_baseband(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(bb.len() as u64));

group.bench_function(format!("char_to_bb"), |b| {
group.bench_function("char_to_bb", |b| {
b.iter(|| {
message
.chars()
Expand All @@ -42,7 +42,7 @@ pub fn bench_msg_to_cw(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(msg_slice.len() as u64));

group.bench_function(format!("msg_to_cw"), |b| {
group.bench_function("msg_to_cw", |b| {
b.iter(|| {
msg_to_cw(msg_slice);
});
Expand Down
4 changes: 2 additions & 2 deletions check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ cd ${SCRIPTPATH} && cargo fmt --check
###########################################################
# CLIPPY
###########################################################
cd ${SCRIPTPATH} && cargo clippy --all-targets --workspace
cd ${SCRIPTPATH} && cargo clippy --all-targets --all-features --workspace

###########################################################
# Test
###########################################################
cd ${SCRIPTPATH} && cargo test --all-targets --workspace
cd ${SCRIPTPATH} && cargo test --all-targets --all-features --workspace
81 changes: 81 additions & 0 deletions src/async_channel/async_channel_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use async_channel::Sender;

use futuresdr::anyhow::Result;
use futuresdr::log::info;
use futuresdr::runtime::Block;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::WorkIo;

/// Get samples out of a Flowgraph into a channel.
///
/// # Inputs
///
/// `in`: Samples retrieved from teh flowgraph
///
/// # Usage
/// ```
/// use async_channel;
/// use futuresdr::blocks::VectorSource;
/// use fsdr-blocks::blocks::AsyncChannelSink
/// use futuresdr::runtime::Flowgraph;
///
/// let mut fg = Flowgraph::new();
/// let (tx, rx) = async_channel::unbounded::<Box<[u32]>>();
/// let vec = vec![0, 1, 2];
/// let src = fg.add_block(VectorSource::<u32>::new(vec));
/// let cs = fg.add_block(AsyncChannelSink::<u32>::new(tx));
/// // start flowgraph
/// ```
pub struct AsyncChannelSink<T: Send + 'static> {
sender: Sender<Box<[T]>>,
}

impl<T: Send + Clone + 'static> AsyncChannelSink<T> {
#[allow(clippy::new_ret_no_self)]
pub fn new(sender: Sender<Box<[T]>>) -> Block {
Block::new(
BlockMetaBuilder::new("AsyncChannelSink").build(),
StreamIoBuilder::new().add_input::<T>("in").build(),
MessageIoBuilder::new().build(),
AsyncChannelSink::<T> { sender },
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + Clone + 'static> Kernel for AsyncChannelSink<T> {
async fn work(
&mut self,
io: &mut WorkIo,
sio: &mut StreamIo,
_mio: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let i = sio.input(0).slice::<T>();

if !i.is_empty() {
match self.sender.try_send(i.into()) {
Ok(_) => {
info!("sent data...");
}
Err(err) => {
info!("{}", err.to_string());
}
}
sio.input(0).consume(i.len());
}

if sio.input(0).finished() {
io.finished = true;
}

Ok(())
}
}
103 changes: 103 additions & 0 deletions src/async_channel/async_channel_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use async_channel::Receiver;
use futuresdr::futures::StreamExt;

use futuresdr::anyhow::Result;
use futuresdr::log::info;
use futuresdr::runtime::Block;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::WorkIo;

/// Push samples through a channel into a stream connection.
///
/// # Outputs
///
/// `out`: Samples pushed into the channel
///
/// # Usage
/// ```
/// use async_channel;
/// use fsdr-blocks::blocks::AsyncChannelSource;
/// use futuresdr::runtime::Flowgraph;
///
/// let mut fg = Flowgraph::new();
/// let (tx, rx) = async_channel::unbounded::<Box<[u32]>>();
///
/// let async_channel_src = fg.add_block(AsyncChannelSource::<u32>::new(rx));
///
/// tx.send(orig.clone().into_boxed_slice()).await.unwrap();
/// ```
pub struct AsyncChannelSource<T: Send + 'static> {
receiver: Receiver<Box<[T]>>,
current: Option<(Box<[T]>, usize)>,
}

impl<T: Send + 'static> AsyncChannelSource<T> {
#[allow(clippy::new_ret_no_self)]
pub fn new(receiver: Receiver<Box<[T]>>) -> Block {
Block::new(
BlockMetaBuilder::new("AsyncChannelSource").build(),
StreamIoBuilder::new().add_output::<T>("out").build(),
MessageIoBuilder::new().build(),
AsyncChannelSource::<T> {
receiver,
current: None,
},
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + 'static> Kernel for AsyncChannelSource<T> {
async fn work(
&mut self,
io: &mut WorkIo,
sio: &mut StreamIo,
_mio: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let out = sio.output(0).slice::<T>();
if out.is_empty() {
return Ok(());
}

if self.current.is_none() {
match self.receiver.by_ref().recv().await {
//.by_ref().next().await
Ok(data) => {
info!("received data chunk on channel");
self.current = Some((data, 0));
}
Err(_err) => {
info!("sender-end of channel was closed");
io.finished = true;
return Ok(());
}
}
}

if let Some((data, index)) = &mut self.current {
let n = std::cmp::min(data.len() - *index, out.len());
unsafe {
std::ptr::copy_nonoverlapping(data.as_ptr().add(*index), out.as_mut_ptr(), n);
};
sio.output(0).produce(n);
*index += n;
if *index == data.len() {
self.current = None;
}
}

if self.current.is_none() {
io.call_again = true;
}

Ok(())
}
}
6 changes: 6 additions & 0 deletions src/async_channel/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! ## Blocks related to channels
mod async_channel_sink;
mod async_channel_source;

pub use async_channel_sink::AsyncChannelSink;
pub use async_channel_source::AsyncChannelSource;
2 changes: 1 addition & 1 deletion src/channel/crossbeam_sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use crossbeam_channel::Sender;
use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::log::info;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
Expand Down
2 changes: 1 addition & 1 deletion src/channel/crossbeam_source.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use crossbeam_channel::{Receiver, TryRecvError};
use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::log::debug;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
Expand Down
2 changes: 1 addition & 1 deletion src/cw/baseband_to_cw.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use std::ops::RangeInclusive;

use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
Expand Down
3 changes: 2 additions & 1 deletion src/cw/cw_to_char.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;

use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub extern crate async_trait;
#[cfg(feature = "crossbeam")]
pub mod channel;

#[cfg(feature = "async-channel")]
pub mod async_channel;

#[cfg(feature = "cw")]
pub mod cw;

Expand Down
1 change: 0 additions & 1 deletion src/serde_pmt/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std;
use std::fmt::{self, Display};

use serde::{de, ser};
Expand Down
8 changes: 4 additions & 4 deletions src/serde_pmt/serialiser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl<'a> serde::Serializer for &'a mut Serializer {
Ok(Pmt::Null)
}

fn serialize_some<T: ?Sized>(self, value: &T) -> Result<Self::Ok>
fn serialize_some<T>(self, value: &T) -> Result<Self::Ok>
where
T: Serialize,
T: Serialize + ?Sized,
{
value.serialize(self)
}
Expand All @@ -119,13 +119,13 @@ impl<'a> serde::Serializer for &'a mut Serializer {
self.serialize_str(variant)
}

fn serialize_newtype_struct<T: ?Sized>(
fn serialize_newtype_struct<T>(
self,
_name: &'static str,
value: &T,
) -> std::prelude::v1::Result<Self::Ok, Self::Error>
where
T: Serialize,
T: Serialize + ?Sized,
{
value.serialize(self)
}
Expand Down
Loading

0 comments on commit 0d86666

Please sign in to comment.