Skip to content

Commit

Permalink
Made AsyncManualOutputHandler generic
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenhaahr committed Dec 12, 2024
1 parent d699c62 commit 885a510
Showing 1 changed file with 12 additions and 15 deletions.
27 changes: 12 additions & 15 deletions src/manual_output_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use futures::future::join_all;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

use crate::{
core::{OutputHandler, OutputStream, StreamData, VarName},
Value,
};
use crate::core::{OutputHandler, OutputStream, StreamData, VarName};

/* Some members are defined as Option<T> as either they are provided after
* construction by provide_streams or once they are used they are taken and
Expand Down Expand Up @@ -101,17 +98,17 @@ impl<V: StreamData> OutputHandler<V> for ManualOutputHandler<V> {
}


pub struct AsyncManualOutputHandler {
pub struct AsyncManualOutputHandler<V: StreamData>{
var_names: Vec<VarName>,
stream_senders: Option<Vec<oneshot::Sender<OutputStream<Value>>>>,
stream_receivers: Option<Vec<oneshot::Receiver<OutputStream<Value>>>>,
output_sender: Option<mpsc::Sender<(VarName, Value)>>,
output_receiver: Option<mpsc::Receiver<(VarName, Value)>>,
stream_senders: Option<Vec<oneshot::Sender<OutputStream<V>>>>,
stream_receivers: Option<Vec<oneshot::Receiver<OutputStream<V>>>>,
output_sender: Option<mpsc::Sender<(VarName, V)>>,
output_receiver: Option<mpsc::Receiver<(VarName, V)>>,
}

#[async_trait]
impl OutputHandler<Value> for AsyncManualOutputHandler {
fn provide_streams(&mut self, mut streams: BTreeMap<VarName, OutputStream<Value>>) {
impl<V: StreamData> OutputHandler<V> for AsyncManualOutputHandler<V> {
fn provide_streams(&mut self, mut streams: BTreeMap<VarName, OutputStream<V>>) {
for (var_name, sender) in self
.var_names
.iter()
Expand Down Expand Up @@ -156,11 +153,11 @@ impl OutputHandler<Value> for AsyncManualOutputHandler {
}
}

impl AsyncManualOutputHandler {
impl<V: StreamData> AsyncManualOutputHandler<V> {
pub fn new(var_names: Vec<VarName>) -> Self {
let (stream_senders, stream_receivers): (
Vec<oneshot::Sender<OutputStream<Value>>>,
Vec<oneshot::Receiver<OutputStream<Value>>>,
Vec<oneshot::Sender<OutputStream<V>>>,
Vec<oneshot::Receiver<OutputStream<V>>>,
) = var_names.iter().map(|_| oneshot::channel()).unzip();
let (output_sender, output_receiver) = mpsc::channel(10);
Self {
Expand All @@ -172,7 +169,7 @@ impl AsyncManualOutputHandler {
}
}

pub fn get_output(&mut self) -> OutputStream<(VarName, Value)> {
pub fn get_output(&mut self) -> OutputStream<(VarName, V)> {
Box::pin(ReceiverStream::new(
self.output_receiver
.take()
Expand Down

0 comments on commit 885a510

Please sign in to comment.