Skip to content

Commit

Permalink
WIP: call ClientRequest.tx.send even if there is an error
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Dec 16, 2020
1 parent da1e59e commit 4a3d53f
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ where
.peer_tx
.send(Message::GetAddr)
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::Peers,
tx,
Expand All @@ -526,7 +526,7 @@ where
.peer_tx
.send(Message::Ping(nonce))
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::Ping(nonce),
tx,
Expand All @@ -538,7 +538,7 @@ where
hashes.iter().map(|h| (*h).into()).collect(),
))
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()),
Expand All @@ -553,7 +553,7 @@ where
hashes.iter().map(|h| (*h).into()).collect(),
))
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::TransactionsByHash {
transactions: Vec::with_capacity(hashes.len()),
Expand All @@ -566,7 +566,7 @@ where
.peer_tx
.send(Message::GetBlocks { known_blocks, stop })
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::FindBlocks,
tx,
Expand All @@ -576,7 +576,7 @@ where
.peer_tx
.send(Message::GetHeaders { known_blocks, stop })
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::FindHeaders,
tx,
Expand All @@ -586,7 +586,10 @@ where
.peer_tx
.send(Message::Mempool)
.await
.map_err(|e| e.into())
// TODO: work out how to re-use `tx`
// `tx` is only used once, in either the error or the result,
// but rustc doesn't know that
.map_err(|e| (e.into(), Some(tx)))
.map(|()| AwaitingResponse {
handler: Handler::MempoolTransactions,
tx,
Expand All @@ -599,31 +602,38 @@ where
self.peer_tx
.send(Message::Tx(transaction))
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), None))
.map(|()| AwaitingRequest)
}
(AwaitingRequest, AdvertiseTransactions(hashes)) => {
let _ = tx.send(Ok(Response::Nil));
self.peer_tx
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), None))
.map(|()| AwaitingRequest)
}
(AwaitingRequest, AdvertiseBlock(hash)) => {
let _ = tx.send(Ok(Response::Nil));
self.peer_tx
.send(Message::Inv(vec![hash.into()]))
.await
.map_err(|e| e.into())
.map_err(|e| (e.into(), None))
.map(|()| AwaitingRequest)
}
} {
Ok(new_state) => {
self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
Err(e) => self.fail_with(e),
Err((e, Some(tx))) => {
// TODO: work out how to re-use `e`
// copy and clone don't work here, because some of the source
// errors don't implement copy or clone
tx.send(Err(SharedPeerError::from(e)));
self.fail_with(e);
}
Err((e, None)) => self.fail_with(e),
}
}

Expand Down

0 comments on commit 4a3d53f

Please sign in to comment.