Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Buffer block #36

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/protoflow-blocks/doc/core/buffer.mmd
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
block-beta
columns 4
Source space:2 Buffer
columns 7
Source space:2 Count space:2 Sink
space:7
space:7
space:3 Pulse space:3
Source-- "input" -->Buffer
Pulse-- "trigger" -->Buffer
Buffer-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Buffer block
class Source hidden
class Sink hidden
class Pulse hidden
27 changes: 22 additions & 5 deletions lib/protoflow-blocks/doc/core/buffer.seq.mmd
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
sequenceDiagram
autonumber
participant BlockA as Another block
participant BlockA as Another block (input source)
participant Buffer.input as Buffer.input port
participant Buffer as Buffer block
participant Buffer.output as Buffer.output port
participant BlockB as Another block (downstream sink)
participant Buffer.trigger as Buffer.trigger port
participant BlockC as Another block (trigger source)

BlockA-->>Buffer: Connect
BlockA-->>Buffer: Connect (input)
BlockC-->>Buffer: Connect (trigger)
Buffer-->>BlockB: Connect (output)

loop Buffer process
loop Storing messages
BlockA->>Buffer: Message
Buffer->>Buffer: Store message
Buffer->>Buffer: Store message internally
end

BlockA-->>Buffer: Disconnect
BlockC->>Buffer: Trigger
loop Releasing messages
Buffer->>BlockB: Stored Message
end

BlockA-->>Buffer: Disconnect (input)
Buffer-->>Buffer.input: Close

BlockC-->>Buffer: Disconnect (trigger)
Buffer-->>Buffer.trigger: Close

