Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new event type to notify nodes about dropped inputs #533

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/c++/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
id,
metadata: _,
data,
..
}) = event.0
else {
bail!("not an input event");
Expand Down
16 changes: 16 additions & 0 deletions apis/rust/node/src/event_stream/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,33 @@ pub enum Event {
Reload {
operator_id: Option<OperatorId>,
},
#[non_exhaustive]
Input {
id: DataId,
metadata: Metadata,
data: ArrowData,
/// Number of dropped inputs of this ID.
///
/// Specifies the number of inputs of this ID that were dropped _before_ this input.
dropped: usize,
},
InputClosed {
id: DataId,
},
Error(String),
}

impl Event {
pub fn new_input(id: DataId, metadata: Metadata, data: ArrowData) -> Event {
Event::Input {
id,
metadata,
data,
dropped: 0,
}
}
}

pub enum RawData {
Empty,
Vec(AVec<u8, ConstAlign<128>>),
Expand Down
8 changes: 7 additions & 1 deletion apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
NodeEvent::Input {
id,
metadata,
data,
dropped,
} => {
let data = match data {
None => Ok(None),
Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Expand Down Expand Up @@ -164,6 +169,7 @@ impl EventStream {
id,
metadata,
data: data.into(),
dropped,
},
Err(err) => Event::Error(format!("{err:?}")),
}
Expand Down
1 change: 1 addition & 0 deletions binaries/cli/src/template/rust/node/main-template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ fn main() -> Result<(), Box<dyn Error>> {
id,
metadata,
data: _,
..
} => match id.as_str() {
other => eprintln!("Received input `{other}`"),
},
Expand Down
3 changes: 3 additions & 0 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ impl Daemon {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
dropped: 0,
},
&self.clock,
);
Expand Down Expand Up @@ -1031,6 +1032,7 @@ impl Daemon {
id: input_id.clone(),
metadata: metadata.clone(),
data: Some(message.clone()),
dropped: 0,
},
&self.clock,
);
Expand Down Expand Up @@ -1162,6 +1164,7 @@ async fn send_output_to_local_receivers(
id: input_id.clone(),
metadata: metadata.clone(),
data: data.clone(),
dropped: 0,
};
match channel.send(Timestamped {
inner: item,
Expand Down
21 changes: 13 additions & 8 deletions binaries/daemon/src/node_communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use eyre::{eyre, Context};
use futures::{future, task, Future};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashMap, VecDeque},
mem,
net::Ipv4Addr,
sync::Arc,
Expand Down Expand Up @@ -151,6 +151,7 @@ struct Listener {
queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<uhlc::HLC>,
dropped_inputs: HashMap<DataId, usize>,
}

impl Listener {
Expand Down Expand Up @@ -211,6 +212,7 @@ impl Listener {
queue_sizes,
queue: VecDeque::new(),
clock: hlc.clone(),
dropped_inputs: HashMap::new(),
};
match listener
.run_inner(connection)
Expand Down Expand Up @@ -281,7 +283,10 @@ impl Listener {

async fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
while let Ok(mut event) = events.try_recv() {
if let NodeEvent::Input { id, dropped, .. } = &mut event.inner {
*dropped += self.dropped_inputs.remove(id).unwrap_or_default();
}
self.queue.push_back(Box::new(Some(event)));
}

Expand All @@ -294,21 +299,24 @@ impl Listener {
#[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")]
async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> {
let mut queue_size_remaining = self.queue_sizes.clone();
let mut dropped = 0;
let mut drop_tokens = Vec::new();

// iterate over queued events, newest first
for event in self.queue.iter_mut().rev() {
let Some(Timestamped {
inner: NodeEvent::Input { id, data, .. },
inner:
NodeEvent::Input {
id, data, dropped, ..
},
..
}) = event.as_mut()
else {
continue;
};
match queue_size_remaining.get_mut(id) {
Some(0) => {
dropped += 1;
*self.dropped_inputs.entry(id.clone()).or_default() += *dropped + 1;

if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) {
drop_tokens.push(drop_token);
}
Expand All @@ -324,9 +332,6 @@ impl Listener {
}
self.report_drop_tokens(drop_tokens).await?;

if dropped > 0 {
tracing::debug!("dropped {dropped} inputs because event queue was too full");
}
Ok(())
}

Expand Down
10 changes: 4 additions & 6 deletions binaries/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ async fn run(
RuntimeEvent::Event(Event::Reload { operator_id: None }) => {
tracing::warn!("Reloading runtime nodes is not supported");
}
RuntimeEvent::Event(Event::Input { id, metadata, data }) => {
RuntimeEvent::Event(Event::Input {
id, metadata, data, ..
}) => {
let Some((operator_id, input_id)) = id.as_str().split_once('/') else {
tracing::warn!("received non-operator input {id}");
continue;
Expand All @@ -261,11 +263,7 @@ async fn run(
};

if let Err(err) = operator_channel
.send_async(Event::Input {
id: input_id.clone(),
metadata,
data,
})
.send_async(Event::new_input(input_id.clone(), metadata, data))
.await
.wrap_err_with(|| {
format!("failed to send input `{input_id}` to operator `{operator_id}`")
Expand Down
1 change: 1 addition & 0 deletions binaries/runtime/src/operator/shared_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
id: input_id,
metadata,
data,
..
} => {
let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?;

Expand Down
4 changes: 3 additions & 1 deletion examples/benchmark/sink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ fn main() -> eyre::Result<()> {

while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => {
Event::Input {
id, metadata, data, ..
} => {
// check if new size bracket
let data_len = data.len();
if data_len != current_size {
Expand Down
1 change: 1 addition & 0 deletions examples/multiple-daemons/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> {
id,
metadata,
data: _,
..
} => match id.as_str() {
"tick" => {
let random: u64 = rand::random();
Expand Down
1 change: 1 addition & 0 deletions examples/multiple-daemons/sink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> {
id,
metadata: _,
data,
..
} => match id.as_str() {
"message" => {
let received_string: &str =
Expand Down
1 change: 1 addition & 0 deletions examples/rust-dataflow/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> {
id,
metadata,
data: _,
..
} => match id.as_str() {
"tick" => {
let random: u64 = rand::random();
Expand Down
1 change: 1 addition & 0 deletions examples/rust-dataflow/sink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> {
id,
metadata: _,
data,
..
} => match id.as_str() {
"message" => {
let received_string: &str =
Expand Down
4 changes: 3 additions & 1 deletion examples/rust-dataflow/status-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ fn main() -> eyre::Result<()> {
let mut ticks = 0;
while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => match id.as_ref() {
Event::Input {
id, metadata, data, ..
} => match id.as_ref() {
"tick" => {
ticks += 1;
}
Expand Down
4 changes: 4 additions & 0 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ pub enum NodeEvent {
id: DataId,
metadata: Metadata,
data: Option<DataMessage>,
/// Number of dropped inputs of this ID.
///
/// Specifies the number of inputs of this ID that were dropped _before_ this input.
dropped: usize,
},
InputClosed {
id: DataId,
Expand Down
4 changes: 3 additions & 1 deletion tool_nodes/dora-record/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ async fn main() -> eyre::Result<()> {

while let Some(event) = events.recv() {
match event {
Event::Input { id, data, metadata } => {
Event::Input {
id, data, metadata, ..
} => {
match writers.get(&id) {
None => {
let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false);
Expand Down
1 change: 1 addition & 0 deletions tool_nodes/dora-rerun/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ fn main() -> Result<()> {
id,
data,
metadata: _,
..
} = event
{
if id.as_str().contains("image") {
Expand Down
Loading