Skip to content

Commit

Permalink
fix: track actually requested transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Nov 18, 2023
1 parent e34aec2 commit d08480a
Showing 1 changed file with 20 additions and 26 deletions.
46 changes: 20 additions & 26 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ where
}

fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
trace!(target: "net::tx", ?peer_id, ?kind);
trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
self.network.reputation_change(peer_id, kind);
self.metrics.reported_bad_transactions.increment(1);
}
Expand Down Expand Up @@ -831,11 +831,10 @@ where
while let Poll::Ready(fetch_event) = this.transaction_fetcher.poll(cx) {
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
if let Some(txns) = transactions {
this.import_transactions(peer_id, txns, TransactionSource::Response);
}
this.import_transactions(peer_id, transactions, TransactionSource::Response);
}
FetchEvent::FetchError { peer_id, error } => {
trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed");
this.on_request_error(peer_id, error);
}
}
Expand All @@ -857,7 +856,7 @@ where
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
trace!(target: "net::tx", ?err, "Bad transaction import");
trace!(target: "net::tx", ?err, "bad pool transaction import");
this.on_bad_import(err.hash);
continue
}
Expand Down Expand Up @@ -1008,6 +1007,8 @@ impl TransactionSource {
/// An inflight request for `PooledTransactions` from a peer
struct GetPooledTxRequest {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
}

Expand All @@ -1026,11 +1027,13 @@ struct GetPooledTxRequestFut {
}

impl GetPooledTxRequestFut {
#[inline]
fn new(
peer_id: PeerId,
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
) -> Self {
Self { inner: Some(GetPooledTxRequest { peer_id, response }) }
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
}
}

Expand All @@ -1040,20 +1043,11 @@ impl Future for GetPooledTxRequestFut {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
match req.response.poll_unpin(cx) {
Poll::Ready(result) => {
let request_hashes: Vec<TxHash> = match &result {
Ok(Ok(pooled_txs)) => {
pooled_txs.0.iter().map(|tx_elem| *tx_elem.hash()).collect()
}
_ => Vec::new(),
};

Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: request_hashes,
result,
})
}
Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: req.requested_hashes,
result,
}),
Poll::Pending => {
self.project().inner.set(Some(req));
Poll::Pending
Expand Down Expand Up @@ -1108,16 +1102,16 @@ impl TransactionFetcher {
self.inflight_requests.poll_next_unpin(cx)
{
return match result {
Ok(Ok(txs)) => {
Ok(Ok(transactions)) => {
// clear received hashes
self.remove_inflight_hashes(txs.hashes());
self.remove_inflight_hashes(transactions.hashes());

// TODO: re-request missing hashes, for now clear all of them
self.remove_inflight_hashes(requested_hashes.iter());

Poll::Ready(FetchEvent::TransactionsFetched {
peer_id,
transactions: Some(txs.0),
transactions: transactions.0,
})
}
Ok(Err(req_err)) => {
Expand Down Expand Up @@ -1189,7 +1183,7 @@ impl TransactionFetcher {

let (response, rx) = oneshot::channel();
let req: PeerRequest = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(announced_hashes),
request: GetPooledTransactions(announced_hashes.clone()),
response,
};

Expand All @@ -1210,7 +1204,7 @@ impl TransactionFetcher {
return false
} else {
//create a new request for it, from that peer
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, announced_hashes, rx))
}

true
Expand All @@ -1225,7 +1219,7 @@ enum FetchEvent {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: Option<Vec<PooledTransactionsElement>>,
transactions: Vec<PooledTransactionsElement>,
},
/// Triggered when there is an error in fetching transactions.
FetchError {
Expand Down

0 comments on commit d08480a

Please sign in to comment.