Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ordering arranging handler for >65536 packets #229

Merged
merged 5 commits into from
Sep 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,6 @@ mod tests {

#[test]
fn able_to_box_errors() {
let _: Box<Error> = Box::new(ErrorKind::CouldNotReadHeader("".into()));
let _: Box<dyn Error> = Box::new(ErrorKind::CouldNotReadHeader("".into()));
}
}
90 changes: 59 additions & 31 deletions src/infrastructure/arranging/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<T> OrderingStream<T> {
pub fn with_capacity(size: usize, stream_id: u8) -> OrderingStream<T> {
OrderingStream {
storage: HashMap::with_capacity(size),
expected_index: 1,
expected_index: 0,
_stream_id: stream_id,
unique_item_identifier: 0,
}
Expand All @@ -179,8 +179,9 @@ impl<T> OrderingStream<T> {

/// Returns the unique identifier which should be used for ordering on the other stream e.g. the remote endpoint.
pub fn new_item_identifier(&mut self) -> SequenceNumber {
let id = self.unique_item_identifier;
self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1);
self.unique_item_identifier
id
}

/// Returns an iterator of stored items.
Expand Down Expand Up @@ -216,6 +217,14 @@ impl<T> OrderingStream<T> {
}
}

fn is_u16_within_half_window_from_start(start: u16, incoming: u16) -> bool {
// Check (with wrapping) if the incoming value lies within the next u16::max_value()/2 from
// start.
(start < u16::max_value() / 2 && incoming > start && incoming < start + u16::max_value() / 2)
|| (start > u16::max_value() / 2
&& (incoming > start || incoming < start.wrapping_add(u16::max_value() / 2)))
}

impl<T> Arranging for OrderingStream<T> {
type ArrangingItem = T;

Expand All @@ -234,7 +243,7 @@ impl<T> Arranging for OrderingStream<T> {
/// This can only happen in cases where we have a duplicated package. Again we don't give anything back.
///
/// # Remark
/// - When we receive an item there is a possibility that a gab is filled and one or more items will could be returned.
/// - When we receive an item there is a possibility that a gap is filled and one or more items will could be returned.
/// You should use the `iter_mut` instead for reading the items in order.
/// However the item given to `arrange` will be returned directly when it matches the `expected_index`.
fn arrange(
Expand All @@ -243,9 +252,9 @@ impl<T> Arranging for OrderingStream<T> {
item: Self::ArrangingItem,
) -> Option<Self::ArrangingItem> {
if incoming_offset == self.expected_index {
self.expected_index += 1;
self.expected_index = self.expected_index.wrapping_add(1);
Some(item)
} else if incoming_offset > self.expected_index {
} else if is_u16_within_half_window_from_start(self.expected_index, incoming_offset) {
self.storage.insert(incoming_offset, item);
None
} else {
Expand Down Expand Up @@ -283,7 +292,7 @@ impl<'a, T> Iterator for IterMut<'a, T> {
match self.items.remove(&self.expected_index) {
None => None,
Some(e) => {
*self.expected_index += 1;
*self.expected_index = self.expected_index.wrapping_add(1);
Some(e)
}
}
Expand Down Expand Up @@ -314,7 +323,7 @@ mod tests {
let mut system: OrderingSystem<Packet> = OrderingSystem::new();
let stream = system.get_or_create_stream(1);

assert_eq!(stream.expected_index(), 1);
assert_eq!(stream.expected_index(), 0);
assert_eq!(stream.stream_id(), 1);
}

Expand All @@ -328,28 +337,47 @@ mod tests {
assert_eq!(stream.stream_id(), 1);
}

#[test]
fn packet_wraps_around_offset() {
let mut system: OrderingSystem<()> = OrderingSystem::new();

let stream = system.get_or_create_stream(1);
for idx in 0..=65500 {
assert![stream.arrange(idx, ()).is_some()];
}
assert![stream.arrange(123, ()).is_none()];
for idx in 65501..=65535u16 {
assert![stream.arrange(idx, ()).is_some()];
}
assert![stream.arrange(0, ()).is_some()];
for idx in 1..123 {
assert![stream.arrange(idx, ()).is_some()];
}
assert![stream.iter_mut().next().is_some()];
}

#[test]
fn can_iterate() {
let mut system: OrderingSystem<Packet> = OrderingSystem::new();

system.get_or_create_stream(1);
let stream = system.get_or_create_stream(1);

let stub_packet0 = Packet::new(0, 1);
let stub_packet1 = Packet::new(1, 1);
let stub_packet2 = Packet::new(2, 1);
let stub_packet3 = Packet::new(3, 1);
let stub_packet4 = Packet::new(4, 1);
let stub_packet5 = Packet::new(5, 1);

{
assert_eq!(
stream.arrange(1, stub_packet1.clone()).unwrap(),
stub_packet1
stream.arrange(0, stub_packet0.clone()).unwrap(),
stub_packet0
);

assert![stream.arrange(4, stub_packet4.clone()).is_none()];
assert![stream.arrange(5, stub_packet5.clone()).is_none()];
assert![stream.arrange(3, stub_packet3.clone()).is_none()];
assert![stream.arrange(4, stub_packet4.clone()).is_none()];
assert![stream.arrange(2, stub_packet2.clone()).is_none()];
}
{
let mut iterator = stream.iter_mut();
Expand All @@ -359,17 +387,17 @@ mod tests {
}
{
assert_eq!(
stream.arrange(2, stub_packet2.clone()).unwrap(),
stub_packet2
stream.arrange(1, stub_packet1.clone()).unwrap(),
stub_packet1
);
}
{
// since we processed packet 2 by now we should be able to iterate and get back: 3,4,5;
let mut iterator = stream.iter_mut();

assert_eq!(iterator.next().unwrap(), stub_packet2);
assert_eq!(iterator.next().unwrap(), stub_packet3);
assert_eq!(iterator.next().unwrap(), stub_packet4);
assert_eq!(iterator.next().unwrap(), stub_packet5);
}
}

Expand Down Expand Up @@ -428,26 +456,26 @@ mod tests {
#[test]
fn expect_right_order() {
// we order on stream 1
assert_order!([1, 3, 5, 4, 2], [1, 2, 3, 4, 5], 1);
assert_order!([1, 5, 4, 3, 2], [1, 2, 3, 4, 5], 1);
assert_order!([5, 3, 4, 2, 1], [1, 2, 3, 4, 5], 1);
assert_order!([4, 3, 2, 1, 5], [1, 2, 3, 4, 5], 1);
assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 1);
assert_order!([5, 2, 1, 4, 3], [1, 2, 3, 4, 5], 1);
assert_order!([3, 2, 4, 1, 5], [1, 2, 3, 4, 5], 1);
assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 1);
assert_order!([0, 2, 4, 3, 1], [0, 1, 2, 3, 4], 1);
assert_order!([0, 4, 3, 2, 1], [0, 1, 2, 3, 4], 1);
assert_order!([4, 2, 3, 1, 0], [0, 1, 2, 3, 4], 1);
assert_order!([3, 2, 1, 0, 4], [0, 1, 2, 3, 4], 1);
assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 1);
assert_order!([4, 1, 0, 3, 2], [0, 1, 2, 3, 4], 1);
assert_order!([2, 1, 3, 0, 4], [0, 1, 2, 3, 4], 1);
assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 1);
}

#[test]
fn order_on_multiple_streams() {
// we order on streams [1...8]
assert_order!([1, 3, 5, 4, 2], [1, 2, 3, 4, 5], 1);
assert_order!([1, 5, 4, 3, 2], [1, 2, 3, 4, 5], 2);
assert_order!([5, 3, 4, 2, 1], [1, 2, 3, 4, 5], 3);
assert_order!([4, 3, 2, 1, 5], [1, 2, 3, 4, 5], 4);
assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 5);
assert_order!([5, 2, 1, 4, 3], [1, 2, 3, 4, 5], 6);
assert_order!([3, 2, 4, 1, 5], [1, 2, 3, 4, 5], 7);
assert_order!([2, 1, 4, 3, 5], [1, 2, 3, 4, 5], 8);
assert_order!([0, 2, 4, 3, 1], [0, 1, 2, 3, 4], 1);
assert_order!([0, 4, 3, 2, 1], [0, 1, 2, 3, 4], 2);
assert_order!([4, 2, 3, 1, 0], [0, 1, 2, 3, 4], 3);
assert_order!([3, 2, 1, 0, 4], [0, 1, 2, 3, 4], 4);
assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 5);
assert_order!([4, 1, 0, 3, 2], [0, 1, 2, 3, 4], 6);
assert_order!([2, 1, 3, 0, 4], [0, 1, 2, 3, 4], 7);
assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 8);
}
}
47 changes: 46 additions & 1 deletion src/net/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ mod tests {
server.forget_all_incoming_packets();

// Send a packet that the server receives
for id in 0..36 {
for id in 0..35 {
client
.send(create_ordered_packet(id, "127.0.0.1:12333"))
.unwrap();
Expand Down Expand Up @@ -1057,4 +1057,49 @@ mod tests {
Socket::bind(format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap()).unwrap();
assert_eq!(port, socket.local_addr().unwrap().port());
}

#[test]
fn ordered_16_bit_overflow() {
let mut cfg = Config::default();

let mut client = Socket::bind_any_with_config(cfg.clone()).unwrap();
let client_addr = client.local_addr().unwrap();

cfg.blocking_mode = false;
let mut server = Socket::bind_any_with_config(cfg).unwrap();
let server_addr = server.local_addr().unwrap();

let time = Instant::now();

let mut last_payload = String::new();

for idx in 0..100_000u64 {
client
.send(Packet::reliable_ordered(
server_addr,
idx.to_string().as_bytes().to_vec(),
None,
))
.unwrap();

client.manual_poll(time);

while let Some(_) = client.recv() {}
server
.send(Packet::reliable_ordered(client_addr, vec![123], None))
.unwrap();
server.manual_poll(time);

while let Some(msg) = server.recv() {
match msg {
SocketEvent::Packet(pkt) => {
last_payload = std::str::from_utf8(pkt.payload()).unwrap().to_string();
}
_ => {}
}
}
}

assert_eq!["99999", last_payload];
}
}
10 changes: 5 additions & 5 deletions src/net/virtual_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,23 +678,23 @@ mod tests {
PAYLOAD.to_vec(),
Some(1),
))),
1,
0,
);

assert_incoming_with_order(
DeliveryGuarantee::Reliable,
OrderingGuarantee::Ordered(Some(1)),
&mut connection,
Err(TryRecvError::Empty),
3,
2,
);

assert_incoming_with_order(
DeliveryGuarantee::Reliable,
OrderingGuarantee::Ordered(Some(1)),
&mut connection,
Err(TryRecvError::Empty),
4,
3,
);

assert_incoming_with_order(
Expand All @@ -706,7 +706,7 @@ mod tests {
PAYLOAD.to_vec(),
Some(1),
))),
2,
1,
);
}

Expand Down Expand Up @@ -750,7 +750,7 @@ mod tests {
PAYLOAD.to_vec(),
Some(1),
))),
1,
0,
);
}

Expand Down