Skip to content

Commit

Permalink
fix: keep processing a block's events after encountering a dispatch e…
Browse files Browse the repository at this point in the history
…rror (#310)

* fix: keep processing a block's events after encountering a dispatch error

* test: unit test for subscription
  • Loading branch information
sander2 authored Nov 12, 2021
1 parent f342b06 commit 57ab87b
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sp_core::Bytes;

/// Raw bytes for an Event
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub struct RawEvent {
/// The name of the pallet from whence the Event originated.
pub pallet: String,
Expand Down
236 changes: 214 additions & 22 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,60 @@ use crate::{
/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<'a, T: Config> {
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
block_reader: BlockReader<'a, T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
event: Option<(&'static str, &'static str)>,
events: VecDeque<RawEvent>,
events: VecDeque<Raw>,
finished: bool,
}

enum BlockReader<'a, T: Config> {
Decoder {
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
},
/// Mock event listener for unit tests
#[cfg(test)]
Mock(Box<dyn Iterator<Item = (T::Hash, Result<Vec<(Phase, Raw)>, Error>)>>),
}

impl<'a, T: Config> BlockReader<'a, T> {
async fn next(&mut self) -> Option<(T::Hash, Result<Vec<(Phase, Raw)>, Error>)> {
match self {
BlockReader::Decoder {
subscription,
decoder,
} => {
let change_set = subscription.next().await?;
let events: Result<Vec<_>, _> = change_set
.changes
.into_iter()
.filter_map(|(_key, change)| {
Some(decoder.decode_events(&mut change?.0.as_slice()))
})
.collect();

let flattened_events = events.map(|x| x.into_iter().flatten().collect());
Some((change_set.block, flattened_events))
}
#[cfg(test)]
BlockReader::Mock(it) => it.next(),
}
}
}

impl<'a, T: Config> EventSubscription<'a, T> {
/// Creates a new event subscription.
pub fn new(
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
) -> Self {
Self {
subscription,
decoder,
block_reader: BlockReader::Decoder {
subscription,
decoder,
},
block: None,
extrinsic: None,
event: None,
Expand Down Expand Up @@ -89,44 +125,44 @@ impl<'a, T: Config> EventSubscription<'a, T> {
/// Gets the next event.
pub async fn next(&mut self) -> Option<Result<RawEvent, Error>> {
loop {
if let Some(event) = self.events.pop_front() {
return Some(Ok(event))
if let Some(raw_event) = self.events.pop_front() {
match raw_event {
Raw::Event(event) => return Some(Ok(event)),
Raw::Error(err) => return Some(Err(err.into())),
};
}
if self.finished {
return None
}
// always return None if subscription has closed
let change_set = self.subscription.next().await?;
let (received_hash, events) = self.block_reader.next().await?;
if let Some(hash) = self.block.as_ref() {
if &change_set.block == hash {
if &received_hash == hash {
self.finished = true;
} else {
continue
}
}
for (_key, data) in change_set.changes {
if let Some(data) = data {
let raw_events = match self.decoder.decode_events(&mut &data.0[..]) {
Ok(events) => events,
Err(error) => return Some(Err(error)),
};

match events {
Err(err) => return Some(Err(err)),
Ok(raw_events) => {
for (phase, raw) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if let Some(ext_index) = self.extrinsic {
if i as usize != ext_index {
continue
}
}
let event = match raw {
Raw::Event(event) => event,
Raw::Error(err) => return Some(Err(err.into())),
};
if let Some((module, variant)) = self.event {
if event.pallet != module || event.variant != variant {
continue
if let Raw::Event(ref event) = raw {
if event.pallet != module || event.variant != variant
{
continue
}
}
}
self.events.push_back(event);
self.events.push_back(raw);
}
}
}
Expand Down Expand Up @@ -227,3 +263,159 @@ where
}
}
}

#[cfg(test)]
mod tests {
use crate::RuntimeError;

use super::*;
use sp_core::H256;
#[derive(Clone)]
struct MockConfig;

impl Config for MockConfig {
type Index = u32;
type BlockNumber = u32;
type Hash = sp_core::H256;
type Hashing = sp_runtime::traits::BlakeTwo256;
type AccountId = sp_runtime::AccountId32;
type Address = sp_runtime::MultiAddress<Self::AccountId, u32>;
type Header = sp_runtime::generic::Header<
Self::BlockNumber,
sp_runtime::traits::BlakeTwo256,
>;
type Signature = sp_runtime::MultiSignature;
type Extrinsic = sp_runtime::OpaqueExtrinsic;
}

fn named_event(event_name: &str) -> RawEvent {
RawEvent {
data: sp_core::Bytes::from(Vec::new()),
pallet: event_name.to_string(),
variant: event_name.to_string(),
pallet_index: 0,
variant_index: 0,
}
}

fn raw_event(id: u8) -> RawEvent {
RawEvent {
data: sp_core::Bytes::from(Vec::new()),
pallet: "SomePallet".to_string(),
variant: "SomeVariant".to_string(),
pallet_index: id,
variant_index: id,
}
}

fn event(id: u8) -> Raw {
Raw::Event(raw_event(id))
}

#[async_std::test]
async fn test_error_does_not_stop_subscription() {
let mut subscription: EventSubscription<MockConfig> = EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![(
H256::from([0; 32]),
Ok(vec![
(
Phase::ApplyExtrinsic(0),
Raw::Error(RuntimeError::BadOrigin),
),
(Phase::ApplyExtrinsic(0), event(1)),
]),
)]
.into_iter(),
)),
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
};

assert!(matches!(
subscription.next().await.unwrap().unwrap_err(),
Error::Runtime(RuntimeError::BadOrigin)
));
assert_eq!(subscription.next().await.unwrap().unwrap(), raw_event(1));
assert!(subscription.next().await.is_none());
}

#[async_std::test]
/// test that filters work correctly, and are independent of each other
async fn test_filters() {
let mut events = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [Phase::ApplyExtrinsic(0), Phase::ApplyExtrinsic(1)] {
for event in [named_event("a"), named_event("b")] {
events.push((block_hash, phase.clone(), event))
}
}
}
// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, event)| {
event.2.variant_index = idx as u8;
});

for block_filter in [None, Some(H256::from([1; 32]))] {
for extrinsic_filter in [None, Some(1)] {
for event_filter in [None, Some(("b", "b"))] {
let mut subscription: EventSubscription<MockConfig> =
EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![
(
events[0].0,
Ok(events
.iter()
.take(4)
.map(|(_, phase, event)| {
(phase.clone(), Raw::Event(event.clone()))
})
.collect()),
),
(
events[4].0,
Ok(events
.iter()
.skip(4)
.take(4)
.map(|(_, phase, event)| {
(phase.clone(), Raw::Event(event.clone()))
})
.collect()),
),
]
.into_iter(),
)),
block: block_filter.clone(),
extrinsic: extrinsic_filter.clone(),
event: event_filter.clone(),
events: Default::default(),
finished: false,
};
let mut expected_events = events.clone();
if let Some(hash) = block_filter {
expected_events.retain(|(h, _, _)| h == &hash);
}
if let Some(idx) = extrinsic_filter {
expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
}
if let Some(name) = event_filter {
expected_events.retain(|(_, _, event)| event.pallet == name.0);
}
for expected_event in expected_events {
assert_eq!(
subscription.next().await.unwrap().unwrap(),
expected_event.2
);
}
assert!(subscription.next().await.is_none());
}
}
}
}
}

0 comments on commit 57ab87b

Please sign in to comment.