From 57ab87b5a56d39e5143bb9c790652183481d8cb4 Mon Sep 17 00:00:00 2001 From: sander2 Date: Fri, 12 Nov 2021 14:02:43 +0100 Subject: [PATCH] fix: keep processing a block's events after encountering a dispatch error (#310) * fix: keep processing a block's events after encountering a dispatch error * test: unit test for subscription --- src/events.rs | 1 + src/subscription.rs | 236 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 215 insertions(+), 22 deletions(-) diff --git a/src/events.rs b/src/events.rs index 634ff10e5a..52cf7d3b6a 100644 --- a/src/events.rs +++ b/src/events.rs @@ -42,6 +42,7 @@ use sp_core::Bytes; /// Raw bytes for an Event #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Clone))] pub struct RawEvent { /// The name of the pallet from whence the Event originated. pub pallet: String, diff --git a/src/subscription.rs b/src/subscription.rs index 31ad50c3e8..819fe39b22 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -44,15 +44,49 @@ use crate::{ /// Event subscription simplifies filtering a storage change set stream for /// events of interest. pub struct EventSubscription<'a, T: Config> { - subscription: EventStorageSubscription, - decoder: &'a EventsDecoder, + block_reader: BlockReader<'a, T>, block: Option, extrinsic: Option, event: Option<(&'static str, &'static str)>, - events: VecDeque, + events: VecDeque, finished: bool, } +enum BlockReader<'a, T: Config> { + Decoder { + subscription: EventStorageSubscription, + decoder: &'a EventsDecoder, + }, + /// Mock event listener for unit tests + #[cfg(test)] + Mock(Box, Error>)>>), +} + +impl<'a, T: Config> BlockReader<'a, T> { + async fn next(&mut self) -> Option<(T::Hash, Result, Error>)> { + match self { + BlockReader::Decoder { + subscription, + decoder, + } => { + let change_set = subscription.next().await?; + let events: Result, _> = change_set + .changes + .into_iter() + .filter_map(|(_key, change)| { + Some(decoder.decode_events(&mut change?.0.as_slice())) + }) + .collect(); + + let flattened_events = events.map(|x| x.into_iter().flatten().collect()); + Some((change_set.block, flattened_events)) + } + #[cfg(test)] + BlockReader::Mock(it) => it.next(), + } + } +} + impl<'a, T: Config> EventSubscription<'a, T> { /// Creates a new event subscription. pub fn new( @@ -60,8 +94,10 @@ impl<'a, T: Config> EventSubscription<'a, T> { decoder: &'a EventsDecoder, ) -> Self { Self { - subscription, - decoder, + block_reader: BlockReader::Decoder { + subscription, + decoder, + }, block: None, extrinsic: None, event: None, @@ -89,27 +125,28 @@ impl<'a, T: Config> EventSubscription<'a, T> { /// Gets the next event. pub async fn next(&mut self) -> Option> { loop { - if let Some(event) = self.events.pop_front() { - return Some(Ok(event)) + if let Some(raw_event) = self.events.pop_front() { + match raw_event { + Raw::Event(event) => return Some(Ok(event)), + Raw::Error(err) => return Some(Err(err.into())), + }; } if self.finished { return None } // always return None if subscription has closed - let change_set = self.subscription.next().await?; + let (received_hash, events) = self.block_reader.next().await?; if let Some(hash) = self.block.as_ref() { - if &change_set.block == hash { + if &received_hash == hash { self.finished = true; } else { continue } } - for (_key, data) in change_set.changes { - if let Some(data) = data { - let raw_events = match self.decoder.decode_events(&mut &data.0[..]) { - Ok(events) => events, - Err(error) => return Some(Err(error)), - }; + + match events { + Err(err) => return Some(Err(err)), + Ok(raw_events) => { for (phase, raw) in raw_events { if let Phase::ApplyExtrinsic(i) = phase { if let Some(ext_index) = self.extrinsic { @@ -117,16 +154,15 @@ impl<'a, T: Config> EventSubscription<'a, T> { continue } } - let event = match raw { - Raw::Event(event) => event, - Raw::Error(err) => return Some(Err(err.into())), - }; if let Some((module, variant)) = self.event { - if event.pallet != module || event.variant != variant { - continue + if let Raw::Event(ref event) = raw { + if event.pallet != module || event.variant != variant + { + continue + } } } - self.events.push_back(event); + self.events.push_back(raw); } } } @@ -227,3 +263,159 @@ where } } } + +#[cfg(test)] +mod tests { + use crate::RuntimeError; + + use super::*; + use sp_core::H256; + #[derive(Clone)] + struct MockConfig; + + impl Config for MockConfig { + type Index = u32; + type BlockNumber = u32; + type Hash = sp_core::H256; + type Hashing = sp_runtime::traits::BlakeTwo256; + type AccountId = sp_runtime::AccountId32; + type Address = sp_runtime::MultiAddress; + type Header = sp_runtime::generic::Header< + Self::BlockNumber, + sp_runtime::traits::BlakeTwo256, + >; + type Signature = sp_runtime::MultiSignature; + type Extrinsic = sp_runtime::OpaqueExtrinsic; + } + + fn named_event(event_name: &str) -> RawEvent { + RawEvent { + data: sp_core::Bytes::from(Vec::new()), + pallet: event_name.to_string(), + variant: event_name.to_string(), + pallet_index: 0, + variant_index: 0, + } + } + + fn raw_event(id: u8) -> RawEvent { + RawEvent { + data: sp_core::Bytes::from(Vec::new()), + pallet: "SomePallet".to_string(), + variant: "SomeVariant".to_string(), + pallet_index: id, + variant_index: id, + } + } + + fn event(id: u8) -> Raw { + Raw::Event(raw_event(id)) + } + + #[async_std::test] + async fn test_error_does_not_stop_subscription() { + let mut subscription: EventSubscription = EventSubscription { + block_reader: BlockReader::Mock(Box::new( + vec![( + H256::from([0; 32]), + Ok(vec![ + ( + Phase::ApplyExtrinsic(0), + Raw::Error(RuntimeError::BadOrigin), + ), + (Phase::ApplyExtrinsic(0), event(1)), + ]), + )] + .into_iter(), + )), + block: None, + extrinsic: None, + event: None, + events: Default::default(), + finished: false, + }; + + assert!(matches!( + subscription.next().await.unwrap().unwrap_err(), + Error::Runtime(RuntimeError::BadOrigin) + )); + assert_eq!(subscription.next().await.unwrap().unwrap(), raw_event(1)); + assert!(subscription.next().await.is_none()); + } + + #[async_std::test] + /// test that filters work correctly, and are independent of each other + async fn test_filters() { + let mut events = vec![]; + // create all events + for block_hash in [H256::from([0; 32]), H256::from([1; 32])] { + for phase in [Phase::ApplyExtrinsic(0), Phase::ApplyExtrinsic(1)] { + for event in [named_event("a"), named_event("b")] { + events.push((block_hash, phase.clone(), event)) + } + } + } + // set variant index so we can uniquely identify the event + events.iter_mut().enumerate().for_each(|(idx, event)| { + event.2.variant_index = idx as u8; + }); + + for block_filter in [None, Some(H256::from([1; 32]))] { + for extrinsic_filter in [None, Some(1)] { + for event_filter in [None, Some(("b", "b"))] { + let mut subscription: EventSubscription = + EventSubscription { + block_reader: BlockReader::Mock(Box::new( + vec![ + ( + events[0].0, + Ok(events + .iter() + .take(4) + .map(|(_, phase, event)| { + (phase.clone(), Raw::Event(event.clone())) + }) + .collect()), + ), + ( + events[4].0, + Ok(events + .iter() + .skip(4) + .take(4) + .map(|(_, phase, event)| { + (phase.clone(), Raw::Event(event.clone())) + }) + .collect()), + ), + ] + .into_iter(), + )), + block: block_filter.clone(), + extrinsic: extrinsic_filter.clone(), + event: event_filter.clone(), + events: Default::default(), + finished: false, + }; + let mut expected_events = events.clone(); + if let Some(hash) = block_filter { + expected_events.retain(|(h, _, _)| h == &hash); + } + if let Some(idx) = extrinsic_filter { + expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx)); + } + if let Some(name) = event_filter { + expected_events.retain(|(_, _, event)| event.pallet == name.0); + } + for expected_event in expected_events { + assert_eq!( + subscription.next().await.unwrap().unwrap(), + expected_event.2 + ); + } + assert!(subscription.next().await.is_none()); + } + } + } + } +}