Skip to content

Commit

Permalink
Get event context on EventSubscription (paritytech#423)
Browse files Browse the repository at this point in the history
* implement next_context

* write test_context for method next_context

* change how events are uniquely identified

* undo local changes for test-runtime

* introduce EventContext struct

* adjust test_context to EventContext struct

* fix return type for next_context

* add suggestions by jsdw

* ran cargo fmt and clippy
  • Loading branch information
lamafab authored and 0623forbidden committed Feb 15, 2022
1 parent fcc9415 commit fb87c92
Showing 1 changed file with 143 additions and 20 deletions.
163 changes: 143 additions & 20 deletions subxt/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,24 @@ use sp_core::{
use sp_runtime::traits::Header;
use std::collections::VecDeque;

/// Raw bytes for an Event, including the block hash where it occurred and its
/// corresponding event index.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub struct EventContext<Hash> {
pub block_hash: Hash,
pub event_idx: usize,
pub event: RawEvent,
}

/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<'a, T: Config> {
block_reader: BlockReader<'a, T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
event: Option<(&'static str, &'static str)>,
events: VecDeque<RawEvent>,
events: VecDeque<EventContext<T::Hash>>,
finished: bool,
}

Expand All @@ -57,13 +67,19 @@ enum BlockReader<'a, T: Config> {
},
/// Mock event listener for unit tests
#[cfg(test)]
Mock(Box<dyn Iterator<Item = (T::Hash, Result<Vec<(Phase, RawEvent)>, BasicError>)>>),
Mock(
Box<
dyn Iterator<
Item = (T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>),
>,
>,
),
}

impl<'a, T: Config> BlockReader<'a, T> {
async fn next(
&mut self,
) -> Option<(T::Hash, Result<Vec<(Phase, RawEvent)>, BasicError>)> {
) -> Option<(T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>)> {
match self {
BlockReader::Decoder {
subscription,
Expand All @@ -78,7 +94,13 @@ impl<'a, T: Config> BlockReader<'a, T> {
})
.collect();

let flattened_events = events.map(|x| x.into_iter().flatten().collect());
let flattened_events = events.map(|x| {
x.into_iter()
.flatten()
.enumerate()
.map(|(event_idx, (phase, raw))| (phase, event_idx, raw))
.collect()
});
Some((change_set.block, flattened_events))
}
#[cfg(test)]
Expand Down Expand Up @@ -124,6 +146,15 @@ impl<'a, T: Config> EventSubscription<'a, T> {

/// Gets the next event.
pub async fn next(&mut self) -> Option<Result<RawEvent, BasicError>> {
self.next_context()
.await
.map(|res| res.map(|ctx| ctx.event))
}
/// Gets the next event with the associated block hash and its corresponding
/// event index.
pub async fn next_context(
&mut self,
) -> Option<Result<EventContext<T::Hash>, BasicError>> {
loop {
if let Some(raw_event) = self.events.pop_front() {
return Some(Ok(raw_event))
Expand All @@ -144,7 +175,7 @@ impl<'a, T: Config> EventSubscription<'a, T> {
match events {
Err(err) => return Some(Err(err)),
Ok(raw_events) => {
for (phase, raw) in raw_events {
for (phase, event_idx, raw) in raw_events {
if let Some(ext_index) = self.extrinsic {
if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index)
{
Expand All @@ -156,7 +187,11 @@ impl<'a, T: Config> EventSubscription<'a, T> {
continue
}
}
self.events.push_back(raw);
self.events.push_back(EventContext {
block_hash: received_hash,
event_idx,
event: raw,
});
}
}
}
Expand Down Expand Up @@ -276,7 +311,7 @@ mod tests {
#[async_std::test]
/// test that filters work correctly, and are independent of each other
async fn test_filters() {
let mut events = vec![];
let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [
Expand All @@ -285,14 +320,24 @@ mod tests {
Phase::ApplyExtrinsic(1),
Phase::Finalization,
] {
for event in [named_event("a"), named_event("b")] {
events.push((block_hash, phase.clone(), event))
}
[named_event("a"), named_event("b")]
.iter()
.enumerate()
.for_each(|(idx, event)| {
events.push((
block_hash,
phase.clone(),
// The event index
idx,
event.clone(),
))
});
}
}

// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, event)| {
event.2.variant_index = idx as u8;
event.3.variant_index = idx as u8;
});

let half_len = events.len() / 2;
Expand All @@ -309,8 +354,8 @@ mod tests {
Ok(events
.iter()
.take(half_len)
.map(|(_, phase, event)| {
(phase.clone(), event.clone())
.map(|(_, phase, idx, event)| {
(phase.clone(), *idx, event.clone())
})
.collect()),
),
Expand All @@ -319,8 +364,8 @@ mod tests {
Ok(events
.iter()
.skip(half_len)
.map(|(_, phase, event)| {
(phase.clone(), event.clone())
.map(|(_, phase, idx, event)| {
(phase.clone(), *idx, event.clone())
})
.collect()),
),
Expand All @@ -333,26 +378,104 @@ mod tests {
events: Default::default(),
finished: false,
};
let mut expected_events = events.clone();

let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> =
events.clone();

if let Some(hash) = block_filter {
expected_events.retain(|(h, _, _)| h == &hash);
expected_events.retain(|(h, _, _, _)| h == &hash);
}
if let Some(idx) = extrinsic_filter {
expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
expected_events.retain(|(_, phase, _, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
}
if let Some(name) = event_filter {
expected_events.retain(|(_, _, event)| event.pallet == name.0);
expected_events.retain(|(_, _, _, event)| event.pallet == name.0);
}

for expected_event in expected_events {
assert_eq!(
subscription.next().await.unwrap().unwrap(),
expected_event.2
expected_event.3
);
}
assert!(subscription.next().await.is_none());
}
}
}
}

#[async_std::test]
async fn test_context() {
let mut events = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [
Phase::Initialization,
Phase::ApplyExtrinsic(0),
Phase::ApplyExtrinsic(1),
Phase::Finalization,
] {
[named_event("a"), named_event("b")]
.iter()
.enumerate()
.for_each(|(idx, event)| {
events.push((
phase.clone(),
EventContext {
block_hash,
event_idx: idx,
event: event.clone(),
},
));
});
}
}

// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, (_, ctx))| {
ctx.event.variant_index = idx as u8;
});

let half_len = events.len() / 2;

let mut subscription: EventSubscription<DefaultConfig> = EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![
(
events[0].1.block_hash,
Ok(events
.iter()
.take(half_len)
.map(|(phase, ctx)| {
(phase.clone(), ctx.event_idx, ctx.event.clone())
})
.collect()),
),
(
events[half_len].1.block_hash,
Ok(events
.iter()
.skip(half_len)
.map(|(phase, ctx)| {
(phase.clone(), ctx.event_idx, ctx.event.clone())
})
.collect()),
),
]
.into_iter(),
)),
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
};

let expected_events = events.clone();

for exp in expected_events {
assert_eq!(subscription.next_context().await.unwrap().unwrap(), exp.1);
}
assert!(subscription.next().await.is_none());
}
}

0 comments on commit fb87c92

Please sign in to comment.