Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor tx_pool controller #3960

Merged
merged 1 commit into from
Apr 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 57 additions & 196 deletions tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,36 @@ impl Drop for TxPoolController {
}
}

macro_rules! send_message {
($self:ident, $msg_type:ident, $args:expr) => {{
let (responder, response) = oneshot::channel();
let request = Request::call($args, responder);
$self
.sender
.try_send(Message::$msg_type(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
}};
}

macro_rules! send_notify {
($self:ident, $msg_type:ident, $args:expr) => {{
let notify = Notify::new($args);
$self
.sender
.try_send(Message::$msg_type(notify))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e.into()
})
}};
}

impl TxPoolController {
/// Return whether tx-pool service is started
pub fn service_started(&self) -> bool {
Expand All @@ -163,29 +193,16 @@ impl TxPoolController {
proposals_limit: Option<u64>,
max_version: Option<Version>,
) -> Result<BlockTemplateResult, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call((bytes_limit, proposals_limit, max_version), responder);
self.sender
.try_send(Message::BlockTemplate(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;

block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(
self,
BlockTemplate,
(bytes_limit, proposals_limit, max_version)
)
}

/// Notify new uncle
pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> {
let notify = Notify::new(uncle);
self.sender
.try_send(Message::NewUncle(notify))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e.into()
})
send_notify!(self, NewUncle, uncle)
}

/// Make tx-pool consistent after a reorg, by re-adding or recursively erasing
Expand Down Expand Up @@ -213,32 +230,12 @@ impl TxPoolController {

/// Submit local tx to tx-pool
pub fn submit_local_tx(&self, tx: TransactionView) -> Result<SubmitTxResult, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(tx, responder);
self.sender
.try_send(Message::SubmitLocalTx(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, SubmitLocalTx, tx)
}

/// Remove tx from tx-pool
pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result<bool, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(tx_hash, responder);
self.sender
.try_send(Message::RemoveLocalTx(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, RemoveLocalTx, tx_hash)
}

/// Submit remote tx with declared cycles and origin to tx-pool
Expand All @@ -248,175 +245,75 @@ impl TxPoolController {
declared_cycles: Cycle,
peer: PeerIndex,
) -> Result<(), AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call((tx, declared_cycles, peer), responder);
self.sender
.try_send(Message::SubmitRemoteTx(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer))
}

/// Receive txs from network, try to add txs to tx-pool
pub fn notify_txs(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
let notify = Notify::new(txs);
self.sender
.try_send(Message::NotifyTxs(notify))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e.into()
})
send_notify!(self, NotifyTxs, txs)
}

/// Return tx-pool information
pub fn get_tx_pool_info(&self) -> Result<TxPoolInfo, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call((), responder);
self.sender
.try_send(Message::GetTxPoolInfo(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, GetTxPoolInfo, ())
}

/// Return fresh proposals
pub fn fresh_proposals_filter(
&self,
proposals: Vec<ProposalShortId>,
) -> Result<Vec<ProposalShortId>, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(proposals, responder);
self.sender
.try_send(Message::FreshProposalsFilter(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, FreshProposalsFilter, proposals)
}

/// Return tx_status for rpc (get_transaction verbosity = 1)
pub fn get_tx_status(&self, hash: Byte32) -> Result<GetTxStatusResult, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(hash, responder);
self.sender
.try_send(Message::GetTxStatus(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, GetTxStatus, hash)
}

/// Return transaction_with_status for rpc (get_transaction verbosity = 2)
pub fn get_transaction_with_status(
&self,
hash: Byte32,
) -> Result<GetTransactionWithStatusResult, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(hash, responder);
self.sender
.try_send(Message::GetTransactionWithStatus(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, GetTransactionWithStatus, hash)
}

/// Return txs for network
pub fn fetch_txs(
&self,
short_ids: HashSet<ProposalShortId>,
) -> Result<HashMap<ProposalShortId, TransactionView>, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(short_ids, responder);
self.sender
.try_send(Message::FetchTxs(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, FetchTxs, short_ids)
}

/// Return txs with cycles
pub fn fetch_txs_with_cycles(
&self,
short_ids: HashSet<ProposalShortId>,
) -> Result<FetchTxsWithCyclesResult, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(short_ids, responder);
self.sender
.try_send(Message::FetchTxsWithCycles(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, FetchTxsWithCycles, short_ids)
}

/// Clears the tx-pool, removing all txs, update snapshot.
pub fn clear_pool(&self, new_snapshot: Arc<Snapshot>) -> Result<(), AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(new_snapshot, responder);
self.sender
.try_send(Message::ClearPool(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, ClearPool, new_snapshot)
}

/// TODO(doc): @zhangsoledad
pub fn get_all_entry_info(&self) -> Result<TxPoolEntryInfo, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call((), responder);
self.sender
.try_send(Message::GetAllEntryInfo(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, GetAllEntryInfo, ())
}

/// TODO(doc): @zhangsoledad
pub fn get_all_ids(&self) -> Result<TxPoolIds, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call((), responder);
self.sender
.try_send(Message::GetAllIds(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, GetAllIds, ())
}

/// Saves tx pool into disk.
pub fn save_pool(&self) -> Result<(), AnyError> {
info!("Please be patient, tx-pool are saving data into disk ...");
send_message!(self, SavePool, ())
}

/// Sends suspend chunk process cmd
Expand All @@ -435,22 +332,6 @@ impl TxPoolController {
.map_err(Into::into)
}

/// Saves tx pool into disk.
pub fn save_pool(&self) -> Result<(), AnyError> {
info!("Please be patient, tx-pool are saving data into disk ...");
let (responder, response) = oneshot::channel();
let request = Request::call((), responder);
self.sender
.try_send(Message::SavePool(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
}

/// Load persisted txs into pool, assume that all txs are sorted
fn load_persisted_data(&self, txs: Vec<TransactionView>) -> Result<(), AnyError> {
if !txs.is_empty() {
Expand All @@ -476,33 +357,13 @@ impl TxPoolController {
/// Plug tx-pool entry to tx-pool, skip verification. only for test
#[cfg(feature = "internal")]
pub fn plug_entry(&self, entries: Vec<TxEntry>, target: PlugTarget) -> Result<(), AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call((entries, target), responder);
self.sender
.try_send(Message::PlugEntry(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, PlugEntry, (entries, target))
}

/// Package txs with specified bytes_limit. for test
#[cfg(feature = "internal")]
pub fn package_txs(&self, bytes_limit: Option<u64>) -> Result<Vec<TxEntry>, AnyError> {
let (responder, response) = oneshot::channel();
let request = Request::call(bytes_limit, responder);
self.sender
.try_send(Message::PackageTxs(request))
.map_err(|e| {
let (_m, e) = handle_try_send_error(e);
e
})?;
block_in_place(|| response.recv())
.map_err(handle_recv_error)
.map_err(Into::into)
send_message!(self, PackageTxs, bytes_limit)
}
}

Expand Down