Skip to content

Commit

Permalink
Switch to best import notification for pubsub (#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgmichel authored Mar 5, 2021
1 parent 3c76b0c commit 9bfb053
Showing 1 changed file with 104 additions and 111 deletions.
215 changes: 104 additions & 111 deletions client/rpc/src/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,6 @@ fn storage_prefix_build(module: &[u8], storage: &[u8]) -> Vec<u8> {
[twox_128(module), twox_128(storage)].concat().to_vec()
}

macro_rules! stream_build {
($context:expr => $module:expr, $storage:expr) => {{
let key: StorageKey = StorageKey(
storage_prefix_build($module, $storage)
);
match $context.client.storage_changes_notification_stream(
Some(&[key]),
None
) {
Ok(stream) => Some(stream),
Err(_err) => None,
}
}};
}

impl<B: BlockT, P, C, BE, H: ExHashT> EthPubSubApiT for EthPubSubApi<B, P, C, BE, H>
where
B: BlockT<Hash=H256> + Send + Sync + 'static,
Expand Down Expand Up @@ -272,71 +257,84 @@ impl<B: BlockT, P, C, BE, H: ExHashT> EthPubSubApiT for EthPubSubApi<B, P, C, BE
let network = self.network.clone();
match kind {
Kind::Logs => {
if let Some(stream) = stream_build!(
self => b"Ethereum", b"CurrentReceipts"
) {
self.subscriptions.add(subscriber, |sink| {
let stream = stream
.flat_map(move |(block_hash, changes)| {
let id = BlockId::Hash(block_hash);
let data = changes.iter().last().unwrap().2.unwrap();
let receipts: Vec<ethereum::Receipt> =
Decode::decode(&mut &data.0[..]).unwrap();
let block: ethereum::Block = client.runtime_api()
.current_block(&id).unwrap().unwrap();
futures::stream::iter(
SubscriptionResult::new()
.logs(block, receipts, &filtered_params)
)
})
.map(|x| {
return Ok::<Result<
PubSubResult,
jsonrpc_core::types::error::Error
>, ()>(Ok(
PubSubResult::Log(Box::new(x))
));
})
.compat();

sink
.sink_map_err(|e| warn!(
"Error sending notifications: {:?}", e
))
.send_all(stream)
.map(|_| ())
});
}
self.subscriptions.add(subscriber, |sink| {
let stream = client.import_notification_stream()
.filter_map(move |notification| {
if notification.is_new_best {
let id = BlockId::Hash(notification.hash);
let receipts = client.runtime_api()
.current_receipts(&id);
let block = client.runtime_api()
.current_block(&id);
match (receipts, block) {
(Ok(Some(receipts)), Ok(Some(block))) =>
futures::future::ready(Some((block, receipts))),
_ => futures::future::ready(None)
}
} else {
futures::future::ready(None)
}
})
.flat_map(move |(block, receipts)| {
futures::stream::iter(
SubscriptionResult::new()
.logs(block, receipts, &filtered_params)
)
})
.map(|x| {
return Ok::<Result<
PubSubResult,
jsonrpc_core::types::error::Error
>, ()>(Ok(
PubSubResult::Log(Box::new(x))
));
})
.compat();
sink
.sink_map_err(|e| warn!(
"Error sending notifications: {:?}", e
))
.send_all(stream)
.map(|_| ())
});
},
Kind::NewHeads => {
if let Some(stream) = stream_build!(
self => b"Ethereum", b"CurrentBlock"
) {
self.subscriptions.add(subscriber, |sink| {
let stream = stream
.map(|(_block, changes)| {
let data = changes.iter().last().unwrap().2.unwrap();
let block: ethereum::Block =
Decode::decode(&mut &data.0[..]).unwrap();
return Ok::<_, ()>(Ok(
SubscriptionResult::new()
.new_heads(block)
));
})
.compat();

sink
.sink_map_err(|e| warn!(
"Error sending notifications: {:?}", e
))
.send_all(stream)
.map(|_| ())
});
}
self.subscriptions.add(subscriber, |sink| {
let stream = client.import_notification_stream()
.filter_map(move |notification| {
if notification.is_new_best {
let id = BlockId::Hash(notification.hash);
let block = client.runtime_api()
.current_block(&id);
match block {
Ok(Some(block)) => futures::future::ready(Some(block)),
_ => futures::future::ready(None)
}
} else {
futures::future::ready(None)
}
})
.map(|block| {
return Ok::<_, ()>(Ok(
SubscriptionResult::new()
.new_heads(block)
));
})
.compat();
sink
.sink_map_err(|e| warn!(
"Error sending notifications: {:?}", e
))
.send_all(stream)
.map(|_| ())
});
},
Kind::NewPendingTransactions => {
if let Some(stream) = stream_build!(
self => b"Ethereum", b"Pending"
if let Ok(stream) = client.storage_changes_notification_stream(
Some(&[StorageKey(
storage_prefix_build(b"Ethereum", b"Pending")
)]),
None
) {
self.subscriptions.add(subscriber, |sink| {
let stream = stream
Expand Down Expand Up @@ -386,41 +384,36 @@ impl<B: BlockT, P, C, BE, H: ExHashT> EthPubSubApiT for EthPubSubApi<B, P, C, BE
}
},
Kind::Syncing => {
if let Some(stream) = stream_build!(
self => b"Ethereum", b"CurrentBlock"
) {
self.subscriptions.add(subscriber, |sink| {
let mut previous_syncing = network.is_major_syncing();
let stream = stream
.filter_map(move |(_, _)| {
let syncing = network.is_major_syncing();
if previous_syncing != syncing {
previous_syncing = syncing;
futures::future::ready(Some(syncing))
} else {
futures::future::ready(None)
}
})
.map(|syncing| {
return Ok::<Result<
PubSubResult,
jsonrpc_core::types::error::Error
>, ()>(Ok(
PubSubResult::SyncState(PubSubSyncStatus {
syncing: syncing
})
));
})
.compat();
sink
.sink_map_err(|e| warn!(
"Error sending notifications: {:?}", e
))
.send_all(stream)
.map(|_| ())

});
}
self.subscriptions.add(subscriber, |sink| {
let mut previous_syncing = network.is_major_syncing();
let stream = client.import_notification_stream()
.filter_map(move |notification| {
let syncing = network.is_major_syncing();
if notification.is_new_best && previous_syncing != syncing {
previous_syncing = syncing;
futures::future::ready(Some(syncing))
} else {
futures::future::ready(None)
}
})
.map(|syncing| {
return Ok::<Result<
PubSubResult,
jsonrpc_core::types::error::Error
>, ()>(Ok(
PubSubResult::SyncState(PubSubSyncStatus {
syncing: syncing
})
));
})
.compat();
sink
.sink_map_err(|e| warn!(
"Error sending notifications: {:?}", e
))
.send_all(stream)
.map(|_| ())
});
},
}
}
Expand Down

0 comments on commit 9bfb053

Please sign in to comment.