diff --git a/src/error.rs b/src/error.rs index 755306b8..c6203219 100644 --- a/src/error.rs +++ b/src/error.rs @@ -185,6 +185,6 @@ mod tests { #[test] fn able_to_box_errors() { - let _: Box = Box::new(ErrorKind::CouldNotReadHeader("".into())); + let _: Box = Box::new(ErrorKind::CouldNotReadHeader("".into())); } } diff --git a/src/infrastructure/arranging/ordering.rs b/src/infrastructure/arranging/ordering.rs index 371a9714..779dc72b 100644 --- a/src/infrastructure/arranging/ordering.rs +++ b/src/infrastructure/arranging/ordering.rs @@ -159,7 +159,7 @@ impl OrderingStream { pub fn with_capacity(size: usize, stream_id: u8) -> OrderingStream { OrderingStream { storage: HashMap::with_capacity(size), - expected_index: 1, + expected_index: 0, _stream_id: stream_id, unique_item_identifier: 0, } @@ -179,8 +179,9 @@ impl OrderingStream { /// Returns the unique identifier which should be used for ordering on the other stream e.g. the remote endpoint. pub fn new_item_identifier(&mut self) -> SequenceNumber { + let id = self.unique_item_identifier; self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1); - self.unique_item_identifier + id } /// Returns an iterator of stored items. @@ -216,6 +217,14 @@ impl OrderingStream { } } +fn is_u16_within_half_window_from_start(start: u16, incoming: u16) -> bool { + // Check (with wrapping) if the incoming value lies within the next u16::max_value()/2 from + // start. + (start < u16::max_value() / 2 && incoming > start && incoming < start + u16::max_value() / 2) + || (start > u16::max_value() / 2 + && (incoming > start || incoming < start.wrapping_add(u16::max_value() / 2))) +} + impl Arranging for OrderingStream { type ArrangingItem = T; @@ -234,7 +243,7 @@ impl Arranging for OrderingStream { /// This can only happen in cases where we have a duplicated package. Again we don't give anything back. /// /// # Remark - /// - When we receive an item there is a possibility that a gab is filled and one or more items will could be returned. + /// - When we receive an item there is a possibility that a gap is filled and one or more items will could be returned. /// You should use the `iter_mut` instead for reading the items in order. /// However the item given to `arrange` will be returned directly when it matches the `expected_index`. fn arrange( @@ -243,9 +252,9 @@ impl Arranging for OrderingStream { item: Self::ArrangingItem, ) -> Option { if incoming_offset == self.expected_index { - self.expected_index += 1; + self.expected_index = self.expected_index.wrapping_add(1); Some(item) - } else if incoming_offset > self.expected_index { + } else if is_u16_within_half_window_from_start(self.expected_index, incoming_offset) { self.storage.insert(incoming_offset, item); None } else { @@ -283,7 +292,7 @@ impl<'a, T> Iterator for IterMut<'a, T> { match self.items.remove(&self.expected_index) { None => None, Some(e) => { - *self.expected_index += 1; + *self.expected_index = self.expected_index.wrapping_add(1); Some(e) } } @@ -314,7 +323,7 @@ mod tests { let mut system: OrderingSystem = OrderingSystem::new(); let stream = system.get_or_create_stream(1); - assert_eq!(stream.expected_index(), 1); + assert_eq!(stream.expected_index(), 0); assert_eq!(stream.stream_id(), 1); } @@ -328,6 +337,25 @@ mod tests { assert_eq!(stream.stream_id(), 1); } + #[test] + fn packet_wraps_around_offset() { + let mut system: OrderingSystem<()> = OrderingSystem::new(); + + let stream = system.get_or_create_stream(1); + for idx in 0..=65500 { + assert![stream.arrange(idx, ()).is_some()]; + } + assert![stream.arrange(123, ()).is_none()]; + for idx in 65501..=65535u16 { + assert![stream.arrange(idx, ()).is_some()]; + } + assert![stream.arrange(0, ()).is_some()]; + for idx in 1..123 { + assert![stream.arrange(idx, ()).is_some()]; + } + assert![stream.iter_mut().next().is_some()]; + } + #[test] fn can_iterate() { let mut system: OrderingSystem = OrderingSystem::new(); @@ -335,21 +363,21 @@ mod tests { system.get_or_create_stream(1); let stream = system.get_or_create_stream(1); + let stub_packet0 = Packet::new(0, 1); let stub_packet1 = Packet::new(1, 1); let stub_packet2 = Packet::new(2, 1); let stub_packet3 = Packet::new(3, 1); let stub_packet4 = Packet::new(4, 1); - let stub_packet5 = Packet::new(5, 1); { assert_eq!( - stream.arrange(1, stub_packet1.clone()).unwrap(), - stub_packet1 + stream.arrange(0, stub_packet0.clone()).unwrap(), + stub_packet0 ); - assert![stream.arrange(4, stub_packet4.clone()).is_none()]; - assert![stream.arrange(5, stub_packet5.clone()).is_none()]; assert![stream.arrange(3, stub_packet3.clone()).is_none()]; + assert![stream.arrange(4, stub_packet4.clone()).is_none()]; + assert![stream.arrange(2, stub_packet2.clone()).is_none()]; } { let mut iterator = stream.iter_mut(); @@ -359,17 +387,17 @@ mod tests { } { assert_eq!( - stream.arrange(2, stub_packet2.clone()).unwrap(), - stub_packet2 + stream.arrange(1, stub_packet1.clone()).unwrap(), + stub_packet1 ); } { // since we processed packet 2 by now we should be able to iterate and get back: 3,4,5; let mut iterator = stream.iter_mut(); + assert_eq!(iterator.next().unwrap(), stub_packet2); assert_eq!(iterator.next().unwrap(), stub_packet3); assert_eq!(iterator.next().unwrap(), stub_packet4); - assert_eq!(iterator.next().unwrap(), stub_packet5); } } @@ -428,26 +456,26 @@ mod tests { #[test] fn expect_right_order() { // we order on stream 1 - assert_order!([1, 3, 5, 4, 2], [1, 2, 3, 4, 5], 1); - assert_order!([1, 5, 4, 3, 2], [1, 2, 3, 4, 5], 1); - assert_order!([5, 3, 4, 2, 1], [1, 2, 3, 4, 5], 1); - assert_order!([4, 3, 2, 1, 5], [1, 2, 3, 4, 5], 1); - assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 1); - assert_order!([5, 2, 1, 4, 3], [1, 2, 3, 4, 5], 1); - assert_order!([3, 2, 4, 1, 5], [1, 2, 3, 4, 5], 1); - assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 1); + assert_order!([0, 2, 4, 3, 1], [0, 1, 2, 3, 4], 1); + assert_order!([0, 4, 3, 2, 1], [0, 1, 2, 3, 4], 1); + assert_order!([4, 2, 3, 1, 0], [0, 1, 2, 3, 4], 1); + assert_order!([3, 2, 1, 0, 4], [0, 1, 2, 3, 4], 1); + assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 1); + assert_order!([4, 1, 0, 3, 2], [0, 1, 2, 3, 4], 1); + assert_order!([2, 1, 3, 0, 4], [0, 1, 2, 3, 4], 1); + assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 1); } #[test] fn order_on_multiple_streams() { // we order on streams [1...8] - assert_order!([1, 3, 5, 4, 2], [1, 2, 3, 4, 5], 1); - assert_order!([1, 5, 4, 3, 2], [1, 2, 3, 4, 5], 2); - assert_order!([5, 3, 4, 2, 1], [1, 2, 3, 4, 5], 3); - assert_order!([4, 3, 2, 1, 5], [1, 2, 3, 4, 5], 4); - assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 5); - assert_order!([5, 2, 1, 4, 3], [1, 2, 3, 4, 5], 6); - assert_order!([3, 2, 4, 1, 5], [1, 2, 3, 4, 5], 7); - assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 8); + assert_order!([0, 2, 4, 3, 1], [0, 1, 2, 3, 4], 1); + assert_order!([0, 4, 3, 2, 1], [0, 1, 2, 3, 4], 2); + assert_order!([4, 2, 3, 1, 0], [0, 1, 2, 3, 4], 3); + assert_order!([3, 2, 1, 0, 4], [0, 1, 2, 3, 4], 4); + assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 5); + assert_order!([4, 1, 0, 3, 2], [0, 1, 2, 3, 4], 6); + assert_order!([2, 1, 3, 0, 4], [0, 1, 2, 3, 4], 7); + assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 8); } } diff --git a/src/net/socket.rs b/src/net/socket.rs index c80a292d..80ceaf6d 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -593,7 +593,7 @@ mod tests { server.forget_all_incoming_packets(); // Send a packet that the server receives - for id in 0..36 { + for id in 0..35 { client .send(create_ordered_packet(id, "127.0.0.1:12333")) .unwrap(); @@ -1057,4 +1057,49 @@ mod tests { Socket::bind(format!("127.0.0.1:{}", port).parse::().unwrap()).unwrap(); assert_eq!(port, socket.local_addr().unwrap().port()); } + + #[test] + fn ordered_16_bit_overflow() { + let mut cfg = Config::default(); + + let mut client = Socket::bind_any_with_config(cfg.clone()).unwrap(); + let client_addr = client.local_addr().unwrap(); + + cfg.blocking_mode = false; + let mut server = Socket::bind_any_with_config(cfg).unwrap(); + let server_addr = server.local_addr().unwrap(); + + let time = Instant::now(); + + let mut last_payload = String::new(); + + for idx in 0..100_000u64 { + client + .send(Packet::reliable_ordered( + server_addr, + idx.to_string().as_bytes().to_vec(), + None, + )) + .unwrap(); + + client.manual_poll(time); + + while let Some(_) = client.recv() {} + server + .send(Packet::reliable_ordered(client_addr, vec![123], None)) + .unwrap(); + server.manual_poll(time); + + while let Some(msg) = server.recv() { + match msg { + SocketEvent::Packet(pkt) => { + last_payload = std::str::from_utf8(pkt.payload()).unwrap().to_string(); + } + _ => {} + } + } + } + + assert_eq!["99999", last_payload]; + } } diff --git a/src/net/virtual_connection.rs b/src/net/virtual_connection.rs index 325547a2..94bda930 100644 --- a/src/net/virtual_connection.rs +++ b/src/net/virtual_connection.rs @@ -678,7 +678,7 @@ mod tests { PAYLOAD.to_vec(), Some(1), ))), - 1, + 0, ); assert_incoming_with_order( @@ -686,7 +686,7 @@ mod tests { OrderingGuarantee::Ordered(Some(1)), &mut connection, Err(TryRecvError::Empty), - 3, + 2, ); assert_incoming_with_order( @@ -694,7 +694,7 @@ mod tests { OrderingGuarantee::Ordered(Some(1)), &mut connection, Err(TryRecvError::Empty), - 4, + 3, ); assert_incoming_with_order( @@ -706,7 +706,7 @@ mod tests { PAYLOAD.to_vec(), Some(1), ))), - 2, + 1, ); } @@ -750,7 +750,7 @@ mod tests { PAYLOAD.to_vec(), Some(1), ))), - 1, + 0, ); }