Skip to content

Commit

Permalink
backend(fix): Remove only finalized blocks from the event window (#1356)
Browse files Browse the repository at this point in the history
* backend(fix): Remove only finalized blocks from the event window

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend: Improve documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend/tests: Check new block is delivered after finalized event

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* backend: More docs about finalzied / prunning and new block / best block

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Jan 11, 2024
1 parent 7f714cb commit d004789
Showing 1 changed file with 70 additions and 12 deletions.
82 changes: 70 additions & 12 deletions subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEve
use crate::config::BlockHash;
use crate::error::Error;
use futures::stream::{Stream, StreamExt};
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -165,9 +165,8 @@ struct SharedState<Hash: BlockHash> {
done: bool,
next_id: usize,
subscribers: HashMap<usize, SubscriberDetails<Hash>>,
// Keep a buffer of all events from last finalized block so that new
// subscriptions can be handed this info first.
block_events_from_last_finalized: VecDeque<FollowEvent<BlockRef<Hash>>>,
/// Keep a buffer of all events that should be handed to a new subscription.
block_events_for_new_subscriptions: VecDeque<FollowEvent<BlockRef<Hash>>>,
// Keep track of the subscription ID we send out on new subs.
current_subscription_id: Option<String>,
// Keep track of the init message we send out on new subs.
Expand All @@ -186,7 +185,7 @@ impl<Hash: BlockHash> Default for Shared<Hash> {
current_init_message: None,
current_subscription_id: None,
seen_runtime_events: HashMap::new(),
block_events_from_last_finalized: VecDeque::new(),
block_events_for_new_subscriptions: VecDeque::new(),
})))
}
}
Expand Down Expand Up @@ -251,7 +250,7 @@ impl<Hash: BlockHash> Shared<Hash> {
// New subscriptions will be given this init message:
shared.current_init_message = Some(ev.clone());
// Clear block cache (since a new finalized block hash is seen):
shared.block_events_from_last_finalized.clear();
shared.block_events_for_new_subscriptions.clear();
}
FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => {
// Update the init message that we'll hand out to new subscriptions. If the init message
Expand All @@ -275,8 +274,30 @@ impl<Hash: BlockHash> Shared<Hash> {
}
}

// New finalized block, so clear the cache of older block events.
shared.block_events_from_last_finalized.clear();
// The last finalized block will be reported as Initialized by our driver,
// therefore there is no need to report NewBlock and BestBlock events for it.
// If the Finalized event reported multiple finalized hashes, we only care about
// the state at the head of the chain, therefore it is correct to remove those as well.
// Idem for the pruned hashes; they will never be reported again and we remove
// them from the window of events.
let to_remove: HashSet<Hash> = finalized_ev
.finalized_block_hashes
.iter()
.chain(finalized_ev.pruned_block_hashes.iter())
.map(|h| h.hash())
.collect();

shared
.block_events_for_new_subscriptions
.retain(|ev| match ev {
FollowEvent::NewBlock(new_block_ev) => {
!to_remove.contains(&new_block_ev.block_hash.hash())
}
FollowEvent::BestBlockChanged(best_block_ev) => {
!to_remove.contains(&best_block_ev.best_block_hash.hash())
}
_ => true,
});
}
FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => {
// If a new runtime is seen, note it so that when a block is finalized, we
Expand All @@ -288,15 +309,15 @@ impl<Hash: BlockHash> Shared<Hash> {
}

shared
.block_events_from_last_finalized
.block_events_for_new_subscriptions
.push_back(FollowEvent::NewBlock(new_block_ev));
}
FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => {
shared.block_events_from_last_finalized.push_back(ev);
shared.block_events_for_new_subscriptions.push_back(ev);
}
FollowStreamMsg::Event(FollowEvent::Stop) => {
// On a stop event, clear everything. Wait for resubscription and new ready/initialised events.
shared.block_events_from_last_finalized.clear();
shared.block_events_for_new_subscriptions.clear();
shared.current_subscription_id = None;
shared.current_init_message = None;
}
Expand Down Expand Up @@ -334,7 +355,7 @@ impl<Hash: BlockHash> Shared<Hash> {
init_msg.clone(),
)));
}
for ev in &shared.block_events_from_last_finalized {
for ev in &shared.block_events_for_new_subscriptions {
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
}

Expand Down Expand Up @@ -485,4 +506,41 @@ mod test {
];
assert_eq!(evs, expected);
}

#[tokio::test]
async fn subscribers_receive_new_blocks_before_subscribing() {
let mut driver = test_follow_stream_driver_getter(
|| {
[
Ok(ev_initialized(0)),
Ok(ev_new_block(0, 1)),
Ok(ev_best_block(1)),
Ok(ev_new_block(1, 2)),
Ok(ev_new_block(2, 3)),
Ok(ev_finalized([1])),
Err(Error::Other("ended".to_owned())),
]
},
10,
);

// Skip to the first finalized block F1.
let _r = driver.next().await.unwrap();
let _i0 = driver.next().await.unwrap();
let _n1 = driver.next().await.unwrap();
let _b1 = driver.next().await.unwrap();
let _n2 = driver.next().await.unwrap();
let _n3 = driver.next().await.unwrap();
let _f1 = driver.next().await.unwrap();

// THEN subscribe; and make sure new block 1 and 2 are received.
let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await;
let expected = vec![
FollowStreamMsg::Ready("sub_id_0".into()),
FollowStreamMsg::Event(ev_initialized_ref(1)),
FollowStreamMsg::Event(ev_new_block_ref(1, 2)),
FollowStreamMsg::Event(ev_new_block_ref(2, 3)),
];
assert_eq!(evs, expected);
}
}

0 comments on commit d004789

Please sign in to comment.