Skip to content

Commit

Permalink
Merge pull request #18 from nkysg/unbound_channel
Browse files Browse the repository at this point in the history
replace bounded channel with unbounded channel
  • Loading branch information
mattsse authored Aug 10, 2024
2 parents 40445d9 + e531844 commit a99e7b4
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy_serde::WithOtherFields;
use alloy_transport::Transport;
use eyre::WrapErr;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
stream::Stream,
task::{Context, Poll},
Future, FutureExt,
Expand Down Expand Up @@ -114,7 +114,7 @@ pub struct BackendHandler<T, P> {
/// Listeners that wait for a `get_block` response
block_requests: FxHashMap<u64, Vec<BlockHashSender>>,
/// Incoming commands.
incoming: Receiver<BackendRequest>,
incoming: UnboundedReceiver<BackendRequest>,
/// unprocessed queued requests
queued_requests: VecDeque<BackendRequest>,
/// The block to fetch data from.
Expand All @@ -130,7 +130,7 @@ where
fn new(
provider: P,
db: BlockchainDb,
rx: Receiver<BackendRequest>,
rx: UnboundedReceiver<BackendRequest>,
block_id: Option<BlockId>,
) -> Self {
Self {
Expand Down Expand Up @@ -541,11 +541,11 @@ impl BlockingMode {

/// A cloneable backend type that shares access to the backend data with all its clones.
///
/// This backend type is connected to the `BackendHandler` via a mpsc channel. The `BackendHandler`
/// is spawned on a tokio task and listens for incoming commands on the receiver half of the
/// channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so there can be
/// multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this `Backend`
/// type is thread safe.
/// This backend type is connected to the `BackendHandler` via a mpsc unbounded channel. The
/// `BackendHandler` is spawned on a tokio task and listens for incoming commands on the receiver
/// half of the channel. A `SharedBackend` holds a sender for that channel, which is `Clone`, so
/// there can be multiple `SharedBackend`s communicating with the same `BackendHandler`, hence this
/// `Backend` type is thread safe.
///
/// All `Backend` trait functions are delegated as a `BackendRequest` via the channel to the
/// `BackendHandler`. All `BackendRequest` variants include a sender half of an additional channel
Expand All @@ -570,7 +570,7 @@ impl BlockingMode {
#[derive(Clone, Debug)]
pub struct SharedBackend {
/// channel used for sending commands related to database operations
backend: Sender<BackendRequest>,
backend: UnboundedSender<BackendRequest>,
/// Ensures that the underlying cache gets flushed once the last `SharedBackend` is dropped.
///
/// There is only one instance of the type, so as soon as the last `SharedBackend` is deleted,
Expand Down Expand Up @@ -646,7 +646,7 @@ impl SharedBackend {
T: Transport + Clone + Unpin,
P: Provider<T, AnyNetwork> + Unpin + 'static + Clone,
{
let (backend, backend_rx) = channel(1);
let (backend, backend_rx) = unbounded();
let cache = Arc::new(FlushJsonBlockCacheDB(Arc::clone(db.cache())));
let handler = BackendHandler::new(provider, db, backend_rx, pin_block);
(Self { backend, cache, blocking_mode: Default::default() }, handler)
Expand All @@ -660,15 +660,15 @@ impl SharedBackend {
/// Updates the pinned block to fetch data from
pub fn set_pinned_block(&self, block: impl Into<BlockId>) -> eyre::Result<()> {
let req = BackendRequest::SetPinnedBlock(block.into());
self.backend.clone().try_send(req).map_err(|e| eyre::eyre!("{:?}", e))
self.backend.unbounded_send(req).map_err(|e| eyre::eyre!("{:?}", e))
}

/// Returns the full block for the given block identifier
pub fn get_full_block(&self, block: impl Into<BlockId>) -> DatabaseResult<Block> {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::FullBlock(block.into(), sender);
self.backend.clone().try_send(req)?;
self.backend.unbounded_send(req)?;
rx.recv()?
})
}
Expand All @@ -678,7 +678,7 @@ impl SharedBackend {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::Transaction(tx, sender);
self.backend.clone().try_send(req)?;
self.backend.unbounded_send(req)?;
rx.recv()?
})
}
Expand All @@ -687,7 +687,7 @@ impl SharedBackend {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::Basic(address, sender);
self.backend.clone().try_send(req)?;
self.backend.unbounded_send(req)?;
rx.recv()?.map(Some)
})
}
Expand All @@ -696,7 +696,7 @@ impl SharedBackend {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::Storage(address, index, sender);
self.backend.clone().try_send(req)?;
self.backend.unbounded_send(req)?;
rx.recv()?
})
}
Expand All @@ -705,15 +705,15 @@ impl SharedBackend {
self.blocking_mode.run(|| {
let (sender, rx) = oneshot_channel();
let req = BackendRequest::BlockHash(number, sender);
self.backend.clone().try_send(req)?;
self.backend.unbounded_send(req)?;
rx.recv()?
})
}

/// Inserts or updates data for multiple addresses
pub fn insert_or_update_address(&self, address_data: AddressData) {
let req = BackendRequest::UpdateAddress(address_data);
let err = self.backend.clone().try_send(req);
let err = self.backend.unbounded_send(req);
match err {
Ok(_) => (),
Err(e) => {
Expand All @@ -725,7 +725,7 @@ impl SharedBackend {
/// Inserts or updates data for multiple storage slots
pub fn insert_or_update_storage(&self, storage_data: StorageData) {
let req = BackendRequest::UpdateStorage(storage_data);
let err = self.backend.clone().try_send(req);
let err = self.backend.unbounded_send(req);
match err {
Ok(_) => (),
Err(e) => {
Expand All @@ -737,7 +737,7 @@ impl SharedBackend {
/// Inserts or updates data for multiple block hashes
pub fn insert_or_update_block_hashes(&self, block_hash_data: BlockHashData) {
let req = BackendRequest::UpdateBlockHash(block_hash_data);
let err = self.backend.clone().try_send(req);
let err = self.backend.unbounded_send(req);
match err {
Ok(_) => (),
Err(e) => {
Expand Down

0 comments on commit a99e7b4

Please sign in to comment.