diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 307bfea222..0adc6f929a 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -39,6 +39,16 @@ use sp_core::{ use sp_runtime::traits::Header; use std::collections::VecDeque; +/// Raw bytes for an Event, including the block hash where it occurred and its +/// corresponding event index. +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Clone))] +pub struct EventContext { + pub block_hash: Hash, + pub event_idx: usize, + pub event: RawEvent, +} + /// Event subscription simplifies filtering a storage change set stream for /// events of interest. pub struct EventSubscription<'a, T: Config> { @@ -46,7 +56,7 @@ pub struct EventSubscription<'a, T: Config> { block: Option, extrinsic: Option, event: Option<(&'static str, &'static str)>, - events: VecDeque, + events: VecDeque>, finished: bool, } @@ -57,13 +67,19 @@ enum BlockReader<'a, T: Config> { }, /// Mock event listener for unit tests #[cfg(test)] - Mock(Box, BasicError>)>>), + Mock( + Box< + dyn Iterator< + Item = (T::Hash, Result, BasicError>), + >, + >, + ), } impl<'a, T: Config> BlockReader<'a, T> { async fn next( &mut self, - ) -> Option<(T::Hash, Result, BasicError>)> { + ) -> Option<(T::Hash, Result, BasicError>)> { match self { BlockReader::Decoder { subscription, @@ -78,7 +94,13 @@ impl<'a, T: Config> BlockReader<'a, T> { }) .collect(); - let flattened_events = events.map(|x| x.into_iter().flatten().collect()); + let flattened_events = events.map(|x| { + x.into_iter() + .flatten() + .enumerate() + .map(|(event_idx, (phase, raw))| (phase, event_idx, raw)) + .collect() + }); Some((change_set.block, flattened_events)) } #[cfg(test)] @@ -124,6 +146,15 @@ impl<'a, T: Config> EventSubscription<'a, T> { /// Gets the next event. pub async fn next(&mut self) -> Option> { + self.next_context() + .await + .map(|res| res.map(|ctx| ctx.event)) + } + /// Gets the next event with the associated block hash and its corresponding + /// event index. + pub async fn next_context( + &mut self, + ) -> Option, BasicError>> { loop { if let Some(raw_event) = self.events.pop_front() { return Some(Ok(raw_event)) @@ -144,7 +175,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { match events { Err(err) => return Some(Err(err)), Ok(raw_events) => { - for (phase, raw) in raw_events { + for (phase, event_idx, raw) in raw_events { if let Some(ext_index) = self.extrinsic { if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index) { @@ -156,7 +187,11 @@ impl<'a, T: Config> EventSubscription<'a, T> { continue } } - self.events.push_back(raw); + self.events.push_back(EventContext { + block_hash: received_hash, + event_idx, + event: raw, + }); } } } @@ -276,7 +311,7 @@ mod tests { #[async_std::test] /// test that filters work correctly, and are independent of each other async fn test_filters() { - let mut events = vec![]; + let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![]; // create all events for block_hash in [H256::from([0; 32]), H256::from([1; 32])] { for phase in [ @@ -285,14 +320,24 @@ mod tests { Phase::ApplyExtrinsic(1), Phase::Finalization, ] { - for event in [named_event("a"), named_event("b")] { - events.push((block_hash, phase.clone(), event)) - } + [named_event("a"), named_event("b")] + .iter() + .enumerate() + .for_each(|(idx, event)| { + events.push(( + block_hash, + phase.clone(), + // The event index + idx, + event.clone(), + )) + }); } } + // set variant index so we can uniquely identify the event events.iter_mut().enumerate().for_each(|(idx, event)| { - event.2.variant_index = idx as u8; + event.3.variant_index = idx as u8; }); let half_len = events.len() / 2; @@ -309,8 +354,8 @@ mod tests { Ok(events .iter() .take(half_len) - .map(|(_, phase, event)| { - (phase.clone(), event.clone()) + .map(|(_, phase, idx, event)| { + (phase.clone(), *idx, event.clone()) }) .collect()), ), @@ -319,8 +364,8 @@ mod tests { Ok(events .iter() .skip(half_len) - .map(|(_, phase, event)| { - (phase.clone(), event.clone()) + .map(|(_, phase, idx, event)| { + (phase.clone(), *idx, event.clone()) }) .collect()), ), @@ -333,21 +378,24 @@ mod tests { events: Default::default(), finished: false, }; - let mut expected_events = events.clone(); + + let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> = + events.clone(); + if let Some(hash) = block_filter { - expected_events.retain(|(h, _, _)| h == &hash); + expected_events.retain(|(h, _, _, _)| h == &hash); } if let Some(idx) = extrinsic_filter { - expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx)); + expected_events.retain(|(_, phase, _, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx)); } if let Some(name) = event_filter { - expected_events.retain(|(_, _, event)| event.pallet == name.0); + expected_events.retain(|(_, _, _, event)| event.pallet == name.0); } for expected_event in expected_events { assert_eq!( subscription.next().await.unwrap().unwrap(), - expected_event.2 + expected_event.3 ); } assert!(subscription.next().await.is_none()); @@ -355,4 +403,79 @@ mod tests { } } } + + #[async_std::test] + async fn test_context() { + let mut events = vec![]; + // create all events + for block_hash in [H256::from([0; 32]), H256::from([1; 32])] { + for phase in [ + Phase::Initialization, + Phase::ApplyExtrinsic(0), + Phase::ApplyExtrinsic(1), + Phase::Finalization, + ] { + [named_event("a"), named_event("b")] + .iter() + .enumerate() + .for_each(|(idx, event)| { + events.push(( + phase.clone(), + EventContext { + block_hash, + event_idx: idx, + event: event.clone(), + }, + )); + }); + } + } + + // set variant index so we can uniquely identify the event + events.iter_mut().enumerate().for_each(|(idx, (_, ctx))| { + ctx.event.variant_index = idx as u8; + }); + + let half_len = events.len() / 2; + + let mut subscription: EventSubscription = EventSubscription { + block_reader: BlockReader::Mock(Box::new( + vec![ + ( + events[0].1.block_hash, + Ok(events + .iter() + .take(half_len) + .map(|(phase, ctx)| { + (phase.clone(), ctx.event_idx, ctx.event.clone()) + }) + .collect()), + ), + ( + events[half_len].1.block_hash, + Ok(events + .iter() + .skip(half_len) + .map(|(phase, ctx)| { + (phase.clone(), ctx.event_idx, ctx.event.clone()) + }) + .collect()), + ), + ] + .into_iter(), + )), + block: None, + extrinsic: None, + event: None, + events: Default::default(), + finished: false, + }; + + let expected_events = events.clone(); + + for exp in expected_events { + assert_eq!(subscription.next_context().await.unwrap().unwrap(), exp.1); + } + assert!(subscription.next().await.is_none()); + } }