From 8755f25f8f6e100ce6b00cddafd89bf4c27d2c79 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 25 Feb 2020 13:23:19 +0100 Subject: [PATCH] Remove request ID from the new protocol --- client/network/src/protocol/block_requests.rs | 13 +- .../src/protocol/light_client_handler.rs | 162 +++++------------- .../network/src/protocol/schema/api.v1.proto | 18 +- .../src/protocol/schema/light.v1.proto | 22 +-- 4 files changed, 68 insertions(+), 147 deletions(-) diff --git a/client/network/src/protocol/block_requests.rs b/client/network/src/protocol/block_requests.rs index ef970657c5ff9..20a99378b16c9 100644 --- a/client/network/src/protocol/block_requests.rs +++ b/client/network/src/protocol/block_requests.rs @@ -111,7 +111,7 @@ impl Config { let mut v = Vec::new(); v.extend_from_slice(b"/"); v.extend_from_slice(id.as_bytes()); - v.extend_from_slice(b"/sync/1"); + v.extend_from_slice(b"/sync/2"); self.protocol = v.into(); self } @@ -146,8 +146,7 @@ where , request: &api::v1::BlockRequest ) -> Result { - log::trace!("block request {} from peer {}: from block {:?} to block {:?}, max blocks {:?}", - request.id, + log::trace!("block request from peer {}: from block {:?} to block {:?}, max blocks {:?}", peer, request.from_block, request.to_block, @@ -242,7 +241,7 @@ where } } - Ok(api::v1::BlockResponse { id: request.id, blocks }) + Ok(api::v1::BlockResponse { blocks }) } } @@ -274,10 +273,10 @@ where fn inject_node_event(&mut self, peer: PeerId, Request(request, mut stream): Request) { match self.on_block_request(&peer, &request) { Ok(res) => { - log::trace!("enqueueing block response {} for peer {} with {} blocks", res.id, peer, res.blocks.len()); + log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len()); let mut data = Vec::with_capacity(res.encoded_len()); if let Err(e) = res.encode(&mut data) { - log::debug!("error encoding block response {} for peer {}: {}", res.id, peer, e) + log::debug!("error encoding block response for peer {}: {}", peer, e) } else { let future = async move { if let Err(e) = write_one(&mut stream, data).await { @@ -287,7 +286,7 @@ where self.outgoing.push(future.boxed()) } } - Err(e) => log::debug!("error handling block request {} from peer {}: {}", request.id, peer, e) + Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e) } } diff --git a/client/network/src/protocol/light_client_handler.rs b/client/network/src/protocol/light_client_handler.rs index 77cf71408d6c4..b531f3515a6ec 100644 --- a/client/network/src/protocol/light_client_handler.rs +++ b/client/network/src/protocol/light_client_handler.rs @@ -127,7 +127,7 @@ impl Config { let mut v = Vec::new(); v.extend_from_slice(b"/"); v.extend_from_slice(id.as_bytes()); - v.extend_from_slice(b"/light/1"); + v.extend_from_slice(b"/light/2"); self.protocol = v.into(); self } @@ -350,7 +350,7 @@ where , response: api::v1::light::Response ) -> Result, Error> { - log::trace!("response {} from {}", response.id, peer); + log::trace!("response from {}", peer); use api::v1::light::response::Response; match response.response { Some(Response::RemoteCallResponse(response)) => @@ -419,12 +419,10 @@ where fn on_remote_call_request ( &mut self , peer: &PeerId - , request_id: u64 , request: &api::v1::light::RemoteCallRequest ) -> Result { - log::trace!("remote call request {} from {} ({} at {:?})", - request_id, + log::trace!("remote call request from {} ({} at {:?})", peer, request.method, request.block); @@ -434,8 +432,7 @@ where let proof = match self.chain.execution_proof(&block, &request.method, &request.data) { Ok((_, proof)) => proof, Err(e) => { - log::trace!("remote call request {} from {} ({} at {:?}) failed with: {}", - request_id, + log::trace!("remote call request from {} ({} at {:?}) failed with: {}", peer, request.method, request.block, @@ -449,13 +446,12 @@ where api::v1::light::response::Response::RemoteCallResponse(r) }; - Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + Ok(api::v1::light::Response { response: Some(response) }) } fn on_remote_read_request ( &mut self , peer: &PeerId - , request_id: u64 , request: &api::v1::light::RemoteReadRequest ) -> Result { @@ -464,8 +460,7 @@ where return Err(Error::BadRequest("remote read request without keys")) } - log::trace!("remote read request {} from {} ({} at {:?})", - request_id, + log::trace!("remote read request from {} ({} at {:?})", peer, fmt_keys(request.keys.first(), request.keys.last()), request.block); @@ -475,8 +470,7 @@ where let proof = match self.chain.read_proof(&block, &request.keys) { Ok(proof) => proof, Err(error) => { - log::trace!("remote read request {} from {} ({} at {:?}) failed with: {}", - request_id, + log::trace!("remote read request from {} ({} at {:?}) failed with: {}", peer, fmt_keys(request.keys.first(), request.keys.last()), request.block, @@ -490,13 +484,12 @@ where api::v1::light::response::Response::RemoteReadResponse(r) }; - Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + Ok(api::v1::light::Response { response: Some(response) }) } fn on_remote_read_child_request ( &mut self , peer: &PeerId - , request_id: u64 , request: &api::v1::light::RemoteReadChildRequest ) -> Result { @@ -505,8 +498,7 @@ where return Err(Error::BadRequest("remove read child request without keys")) } - log::trace!("remote read child request {} from {} ({} {} at {:?})", - request_id, + log::trace!("remote read child request from {} ({} {} at {:?})", peer, request.storage_key.to_hex::(), fmt_keys(request.keys.first(), request.keys.last()), @@ -519,8 +511,7 @@ where match self.chain.read_child_proof(&block, &request.storage_key, info, &request.keys) { Ok(proof) => proof, Err(error) => { - log::trace!("remote read child request {} from {} ({} {} at {:?}) failed with: {}", - request_id, + log::trace!("remote read child request from {} ({} {} at {:?}) failed with: {}", peer, request.storage_key.to_hex::(), fmt_keys(request.keys.first(), request.keys.last()), @@ -530,8 +521,7 @@ where } } } else { - log::trace!("remote read child request {} from {} ({} {} at {:?}) failed with: {}", - request_id, + log::trace!("remote read child request from {} ({} {} at {:?}) failed with: {}", peer, request.storage_key.to_hex::(), fmt_keys(request.keys.first(), request.keys.last()), @@ -546,25 +536,23 @@ where api::v1::light::response::Response::RemoteReadResponse(r) }; - Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + Ok(api::v1::light::Response { response: Some(response) }) } fn on_remote_header_request ( &mut self , peer: &PeerId - , request_id: u64 , request: &api::v1::light::RemoteHeaderRequest ) -> Result { - log::trace!("remote header proof request {} from {} ({:?})", request_id, peer, request.block); + log::trace!("remote header proof request from {} ({:?})", peer, request.block); let block = Decode::decode(&mut request.block.as_ref())?; let (header, proof) = match self.chain.header_proof(block) { Ok((header, proof)) => (header.encode(), proof), Err(error) => { - log::trace!("remote header proof request {} from {} ({:?}) failed with: {}", - request_id, + log::trace!("remote header proof request from {} ({:?}) failed with: {}", peer, request.block, error); @@ -577,18 +565,16 @@ where api::v1::light::response::Response::RemoteHeaderResponse(r) }; - Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + Ok(api::v1::light::Response { response: Some(response) }) } fn on_remote_changes_request ( &mut self , peer: &PeerId - , request_id: u64 , request: &api::v1::light::RemoteChangesRequest ) -> Result { - log::trace!("remote changes proof request {} from {} for key {} ({:?}..{:?})", - request_id, + log::trace!("remote changes proof request from {} for key {} ({:?}..{:?})", peer, if !request.storage_key.is_empty() { format!("{} : {}", request.storage_key.to_hex::(), request.key.to_hex::()) @@ -613,8 +599,7 @@ where let proof = match self.chain.key_changes_proof(first, last, min, max, storage_key.as_ref(), &key) { Ok(proof) => proof, Err(error) => { - log::trace!("remote changes proof request {} from {} for key {} ({:?}..{:?}) failed with: {}", - request_id, + log::trace!("remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}", peer, if let Some(sk) = storage_key { format!("{} : {}", sk.0.to_hex::(), key.0.to_hex::()) @@ -646,7 +631,7 @@ where api::v1::light::response::Response::RemoteChangesResponse(r) }; - Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + Ok(api::v1::light::Response { response: Some(response) }) } } @@ -697,29 +682,29 @@ where match event { // An incoming request from remote has been received. Event::Request(request, mut stream) => { - log::trace!("incoming request {} from {}", peer, request.id); + log::trace!("incoming request from {}", peer); let result = match &request.request { Some(api::v1::light::request::Request::RemoteCallRequest(r)) => - self.on_remote_call_request(&peer, request.id, r), + self.on_remote_call_request(&peer, r), Some(api::v1::light::request::Request::RemoteReadRequest(r)) => - self.on_remote_read_request(&peer, request.id, r), + self.on_remote_read_request(&peer, r), Some(api::v1::light::request::Request::RemoteHeaderRequest(r)) => - self.on_remote_header_request(&peer, request.id, r), + self.on_remote_header_request(&peer, r), Some(api::v1::light::request::Request::RemoteReadChildRequest(r)) => - self.on_remote_read_child_request(&peer, request.id, r), + self.on_remote_read_child_request(&peer, r), Some(api::v1::light::request::Request::RemoteChangesRequest(r)) => - self.on_remote_changes_request(&peer, request.id, r), + self.on_remote_changes_request(&peer, r), None => { - log::debug!("ignoring request {} without request data from peer {}", request.id, peer); + log::debug!("ignoring request without request data from peer {}", peer); return } }; match result { Ok(response) => { - log::trace!("enqueueing response {} for peer {}", response.id, peer); + log::trace!("enqueueing response for peer {}", peer); let mut data = Vec::new(); if let Err(e) = response.encode(&mut data) { - log::debug!("error encoding response {} for peer {}: {}", response.id, peer, e) + log::debug!("error encoding response for peer {}: {}", peer, e) } else { let future = async move { if let Err(e) = write_one(&mut stream, data).await { @@ -733,16 +718,15 @@ where self.remove_peer(&peer); self.peerset.report_peer(peer, ReputationChange::new(-(1 << 12), "bad request")) } - Err(e) => log::debug!("error handling request {} from peer {}: {}", request.id, peer, e) + Err(e) => log::debug!("error handling request from peer {}: {}", peer, e) } } // A response to one of our own requests has been received. - Event::Response(response) => { - let id = response.id; + Event::Response(id, response) => { if let Some(request) = self.outstanding.remove(&id) { // We first just check if the response originates from the expected peer. if request.peer != peer { - log::debug!("was expecting response {} from {} instead of {}", id, request.peer, peer); + log::debug!("was expecting response from {} instead of {}", request.peer, peer); self.outstanding.insert(id, request); self.remove_peer(&peer); self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer")); @@ -836,16 +820,17 @@ where } }; if let Some(peer) = available_peer { - let id = self.next_request_id(); - let rq = serialize_request(id, &request.request); + let rq = serialize_request(&request.request); let mut buf = Vec::with_capacity(rq.encoded_len()); if let Err(e) = rq.encode(&mut buf) { - log::debug!("failed to serialize request {}: {}", id, e); + log::debug!("failed to serialize request: {}", e); send_reply(Err(ClientError::RemoteFetchFailed), request.request) } else { + let id = self.next_request_id(); log::trace!("sending request {} to peer {}", id, peer); let protocol = OutboundProtocol { request: buf, + request_id: id, max_data_size: self.config.max_data_size, protocol: self.config.protocol.clone(), }; @@ -918,7 +903,7 @@ fn retries(request: &Request) -> usize { rc.unwrap_or(0) } -fn serialize_request(id: u64, request: &Request) -> api::v1::light::Request { +fn serialize_request(request: &Request) -> api::v1::light::Request { let request = match request { Request::Header { request, .. } => { let r = api::v1::light::RemoteHeaderRequest { block: request.block.encode() }; @@ -962,7 +947,7 @@ fn serialize_request(id: u64, request: &Request) -> api::v1::light: } }; - api::v1::light::Request { id, request: Some(request) } + api::v1::light::Request { request: Some(request) } } fn send_reply(result: Result, ClientError>, request: Request) { @@ -1004,7 +989,7 @@ pub enum Event { /// Incoming request from remote and substream to use for the response. Request(api::v1::light::Request, T), /// Incoming response from remote. - Response(api::v1::light::Response), + Response(u64, api::v1::light::Response), } /// Substream upgrade protocol. @@ -1054,6 +1039,8 @@ where pub struct OutboundProtocol { /// The serialized protobuf request. request: Vec, + /// Local identifier for the request. Used to associate it with a response. + request_id: u64, /// The max. request length in bytes. max_data_size: usize, /// The protocol to use for upgrade negotiation. @@ -1082,7 +1069,7 @@ where write_one(&mut s, &self.request).await?; let vec = read_one(&mut s, self.max_data_size).await?; api::v1::light::Response::decode(&vec[..]) - .map(Event::Response) + .map(|r| Event::Response(self.request_id, r)) .map_err(|e| { ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) }) @@ -1308,53 +1295,6 @@ mod tests { assert_eq!(0, behaviour.outstanding.len()); } - #[test] - fn disconnects_from_peer_on_response_with_wrong_id() { - let peer = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(true, pset.1, make_config()); - - behaviour.inject_connected(peer.clone(), empty_dialer()); - assert_eq!(1, behaviour.peers.len()); - - let chan = oneshot::channel(); - let request = fetcher::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(1), - }; - behaviour.request(Request::Call { request, sender: chan.0 }).unwrap(); - - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - poll(&mut behaviour); // Make progress - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(1, behaviour.outstanding.len()); - - // Construct response with bogus ID - let response = { - let r = api::v1::light::RemoteCallResponse { proof: empty_proof() }; - api::v1::light::Response { - id: 2365789, - response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), - } - }; - - // Make sure our bogus ID is really not used. - assert!(!behaviour.outstanding.keys().any(|id| id == &response.id)); - - behaviour.inject_node_event(peer.clone(), Event::Response(response)); - assert!(behaviour.peers.is_empty()); - - poll(&mut behaviour); // More progress - - // The request should be back in the pending queue - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - } - #[test] fn disconnects_from_peer_on_incorrect_response() { let peer = PeerId::random(); @@ -1386,12 +1326,11 @@ mod tests { let response = { let r = api::v1::light::RemoteCallResponse { proof: empty_proof() }; api::v1::light::Response { - id: request_id, response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } }; - behaviour.inject_node_event(peer.clone(), Event::Response(response)); + behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response)); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); // More progress @@ -1416,12 +1355,11 @@ mod tests { let response = { let r = api::v1::light::RemoteCallResponse { proof: empty_proof() }; api::v1::light::Response { - id: 2347895932, response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } }; - behaviour.inject_node_event(peer.clone(), Event::Response(response)); + behaviour.inject_node_event(peer.clone(), Event::Response(2347895932, response)); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); @@ -1459,12 +1397,11 @@ mod tests { let response = { let r = api::v1::light::RemoteReadResponse { proof: empty_proof() }; // Not a RemoteCallResponse! api::v1::light::Response { - id: request_id, response: Some(api::v1::light::response::Response::RemoteReadResponse(r)), } }; - behaviour.inject_node_event(peer.clone(), Event::Response(response)); + behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response)); assert!(behaviour.peers.is_empty()); poll(&mut behaviour); // More progress @@ -1513,11 +1450,10 @@ mod tests { let response = { let r = api::v1::light::RemoteCallResponse { proof: empty_proof() }; api::v1::light::Response { - id: request_id, response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) } }; - behaviour.inject_node_event(responding_peer, Event::Response(response.clone())); + behaviour.inject_node_event(responding_peer, Event::Response(request_id, response.clone())); assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { .. })); assert_matches!(chan.1.try_recv(), Ok(None)) } @@ -1527,11 +1463,10 @@ mod tests { let response = { let r = api::v1::light::RemoteCallResponse { proof: empty_proof() }; api::v1::light::Response { - id: request_id, response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } }; - behaviour.inject_node_event(responding_peer, Event::Response(response)); + behaviour.inject_node_event(responding_peer, Event::Response(request_id, response)); assert_matches!(poll(&mut behaviour), Poll::Pending); assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed)))) } @@ -1551,28 +1486,24 @@ mod tests { proof: empty_proof() }; api::v1::light::Response { - id: 1, response: Some(api::v1::light::response::Response::RemoteHeaderResponse(r)), } } Request::Read{..} => { let r = api::v1::light::RemoteReadResponse { proof: empty_proof() }; api::v1::light::Response { - id: 1, response: Some(api::v1::light::response::Response::RemoteReadResponse(r)), } } Request::ReadChild{..} => { let r = api::v1::light::RemoteReadResponse { proof: empty_proof() }; api::v1::light::Response { - id: 1, response: Some(api::v1::light::response::Response::RemoteReadResponse(r)), } } Request::Call{..} => { let r = api::v1::light::RemoteCallResponse { proof: empty_proof() }; api::v1::light::Response { - id: 1, response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } } @@ -1584,7 +1515,6 @@ mod tests { roots_proof: empty_proof() }; api::v1::light::Response { - id: 1, response: Some(api::v1::light::response::Response::RemoteChangesResponse(r)), } } @@ -1599,7 +1529,7 @@ mod tests { assert_eq!(1, behaviour.outstanding.len()); assert_eq!(1, *behaviour.outstanding.keys().next().unwrap()); - behaviour.inject_node_event(peer.clone(), Event::Response(response)); + behaviour.inject_node_event(peer.clone(), Event::Response(1, response)); poll(&mut behaviour); diff --git a/client/network/src/protocol/schema/api.v1.proto b/client/network/src/protocol/schema/api.v1.proto index 73128c53de466..ccbf49d666115 100644 --- a/client/network/src/protocol/schema/api.v1.proto +++ b/client/network/src/protocol/schema/api.v1.proto @@ -14,31 +14,27 @@ enum Direction { // Request block data from a peer. message BlockRequest { - // Unique request id. - uint64 id = 1; // Bits of block data to request. - uint32 fields = 2; + uint32 fields = 1; // Start from this block. oneof from_block { // Start with given hash. - bytes hash = 3; + bytes hash = 2; // Start with given block number. - bytes number = 4; + bytes number = 3; } // End at this block. An implementation defined maximum is used when unspecified. - bytes to_block = 5; // optional + bytes to_block = 4; // optional // Sequence direction. - Direction direction = 6; + Direction direction = 5; // Maximum number of blocks to return. An implementation defined maximum is used when unspecified. - uint32 max_blocks = 7; // optional + uint32 max_blocks = 6; // optional } // Response to `BlockRequest` message BlockResponse { - // Id of a request this response was made for. - uint64 id = 1; // Block data for the requested sequence. - repeated BlockData blocks = 2; + repeated BlockData blocks = 1; } // Block data sent in the response. diff --git a/client/network/src/protocol/schema/light.v1.proto b/client/network/src/protocol/schema/light.v1.proto index b9aee67b5ee24..1c98d49730cf9 100644 --- a/client/network/src/protocol/schema/light.v1.proto +++ b/client/network/src/protocol/schema/light.v1.proto @@ -14,26 +14,22 @@ message Pair { // Enumerate all possible light client request messages. message Request { - // Unique request id. - uint64 id = 1; oneof request { - RemoteCallRequest remote_call_request = 2; - RemoteReadRequest remote_read_request = 3; - RemoteHeaderRequest remote_header_request = 4; - RemoteReadChildRequest remote_read_child_request = 5; - RemoteChangesRequest remote_changes_request = 6; + RemoteCallRequest remote_call_request = 1; + RemoteReadRequest remote_read_request = 2; + RemoteHeaderRequest remote_header_request = 3; + RemoteReadChildRequest remote_read_child_request = 4; + RemoteChangesRequest remote_changes_request = 5; } } // Enumerate all possible light client response messages. message Response { - /// Id of a request this response was made for. - uint64 id = 1; oneof response { - RemoteCallResponse remote_call_response = 2; - RemoteReadResponse remote_read_response = 3; - RemoteHeaderResponse remote_header_response = 4; - RemoteChangesResponse remote_changes_response = 6; + RemoteCallResponse remote_call_response = 1; + RemoteReadResponse remote_read_response = 2; + RemoteHeaderResponse remote_header_response = 3; + RemoteChangesResponse remote_changes_response = 4; } }