From 0159da4aee2167b826a5fc32d13ce880f1820578 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 5 Jun 2024 15:15:14 +0200 Subject: [PATCH 1/4] Add a new event type to notify nodes about dropped inputs This gives nodes a way to check whether some inputs have been dropped by dora. The event also includes a drop reason, which is currently only the queue size. --- apis/rust/node/src/event_stream/event.rs | 5 +++++ apis/rust/node/src/event_stream/mod.rs | 3 +++ binaries/daemon/src/node_communication/mod.rs | 20 +++++++++++++++---- libraries/core/src/daemon_messages.rs | 10 ++++++++++ 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 75b3c595b..426b344c8 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -2,6 +2,7 @@ use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::{ArrowData, IntoArrow}; +pub use dora_core::daemon_messages::InputDropReason; use dora_core::{ config::{DataId, OperatorId}, message::{ArrowTypeInfo, BufferOffset, Metadata}, @@ -25,6 +26,10 @@ pub enum Event { id: DataId, }, Error(String), + DroppedInputs { + reason: InputDropReason, + number: usize, + }, } pub enum RawData { diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 9575a8d7b..d3985fca4 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -175,6 +175,9 @@ impl EventStream { tracing::error!("{err:?}"); Event::Error(err.wrap_err("internal error").to_string()) } + NodeEvent::DroppedInputs { reason, number } => { + Event::DroppedInputs { reason, number } + } }, EventItem::FatalError(err) => { diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 8fd200de7..76f5cc045 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -2,8 +2,8 @@ use crate::{DaemonNodeEvent, Event}; use dora_core::{ config::{DataId, LocalCommunicationConfig, NodeId}, daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent, - Timestamped, + DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, InputDropReason, + NodeDropEvent, NodeEvent, Timestamped, }, message::uhlc, }; @@ -298,7 +298,8 @@ impl Listener { let mut drop_tokens = Vec::new(); // iterate over queued events, newest first - for event in self.queue.iter_mut().rev() { + let mut last_dropped = None; + for (i, event) in self.queue.iter_mut().enumerate().rev() { let Some(Timestamped { inner: NodeEvent::Input { id, data, .. }, .. @@ -313,6 +314,7 @@ impl Listener { drop_tokens.push(drop_token); } *event.as_mut() = None; + last_dropped = Some(i); } Some(size_remaining) => { *size_remaining = size_remaining.saturating_sub(1); @@ -324,8 +326,18 @@ impl Listener { } self.report_drop_tokens(drop_tokens).await?; - if dropped > 0 { + if let Some(last_dropped) = last_dropped { tracing::debug!("dropped {dropped} inputs because event queue was too full"); + // replace last dropped event with `DroppedInputs` event + let entry = &mut self.queue[last_dropped]; + assert!(entry.is_none()); + *entry = Box::new(Some(Timestamped { + inner: NodeEvent::DroppedInputs { + reason: InputDropReason::QueueSize, + number: dropped, + }, + timestamp: self.clock.new_timestamp(), + })); } Ok(()) } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 91c634cca..647750cd8 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -160,6 +160,16 @@ pub enum NodeEvent { id: DataId, }, AllInputsClosed, + DroppedInputs { + reason: InputDropReason, + number: usize, + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] +pub enum InputDropReason { + QueueSize, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] From 7ea74d049dbe5e5db03edeb1821487dec5690478 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 6 Jun 2024 15:36:13 +0200 Subject: [PATCH 2/4] Report dropped inputs in input event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of adding a new ˋDroppedInputsˋ event. --- apis/c++/node/src/lib.rs | 1 + apis/rust/node/src/event_stream/event.rs | 9 ++-- apis/rust/node/src/event_stream/mod.rs | 11 +++-- binaries/daemon/src/lib.rs | 3 ++ binaries/daemon/src/node_communication/mod.rs | 42 +++++++++---------- binaries/runtime/src/lib.rs | 8 +++- binaries/runtime/src/operator/shared_lib.rs | 1 + examples/benchmark/sink/src/main.rs | 7 +++- examples/multiple-daemons/node/src/main.rs | 1 + examples/multiple-daemons/sink/src/main.rs | 1 + examples/rust-dataflow/node/src/main.rs | 1 + libraries/core/src/daemon_messages.rs | 14 ++----- tool_nodes/dora-record/src/main.rs | 7 +++- 13 files changed, 61 insertions(+), 45 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index e3476de87..b6a9ea17a 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -142,6 +142,7 @@ fn event_as_input(event: Box) -> eyre::Result { id, metadata: _, data, + dropped, }) = event.0 else { bail!("not an input event"); diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 426b344c8..cec736fc9 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -2,7 +2,6 @@ use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::{ArrowData, IntoArrow}; -pub use dora_core::daemon_messages::InputDropReason; use dora_core::{ config::{DataId, OperatorId}, message::{ArrowTypeInfo, BufferOffset, Metadata}, @@ -21,15 +20,15 @@ pub enum Event { id: DataId, metadata: Metadata, data: ArrowData, + /// Number of dropped inputs of this ID. + /// + /// Specifies the number of inputs of this ID that were dropped _before_ this input. + dropped: usize, }, InputClosed { id: DataId, }, Error(String), - DroppedInputs { - reason: InputDropReason, - number: usize, - }, } pub enum RawData { diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index d3985fca4..af114febd 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -136,7 +136,12 @@ impl EventStream { NodeEvent::Stop => Event::Stop, NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { + NodeEvent::Input { + id, + metadata, + data, + dropped, + } => { let data = match data { None => Ok(None), Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), @@ -164,6 +169,7 @@ impl EventStream { id, metadata, data: data.into(), + dropped, }, Err(err) => Event::Error(format!("{err:?}")), } @@ -175,9 +181,6 @@ impl EventStream { tracing::error!("{err:?}"); Event::Error(err.wrap_err("internal error").to_string()) } - NodeEvent::DroppedInputs { reason, number } => { - Event::DroppedInputs { reason, number } - } }, EventItem::FatalError(err) => { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8b85ee8e6..d7f71ac06 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -984,6 +984,7 @@ impl Daemon { id: input_id.clone(), metadata: metadata.clone(), data: None, + dropped: 0, }, &self.clock, ); @@ -1031,6 +1032,7 @@ impl Daemon { id: input_id.clone(), metadata: metadata.clone(), data: Some(message.clone()), + dropped: 0, }, &self.clock, ); @@ -1162,6 +1164,7 @@ async fn send_output_to_local_receivers( id: input_id.clone(), metadata: metadata.clone(), data: data.clone(), + dropped: 0, }; match channel.send(Timestamped { inner: item, diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 76f5cc045..dcbad3859 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -2,8 +2,8 @@ use crate::{DaemonNodeEvent, Event}; use dora_core::{ config::{DataId, LocalCommunicationConfig, NodeId}, daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, InputDropReason, - NodeDropEvent, NodeEvent, Timestamped, + DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent, + Timestamped, }, message::uhlc, }; @@ -11,7 +11,7 @@ use eyre::{eyre, Context}; use futures::{future, task, Future}; use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque}, mem, net::Ipv4Addr, sync::Arc, @@ -151,6 +151,7 @@ struct Listener { queue: VecDeque>>>, queue_sizes: BTreeMap, clock: Arc, + dropped_inputs: HashMap, } impl Listener { @@ -211,6 +212,7 @@ impl Listener { queue_sizes, queue: VecDeque::new(), clock: hlc.clone(), + dropped_inputs: HashMap::new(), }; match listener .run_inner(connection) @@ -281,7 +283,10 @@ impl Listener { async fn handle_events(&mut self) -> eyre::Result<()> { if let Some(events) = &mut self.subscribed_events { - while let Ok(event) = events.try_recv() { + while let Ok(mut event) = events.try_recv() { + if let NodeEvent::Input { id, dropped, .. } = &mut event.inner { + *dropped += self.dropped_inputs.remove(id).unwrap_or_default(); + } self.queue.push_back(Box::new(Some(event))); } @@ -294,14 +299,15 @@ impl Listener { #[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")] async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> { let mut queue_size_remaining = self.queue_sizes.clone(); - let mut dropped = 0; let mut drop_tokens = Vec::new(); // iterate over queued events, newest first - let mut last_dropped = None; - for (i, event) in self.queue.iter_mut().enumerate().rev() { + for event in self.queue.iter_mut().rev() { let Some(Timestamped { - inner: NodeEvent::Input { id, data, .. }, + inner: + NodeEvent::Input { + id, data, dropped, .. + }, .. }) = event.as_mut() else { @@ -309,12 +315,15 @@ impl Listener { }; match queue_size_remaining.get_mut(id) { Some(0) => { - dropped += 1; + self.dropped_inputs + .entry(id.clone()) + .or_default() + .saturating_add(*dropped + 1); + if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { drop_tokens.push(drop_token); } *event.as_mut() = None; - last_dropped = Some(i); } Some(size_remaining) => { *size_remaining = size_remaining.saturating_sub(1); @@ -326,19 +335,6 @@ impl Listener { } self.report_drop_tokens(drop_tokens).await?; - if let Some(last_dropped) = last_dropped { - tracing::debug!("dropped {dropped} inputs because event queue was too full"); - // replace last dropped event with `DroppedInputs` event - let entry = &mut self.queue[last_dropped]; - assert!(entry.is_none()); - *entry = Box::new(Some(Timestamped { - inner: NodeEvent::DroppedInputs { - reason: InputDropReason::QueueSize, - number: dropped, - }, - timestamp: self.clock.new_timestamp(), - })); - } Ok(()) } diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 308d59f17..bf5934f94 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -248,7 +248,12 @@ async fn run( RuntimeEvent::Event(Event::Reload { operator_id: None }) => { tracing::warn!("Reloading runtime nodes is not supported"); } - RuntimeEvent::Event(Event::Input { id, metadata, data }) => { + RuntimeEvent::Event(Event::Input { + id, + metadata, + data, + dropped, + }) => { let Some((operator_id, input_id)) = id.as_str().split_once('/') else { tracing::warn!("received non-operator input {id}"); continue; @@ -265,6 +270,7 @@ async fn run( id: input_id.clone(), metadata, data, + dropped, }) .await .wrap_err_with(|| { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 811c3cd07..3c4ca5605 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -191,6 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> { id: input_id, metadata, data, + dropped, } => { let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs index 154b47d24..34aa84b7a 100644 --- a/examples/benchmark/sink/src/main.rs +++ b/examples/benchmark/sink/src/main.rs @@ -20,7 +20,12 @@ fn main() -> eyre::Result<()> { while let Some(event) = events.recv() { match event { - Event::Input { id, metadata, data } => { + Event::Input { + id, + metadata, + data, + dropped, + } => { // check if new size bracket let data_len = data.len(); if data_len != current_size { diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index 36f42d578..f4664708f 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { id, metadata, data: _, + dropped, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index e180af081..4f4351c5d 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> { id, metadata: _, data, + dropped, } => match id.as_str() { "message" => { let received_string: &str = diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 36f42d578..f4664708f 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { id, metadata, data: _, + dropped, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 647750cd8..de4272fb0 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -155,21 +155,15 @@ pub enum NodeEvent { id: DataId, metadata: Metadata, data: Option, + /// Number of dropped inputs of this ID. + /// + /// Specifies the number of inputs of this ID that were dropped _before_ this input. + dropped: usize, }, InputClosed { id: DataId, }, AllInputsClosed, - DroppedInputs { - reason: InputDropReason, - number: usize, - }, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[non_exhaustive] -pub enum InputDropReason { - QueueSize, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/tool_nodes/dora-record/src/main.rs b/tool_nodes/dora-record/src/main.rs index fb10bf18e..116c3339d 100644 --- a/tool_nodes/dora-record/src/main.rs +++ b/tool_nodes/dora-record/src/main.rs @@ -25,7 +25,12 @@ async fn main() -> eyre::Result<()> { while let Some(event) = events.recv() { match event { - Event::Input { id, data, metadata } => { + Event::Input { + id, + data, + metadata, + dropped, + } => { match writers.get(&id) { None => { let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); From fa73ea15960e75c64e7adad317206e6077147c42 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 6 Jun 2024 15:45:26 +0200 Subject: [PATCH 3/4] Make `Event::Input` `non_exhaustive` Allows us to add more fields in the future. This is a breaking change. --- apis/c++/node/src/lib.rs | 2 +- apis/rust/node/src/event_stream/event.rs | 12 ++++++++++++ binaries/daemon/src/node_communication/mod.rs | 5 +---- binaries/runtime/src/lib.rs | 12 ++---------- binaries/runtime/src/operator/shared_lib.rs | 2 +- examples/benchmark/sink/src/main.rs | 5 +---- examples/multiple-daemons/node/src/main.rs | 2 +- examples/multiple-daemons/sink/src/main.rs | 2 +- examples/rust-dataflow/node/src/main.rs | 2 +- examples/rust-dataflow/sink/src/main.rs | 1 + examples/rust-dataflow/status-node/src/main.rs | 4 +++- tool_nodes/dora-record/src/main.rs | 5 +---- tool_nodes/dora-rerun/src/main.rs | 1 + 13 files changed, 27 insertions(+), 28 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index b6a9ea17a..646d87d7f 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -142,7 +142,7 @@ fn event_as_input(event: Box) -> eyre::Result { id, metadata: _, data, - dropped, + .. }) = event.0 else { bail!("not an input event"); diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index cec736fc9..73fd8fd74 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -16,6 +16,7 @@ pub enum Event { Reload { operator_id: Option, }, + #[non_exhaustive] Input { id: DataId, metadata: Metadata, @@ -31,6 +32,17 @@ pub enum Event { Error(String), } +impl Event { + pub fn new_input(id: DataId, metadata: Metadata, data: ArrowData) -> Event { + Event::Input { + id, + metadata, + data, + dropped: 0, + } + } +} + pub enum RawData { Empty, Vec(AVec>), diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index dcbad3859..26321f2b7 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -315,10 +315,7 @@ impl Listener { }; match queue_size_remaining.get_mut(id) { Some(0) => { - self.dropped_inputs - .entry(id.clone()) - .or_default() - .saturating_add(*dropped + 1); + *self.dropped_inputs.entry(id.clone()).or_default() += *dropped + 1; if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { drop_tokens.push(drop_token); diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index bf5934f94..8aaca29f7 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -249,10 +249,7 @@ async fn run( tracing::warn!("Reloading runtime nodes is not supported"); } RuntimeEvent::Event(Event::Input { - id, - metadata, - data, - dropped, + id, metadata, data, .. }) => { let Some((operator_id, input_id)) = id.as_str().split_once('/') else { tracing::warn!("received non-operator input {id}"); @@ -266,12 +263,7 @@ async fn run( }; if let Err(err) = operator_channel - .send_async(Event::Input { - id: input_id.clone(), - metadata, - data, - dropped, - }) + .send_async(Event::new_input(input_id.clone(), metadata, data)) .await .wrap_err_with(|| { format!("failed to send input `{input_id}` to operator `{operator_id}`") diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 3c4ca5605..04f1b657f 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -191,7 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> { id: input_id, metadata, data, - dropped, + .. } => { let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs index 34aa84b7a..4ff54d875 100644 --- a/examples/benchmark/sink/src/main.rs +++ b/examples/benchmark/sink/src/main.rs @@ -21,10 +21,7 @@ fn main() -> eyre::Result<()> { while let Some(event) = events.recv() { match event { Event::Input { - id, - metadata, - data, - dropped, + id, metadata, data, .. } => { // check if new size bracket let data_len = data.len(); diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index f4664708f..c7f0006ba 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -18,7 +18,7 @@ fn main() -> eyre::Result<()> { id, metadata, data: _, - dropped, + .. } => match id.as_str() { "tick" => { let random: u64 = rand::random(); diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index 4f4351c5d..715d0dc5e 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -10,7 +10,7 @@ fn main() -> eyre::Result<()> { id, metadata: _, data, - dropped, + .. } => match id.as_str() { "message" => { let received_string: &str = diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index f4664708f..c7f0006ba 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -18,7 +18,7 @@ fn main() -> eyre::Result<()> { id, metadata, data: _, - dropped, + .. } => match id.as_str() { "tick" => { let random: u64 = rand::random(); diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index e180af081..715d0dc5e 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> { id, metadata: _, data, + .. } => match id.as_str() { "message" => { let received_string: &str = diff --git a/examples/rust-dataflow/status-node/src/main.rs b/examples/rust-dataflow/status-node/src/main.rs index 09de8184c..aec483471 100644 --- a/examples/rust-dataflow/status-node/src/main.rs +++ b/examples/rust-dataflow/status-node/src/main.rs @@ -10,7 +10,9 @@ fn main() -> eyre::Result<()> { let mut ticks = 0; while let Some(event) = events.recv() { match event { - Event::Input { id, metadata, data } => match id.as_ref() { + Event::Input { + id, metadata, data, .. + } => match id.as_ref() { "tick" => { ticks += 1; } diff --git a/tool_nodes/dora-record/src/main.rs b/tool_nodes/dora-record/src/main.rs index 116c3339d..f9c0c3236 100644 --- a/tool_nodes/dora-record/src/main.rs +++ b/tool_nodes/dora-record/src/main.rs @@ -26,10 +26,7 @@ async fn main() -> eyre::Result<()> { while let Some(event) = events.recv() { match event { Event::Input { - id, - data, - metadata, - dropped, + id, data, metadata, .. } => { match writers.get(&id) { None => { diff --git a/tool_nodes/dora-rerun/src/main.rs b/tool_nodes/dora-rerun/src/main.rs index 3bf8c2311..32531a8eb 100644 --- a/tool_nodes/dora-rerun/src/main.rs +++ b/tool_nodes/dora-rerun/src/main.rs @@ -43,6 +43,7 @@ fn main() -> Result<()> { id, data, metadata: _, + .. } = event { if id.as_str().contains("image") { From d2dacc28326e59c7522266c3252f42b566b3a455 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 6 Jun 2024 17:04:08 +0200 Subject: [PATCH 4/4] Adjust template for non-exhaustive input event --- binaries/cli/src/template/rust/node/main-template.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/binaries/cli/src/template/rust/node/main-template.rs b/binaries/cli/src/template/rust/node/main-template.rs index 659f706c2..049692547 100644 --- a/binaries/cli/src/template/rust/node/main-template.rs +++ b/binaries/cli/src/template/rust/node/main-template.rs @@ -10,6 +10,7 @@ fn main() -> Result<(), Box> { id, metadata, data: _, + .. } => match id.as_str() { other => eprintln!("Received input `{other}`"), },