diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index c1c66141e1..8364c30ce3 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -139,6 +139,36 @@ impl Drop for TxPoolController { } } +macro_rules! send_message { + ($self:ident, $msg_type:ident, $args:expr) => {{ + let (responder, response) = oneshot::channel(); + let request = Request::call($args, responder); + $self + .sender + .try_send(Message::$msg_type(request)) + .map_err(|e| { + let (_m, e) = handle_try_send_error(e); + e + })?; + block_in_place(|| response.recv()) + .map_err(handle_recv_error) + .map_err(Into::into) + }}; +} + +macro_rules! send_notify { + ($self:ident, $msg_type:ident, $args:expr) => {{ + let notify = Notify::new($args); + $self + .sender + .try_send(Message::$msg_type(notify)) + .map_err(|e| { + let (_m, e) = handle_try_send_error(e); + e.into() + }) + }}; +} + impl TxPoolController { /// Return whether tx-pool service is started pub fn service_started(&self) -> bool { @@ -163,29 +193,16 @@ impl TxPoolController { proposals_limit: Option, max_version: Option, ) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call((bytes_limit, proposals_limit, max_version), responder); - self.sender - .try_send(Message::BlockTemplate(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!( + self, + BlockTemplate, + (bytes_limit, proposals_limit, max_version) + ) } /// Notify new uncle pub fn notify_new_uncle(&self, uncle: UncleBlockView) -> Result<(), AnyError> { - let notify = Notify::new(uncle); - self.sender - .try_send(Message::NewUncle(notify)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e.into() - }) + send_notify!(self, NewUncle, uncle) } /// Make tx-pool consistent after a reorg, by re-adding or recursively erasing @@ -213,32 +230,12 @@ impl TxPoolController { /// Submit local tx to tx-pool pub fn submit_local_tx(&self, tx: TransactionView) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call(tx, responder); - self.sender - .try_send(Message::SubmitLocalTx(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, SubmitLocalTx, tx) } /// Remove tx from tx-pool pub fn remove_local_tx(&self, tx_hash: Byte32) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call(tx_hash, responder); - self.sender - .try_send(Message::RemoveLocalTx(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, RemoveLocalTx, tx_hash) } /// Submit remote tx with declared cycles and origin to tx-pool @@ -248,43 +245,17 @@ impl TxPoolController { declared_cycles: Cycle, peer: PeerIndex, ) -> Result<(), AnyError> { - let (responder, response) = oneshot::channel(); - let request = Request::call((tx, declared_cycles, peer), responder); - self.sender - .try_send(Message::SubmitRemoteTx(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, SubmitRemoteTx, (tx, declared_cycles, peer)) } /// Receive txs from network, try to add txs to tx-pool pub fn notify_txs(&self, txs: Vec) -> Result<(), AnyError> { - let notify = Notify::new(txs); - self.sender - .try_send(Message::NotifyTxs(notify)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e.into() - }) + send_notify!(self, NotifyTxs, txs) } /// Return tx-pool information pub fn get_tx_pool_info(&self) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call((), responder); - self.sender - .try_send(Message::GetTxPoolInfo(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, GetTxPoolInfo, ()) } /// Return fresh proposals @@ -292,32 +263,12 @@ impl TxPoolController { &self, proposals: Vec, ) -> Result, AnyError> { - let (responder, response) = oneshot::channel(); - let request = Request::call(proposals, responder); - self.sender - .try_send(Message::FreshProposalsFilter(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, FreshProposalsFilter, proposals) } /// Return tx_status for rpc (get_transaction verbosity = 1) pub fn get_tx_status(&self, hash: Byte32) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call(hash, responder); - self.sender - .try_send(Message::GetTxStatus(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, GetTxStatus, hash) } /// Return transaction_with_status for rpc (get_transaction verbosity = 2) @@ -325,17 +276,7 @@ impl TxPoolController { &self, hash: Byte32, ) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call(hash, responder); - self.sender - .try_send(Message::GetTransactionWithStatus(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, GetTransactionWithStatus, hash) } /// Return txs for network @@ -343,17 +284,7 @@ impl TxPoolController { &self, short_ids: HashSet, ) -> Result, AnyError> { - let (responder, response) = oneshot::channel(); - let request = Request::call(short_ids, responder); - self.sender - .try_send(Message::FetchTxs(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, FetchTxs, short_ids) } /// Return txs with cycles @@ -361,62 +292,28 @@ impl TxPoolController { &self, short_ids: HashSet, ) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call(short_ids, responder); - self.sender - .try_send(Message::FetchTxsWithCycles(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, FetchTxsWithCycles, short_ids) } /// Clears the tx-pool, removing all txs, update snapshot. pub fn clear_pool(&self, new_snapshot: Arc) -> Result<(), AnyError> { - let (responder, response) = oneshot::channel(); - let request = Request::call(new_snapshot, responder); - self.sender - .try_send(Message::ClearPool(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, ClearPool, new_snapshot) } /// TODO(doc): @zhangsoledad pub fn get_all_entry_info(&self) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call((), responder); - self.sender - .try_send(Message::GetAllEntryInfo(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, GetAllEntryInfo, ()) } /// TODO(doc): @zhangsoledad pub fn get_all_ids(&self) -> Result { - let (responder, response) = oneshot::channel(); - let request = Request::call((), responder); - self.sender - .try_send(Message::GetAllIds(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, GetAllIds, ()) + } + + /// Saves tx pool into disk. + pub fn save_pool(&self) -> Result<(), AnyError> { + info!("Please be patient, tx-pool are saving data into disk ..."); + send_message!(self, SavePool, ()) } /// Sends suspend chunk process cmd @@ -435,22 +332,6 @@ impl TxPoolController { .map_err(Into::into) } - /// Saves tx pool into disk. - pub fn save_pool(&self) -> Result<(), AnyError> { - info!("Please be patient, tx-pool are saving data into disk ..."); - let (responder, response) = oneshot::channel(); - let request = Request::call((), responder); - self.sender - .try_send(Message::SavePool(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) - } - /// Load persisted txs into pool, assume that all txs are sorted fn load_persisted_data(&self, txs: Vec) -> Result<(), AnyError> { if !txs.is_empty() { @@ -476,33 +357,13 @@ impl TxPoolController { /// Plug tx-pool entry to tx-pool, skip verification. only for test #[cfg(feature = "internal")] pub fn plug_entry(&self, entries: Vec, target: PlugTarget) -> Result<(), AnyError> { - let (responder, response) = oneshot::channel(); - let request = Request::call((entries, target), responder); - self.sender - .try_send(Message::PlugEntry(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, PlugEntry, (entries, target)) } /// Package txs with specified bytes_limit. for test #[cfg(feature = "internal")] pub fn package_txs(&self, bytes_limit: Option) -> Result, AnyError> { - let (responder, response) = oneshot::channel(); - let request = Request::call(bytes_limit, responder); - self.sender - .try_send(Message::PackageTxs(request)) - .map_err(|e| { - let (_m, e) = handle_try_send_error(e); - e - })?; - block_in_place(|| response.recv()) - .map_err(handle_recv_error) - .map_err(Into::into) + send_message!(self, PackageTxs, bytes_limit) } }