Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Support the subscription of every imported block #13372

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ pub type TransactionForSB<B, Block> = <B as StateBackend<HashFor<Block>>>::Trans
/// Extracts the transaction for the given backend.
pub type TransactionFor<B, Block> = TransactionForSB<StateBackendFor<B, Block>, Block>;

/// Describes which block import notification stream should be notified.
#[derive(Debug, Clone)]
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
pub enum ImportNotificationAction {
/// Notify only when the chain has synced to the tip or there is a re-org.
RecentBlock,
/// Notify for every single block no matter what the sync state is.
EveryBlock,
/// Both block import notifications above should be fired.
Both,
/// No block import notification should be fired.
None,
}

/// Import operation summary.
///
/// Contains information about the block that just got imported,
Expand All @@ -68,6 +81,8 @@ pub struct ImportSummary<Block: BlockT> {
///
/// If `None`, there was no re-org while importing.
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
/// What notify action to take for this import.
pub import_notification_action: ImportNotificationAction,
}

/// Finalization operation summary.
Expand Down
3 changes: 3 additions & 0 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub trait BlockchainEvents<Block: BlockT> {
/// imported block.
fn import_notification_stream(&self) -> ImportNotifications<Block>;

/// Get a stream of every imported block.
fn every_import_notification_stream(&self) -> ImportNotifications<Block>;
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved

/// Get a stream of finality notifications. Not guaranteed to be fired for every
/// finalized block.
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;
Expand Down
4 changes: 4 additions & 0 deletions client/merkle-mountain-range/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ impl BlockchainEvents<Block> for MockClient {
unimplemented!()
}

fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
unimplemented!()
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
self.client.lock().finality_notification_stream()
}
Expand Down
110 changes: 93 additions & 17 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
Expand Down Expand Up @@ -106,6 +106,7 @@ where
executor: E,
storage_notifications: StorageNotifications<Block>,
import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
every_block_import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
// Collects auxiliary operations to be performed atomically together with
// block import operations.
Expand Down Expand Up @@ -304,19 +305,22 @@ where
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
});

let (import_notification, storage_changes) = match notify_imported {
Some(mut summary) => {
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
)
},
None => (None, None),
};
let (import_notification, storage_changes, import_notification_action) =
match notify_imported {
Some(mut summary) => {
let import_notification_action = summary.import_notification_action.clone();
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
import_notification_action,
)
},
None => (None, None, ImportNotificationAction::None),
};

if let Some(ref notification) = finality_notification {
for action in self.finality_actions.lock().iter_mut() {
Expand Down Expand Up @@ -353,7 +357,24 @@ where
}

self.notify_finalized(finality_notification)?;
self.notify_imported(import_notification, storage_changes)?;

match import_notification_action {
ImportNotificationAction::Both => {
self.notify_imported(import_notification.clone(), storage_changes.clone())?;
self.notify_imported_for_every_block(import_notification, storage_changes)?;
},
ImportNotificationAction::RecentBlock =>
self.notify_imported(import_notification, storage_changes)?,
ImportNotificationAction::EveryBlock =>
self.notify_imported_for_every_block(import_notification, storage_changes)?,
ImportNotificationAction::None => {
// Cleanup any closed import notification sinks.
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
self.every_block_import_notification_sinks
.lock()
.retain(|sink| !sink.is_closed());
},
}

Ok(r)
};
Expand Down Expand Up @@ -451,6 +472,7 @@ where
executor,
storage_notifications: StorageNotifications::new(prometheus_registry),
import_notification_sinks: Default::default(),
every_block_import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
import_actions: Default::default(),
finality_actions: Default::default(),
Expand Down Expand Up @@ -771,9 +793,14 @@ where

operation.op.insert_aux(aux)?;

// We only notify when we are already synced to the tip of the chain
let should_notify_every_block =
!self.every_block_import_notification_sinks.lock().is_empty();

// Notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
let should_notify_recent_block = make_notifications || tree_route.is_some();

if should_notify_every_block || should_notify_recent_block {
let header = import_headers.into_post();
if finalized {
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
let mut summary = match operation.notify_finalized.take() {
Expand Down Expand Up @@ -812,13 +839,24 @@ where
operation.notify_finalized = Some(summary);
}

let import_notification_action = if should_notify_every_block {
if should_notify_recent_block {
ImportNotificationAction::Both
} else {
ImportNotificationAction::EveryBlock
}
} else {
ImportNotificationAction::RecentBlock
};

operation.notify_imported = Some(ImportSummary {
hash,
origin,
header,
is_new_best,
storage_changes,
tree_route,
import_notification_action,
})
}

Expand Down Expand Up @@ -1047,6 +1085,38 @@ where
Ok(())
}

fn notify_imported_for_every_block(
&self,
notification: Option<BlockImportNotification<Block>>,
storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
) -> sp_blockchain::Result<()> {
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
let notification = match notification {
Some(notify_import) => notify_import,
None => {
// Cleanup any closed import notification sinks.
self.every_block_import_notification_sinks
.lock()
.retain(|sink| !sink.is_closed());
return Ok(())
},
};

if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.trigger(
&notification.hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
}

self.every_block_import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

Ok(())
}

/// Attempts to revert the chain by `n` blocks guaranteeing that no block is
/// reverted past the last finalized block. Returns the number of blocks
/// that were successfully reverted.
Expand Down Expand Up @@ -1980,6 +2050,12 @@ where
stream
}

fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000);
self.every_block_import_notification_sinks.lock().push(sink);
stream
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
self.finality_notification_sinks.lock().push(sink);
Expand Down