Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement logger adapter #239

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: Rust

on:
push:
branches:
- master

jobs:
test:
name: Running on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
steps:
- uses: actions/checkout@v1
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
2 changes: 1 addition & 1 deletion examples/server_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ const SERVER: &str = "127.0.0.1:12351";
fn server() -> Result<(), ErrorKind> {
let mut socket = Socket::bind(SERVER)?;
let (sender, receiver) = (socket.get_packet_sender(), socket.get_event_receiver());
let _thread = thread::spawn(move || socket.start_polling());

loop {
socket.manual_poll(Instant::now());
if let Ok(event) = receiver.recv() {
match event {
SocketEvent::Packet(packet) => {
Expand Down
17 changes: 16 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::log;
use crate::net::constants::{DEFAULT_MTU, FRAGMENT_SIZE_DEFAULT, MAX_FRAGMENTS_DEFAULT};
use std::{default::Default, time::Duration};
use std::{default::Default, rc::Rc, time::Duration};

#[derive(Clone, Debug)]
/// Contains the configuration options to configure laminar for special use-cases.
Expand All @@ -8,6 +9,9 @@ pub struct Config {
pub blocking_mode: bool,
/// Value which can specify the amount of time that can pass without hearing from a client before considering them disconnected
pub idle_connection_timeout: Duration,
/// Value which specifies at which interval (if at all) a heartbeat should be sent, if no other packet was sent in the meantime.
/// If None, no heartbeats will be sent (the default).
pub heartbeat_interval: Option<Duration>,
/// Value which can specify the maximum size a packet can be in bytes. This value is inclusive of fragmenting; if a packet is fragmented, the total size of the fragments cannot exceed this value.
///
/// Recommended value: 16384
Expand Down Expand Up @@ -45,13 +49,22 @@ pub struct Config {
///
/// Value that specifies how long we should block polling for socket events, in milliseconds. Defaults to `1ms`.
pub socket_polling_timeout: Option<Duration>,
/// The maximum amount of reliable packets in flight on this connection before we drop the
/// connection.
///
/// When we send a reliable packet, it is stored locally until an acknowledgement comes back to
/// us, if that store grows to a size
pub max_packets_in_flight: u16,
/// Logger used for this instance of laminar. See [log::LaminarLogger] for more details.
pub logger: Rc<dyn log::LaminarLogger>,
}

impl Default for Config {
fn default() -> Self {
Self {
blocking_mode: false,
idle_connection_timeout: Duration::from_secs(5),
heartbeat_interval: None,
max_packet_size: (MAX_FRAGMENTS_DEFAULT * FRAGMENT_SIZE_DEFAULT) as usize,
max_fragments: MAX_FRAGMENTS_DEFAULT as u8,
fragment_size: FRAGMENT_SIZE_DEFAULT,
Expand All @@ -61,6 +74,8 @@ impl Default for Config {
rtt_max_value: 250,
socket_event_buffer_size: 1024,
socket_polling_timeout: Some(Duration::from_millis(1)),
max_packets_in_flight: 512,
logger: Rc::new(log::DefaultLogger),
}
}
}
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,6 @@ mod tests {

#[test]
fn able_to_box_errors() {
let _: Box<Error> = Box::new(ErrorKind::CouldNotReadHeader("".into()));
let _: Box<dyn Error> = Box::new(ErrorKind::CouldNotReadHeader("".into()));
}
}
35 changes: 33 additions & 2 deletions src/infrastructure/acknowledgment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::packet::OrderingGuarantee;
use crate::packet::SequenceNumber;
use crate::sequence_buffer::{sequence_less_than, SequenceBuffer};
use crate::sequence_buffer::{sequence_greater_than, sequence_less_than, SequenceBuffer};
use std::collections::HashMap;

const REDUNDANT_PACKET_ACKS_SIZE: u16 = 32;
Expand Down Expand Up @@ -31,6 +31,11 @@ impl AcknowledgmentHandler {
}
}

/// Get the current number of not yet acknowledged packets
pub fn packets_in_flight(&self) -> u16 {
self.sent_packets.len() as u16
}

/// Returns the next sequence number to send.
pub fn local_sequence_num(&self) -> SequenceNumber {
self.sequence_number
Expand Down Expand Up @@ -71,7 +76,11 @@ impl AcknowledgmentHandler {
remote_ack_seq: u16,
mut remote_ack_field: u32,
) {
self.remote_ack_sequence_num = remote_ack_seq;
// We must ensure that self.remote_ack_sequence_num is always increasing (with wrapping)
if sequence_greater_than(remote_ack_seq, self.remote_ack_sequence_num) {
self.remote_ack_sequence_num = remote_ack_seq;
}

self.received_packets
.insert(remote_seq_num, ReceivedPacket {});

Expand Down Expand Up @@ -285,4 +294,26 @@ mod test {
assert_eq!(handler.sent_packets.len(), 1);
assert_eq!(handler.local_sequence_num(), 1);
}

#[test]
fn remote_ack_seq_must_never_be_less_than_prior() {
let mut handler = AcknowledgmentHandler::new();
// Second packet received before first
handler.process_incoming(1, 1, 1);
assert_eq!(handler.remote_ack_sequence_num, 1);
// First packet received
handler.process_incoming(0, 0, 0);
assert_eq!(handler.remote_ack_sequence_num, 1);
}

#[test]
fn remote_ack_seq_must_never_be_less_than_prior_wrap_boundary() {
let mut handler = AcknowledgmentHandler::new();
// newer packet received before first
handler.process_incoming(1, 0, 1);
assert_eq!(handler.remote_ack_sequence_num, 0);
// earlier packet received
handler.process_incoming(0, u16::max_value(), 0);
assert_eq!(handler.remote_ack_sequence_num, 0);
}
}
2 changes: 1 addition & 1 deletion src/infrastructure/arranging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub trait Arranging {
/// If the `incoming_offset` satisfies the arranging algorithm it returns `Some` with the passed item.
fn arrange(
&mut self,
incoming_index: usize,
incoming_index: u16,
item: Self::ArrangingItem,
) -> Option<Self::ArrangingItem>;
}
Expand Down
Loading