Buffer-->>BlockB: Disconnect (output)
Buffer-->>Buffer.output: Close
10 changes: 8 additions & 2 deletions lib/protoflow-blocks/src/blocks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ pub mod core {
use protoflow_core::{Block, Message};

pub trait CoreBlocks {
fn buffer<T: Message + Into<T> + 'static>(&mut self) -> Buffer<T>;
fn buffer<Input: Message + Into<Input> + 'static, Trigger: Message + 'static>(
&mut self,
) -> Buffer<Input, Trigger>;

fn const_bytes<T: Into<Bytes>>(&mut self, value: T) -> Const<Bytes>;

Expand Down Expand Up @@ -119,7 +121,11 @@ pub mod core {
use super::SystemBuilding;
use CoreBlockConfig::*;
match self {
Buffer { .. } => Box::new(super::Buffer::new(system.input_any())), // TODO: Buffer::with_system(system)
Buffer { .. } => Box::new(super::Buffer::<_, ()>::new(
system.input_any(),
system.input(),
system.output_any(),
)), // TODO: Buffer::with_system(system)
Const { value, .. } => Box::new(super::Const::with_system(system, value.clone())),
Count { .. } => Box::new(super::Count::new(
system.input_any(),
Expand Down
71 changes: 52 additions & 19 deletions lib/protoflow-blocks/src/blocks/core/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
// This is free and unencumbered software released into the public domain.

use crate::{prelude::VecDeque, StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{types::Any, Block, BlockResult, BlockRuntime, InputPort, Message};
use crate::{prelude::Vec, StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{
types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
};
use protoflow_derive::Block;
use simple_mermaid::mermaid;

/// A block that simply stores all messages it receives.
/// A block that stores all messages it receives,
/// and sends them downstream when triggered.
///
/// When triggered, the block will send all messages it received so far,
/// _WITHOUT_ clearing the internal buffer.
///
/// # Block Diagram
#[doc = mermaid!("../../../doc/core/buffer.mmd")]
Expand All @@ -22,8 +28,14 @@ use simple_mermaid::mermaid;
/// # fn main() {
/// System::build(|s| {
/// let stdin = s.read_stdin();
/// let hello = s.const_string("Hello, World!");
/// let encode = s.encode_lines();
/// let buffer = s.buffer();
/// s.connect(&stdin.output, &buffer.input);
/// let stdout = s.write_stdout();
/// s.connect(&hello.output, &encode.input);
/// s.connect(&encode.output, &buffer.input);
/// s.connect(&stdin.output, &buffer.trigger);
/// s.connect(&buffer.output, &stdout.input);
/// });
/// # }
/// ```
Expand All @@ -35,55 +47,76 @@ use simple_mermaid::mermaid;
/// ```
///
#[derive(Block, Clone)]
pub struct Buffer<T: Message = Any> {
pub struct Buffer<Input: Message = Any, Trigger: Message = ()> {
/// The input message stream.
#[input]
pub input: InputPort<T>,
pub input: InputPort<Input>,

/// The trigger port.
#[input]
pub trigger: InputPort<Trigger>,

/// The output message stream.
#[output]
pub output: OutputPort<Input>,

/// The internal state storing the messages received.
#[state]
messages: VecDeque<T>,
messages: Vec<Input>,
}

impl<T: Message> Buffer<T> {
pub fn new(input: InputPort<T>) -> Self {
impl<Input: Message, Trigger: Message> Buffer<Input, Trigger> {
pub fn new(
input: InputPort<Input>,
trigger: InputPort<Trigger>,
output: OutputPort<Input>,
) -> Self {
Self {
input,
messages: VecDeque::new(),
trigger,
output,
messages: Vec::new(),
}
}

pub fn messages(&self) -> &VecDeque<T> {
pub fn messages(&self) -> &Vec<Input> {
&self.messages
}
}

impl<T: Message + 'static> Buffer<T> {
impl<Input: Message + 'static, Trigger: Message + 'static> Buffer<Input, Trigger> {
pub fn with_system(system: &System) -> Self {
use crate::SystemBuilding;
Self::new(system.input())
Self::new(system.input(), system.input(), system.output())
}
}

impl<T: Message> Block for Buffer<T> {
fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
impl<Input: Message, Trigger: Message> Block for Buffer<Input, Trigger> {
fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult {
while let Some(message) = self.input.recv()? {
self.messages.push_back(message);
self.messages.push(message);
}

while let Some(_) = self.trigger.recv()? {
for message in &self.messages {
self.output.send(message)?;
}
}

Ok(())
}
}

#[cfg(feature = "std")]
impl<T: Message> StdioSystem for Buffer<T> {
impl StdioSystem for Buffer {
fn build_system(config: StdioConfig) -> Result<System, StdioError> {
use crate::{CoreBlocks, SystemBuilding};

config.reject_any()?;

Ok(System::build(|s| {
let stdin = config.read_stdin(s);
let buffer = s.buffer();
let buffer = s.buffer::<_, ()>();
s.connect(&stdin.output, &buffer.input);
}))
}
Expand All @@ -98,7 +131,7 @@ mod tests {
fn instantiate_block() {
// Check that the block is constructible:
let _ = System::build(|s| {
let _ = s.block(Buffer::<i32>::new(s.input()));
let _ = s.block(Buffer::<i32>::new(s.input(), s.input(), s.output()));
});
}
}
2 changes: 1 addition & 1 deletion lib/protoflow-blocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn build_stdio_system(
use prelude::String;
Ok(match system_name.as_ref() {
// CoreBlocks
"Buffer" => Buffer::<String>::build_system(config)?,
"Buffer" => Buffer::build_system(config)?,
"Const" => Const::<String>::build_system(config)?,
"Count" => Count::<String>::build_system(config)?,
"Delay" => Delay::<String>::build_system(config)?,
Expand Down
6 changes: 4 additions & 2 deletions lib/protoflow-blocks/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ impl SystemBuilding for System {
impl AllBlocks for System {}

impl CoreBlocks for System {
fn buffer<T: Message + Into<T> + 'static>(&mut self) -> Buffer<T> {
self.0.block(Buffer::<T>::with_system(self))
fn buffer<Input: Message + Into<Input> + 'static, Trigger: Message + 'static>(
&mut self,
) -> Buffer<Input, Trigger> {
self.0.block(Buffer::<Input, Trigger>::with_system(self))
}

fn const_bytes<T: Into<Bytes>>(&mut self, value: T) -> Const<Bytes> {
Expand Down