Skip to content

Commit

Permalink
Adjust for clock drift in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Zoxc committed Sep 24, 2024
1 parent 9bd2717 commit 3b617ea
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This file lists the changes that have occurred since January 2024 in the project

* Increase samples used for clock synchronization and idle latency measurement
* Clock synchronization now uses the average of the lowest 1/3rd of samples
* Adjust for clock drift in tests
* Fix connecting to servers on non-standard port with peers

## 0.3 - 2024-09-16

Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,3 @@ docker build .. -t crusader -f server-static.Dockerfile
* The up and down latency measurement rely on symmetric stable latency
measurements to the server.
These values may be wrong if those assumption don't hold on test startup.

* The up and down latency measurement may slowly get out of sync due to
clock drift. Clocks are currently only synchronized on test startup.
48 changes: 34 additions & 14 deletions src/crusader-lib/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,15 @@ async fn ping_measure_recv(
Ok(storage)
}

pub struct LatencyResult {
pub latency: Duration,
pub threshold: Duration,
pub server_pong: Duration,
pub server_offset: u64,
pub server_time: u64,
pub control_rx: FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
}

pub(crate) async fn measure_latency(
id: u64,
ping_index: &mut u64,
Expand All @@ -428,15 +437,7 @@ pub(crate) async fn measure_latency(
server: SocketAddr,
local_udp: SocketAddr,
setup_start: Instant,
) -> Result<
(
Duration,
Duration,
u64,
FramedRead<OwnedReadHalf, LengthDelimitedCodec>,
),
anyhow::Error,
> {
) -> Result<LatencyResult, anyhow::Error> {
send(&mut control_tx, &ClientMessage::GetMeasurements).await?;

let latencies = tokio::spawn(async move {
Expand Down Expand Up @@ -524,17 +525,36 @@ pub(crate) async fn measure_latency(

let server_offset = (server_pong.as_micros() as u64).wrapping_sub(server_time);

(server_pong, latency, server_offset)
(server_pong, latency, server_offset, server_time)
})
.collect();

let server_pong = pings
.iter()
.map(|&(server_pong, _, _, _)| server_pong)
.sum::<Duration>()
/ (pings.len() as u32);

let server_offset = pings
.iter()
.map(|&(_, _, offset)| offset as u128)
.map(|&(_, _, offset, _)| offset as u128)
.sum::<u128>()
/ (pings.len() as u128);

let server_time = pings
.iter()
.map(|&(_, _, _, time)| time as u128)
.sum::<u128>()
/ (pings.len() as u128);

Ok((latency, threshold, server_offset as u64, control_rx))
Ok(LatencyResult {
latency,
threshold,
server_pong,
server_offset: server_offset as u64,
server_time: server_time as u64,
control_rx,
})
}

pub(crate) async fn ping_send(
Expand All @@ -545,7 +565,7 @@ pub(crate) async fn ping_send(
socket: Arc<UdpSocket>,
interval: Duration,
estimated_duration: Duration,
) -> Result<Vec<Duration>, anyhow::Error> {
) -> Result<(Vec<Duration>, u64), anyhow::Error> {
let mut storage = Vec::with_capacity(
((estimated_duration.as_secs_f64() + 2.0) * (1000.0 / interval.as_millis() as f64) * 1.5)
as usize,
Expand Down Expand Up @@ -579,7 +599,7 @@ pub(crate) async fn ping_send(
storage.push(current);
}

Ok(storage)
Ok((storage, ping_index))
}

pub(crate) async fn ping_recv(
Expand Down
9 changes: 7 additions & 2 deletions src/crusader-lib/src/latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::{
};
use tokio_util::codec::{FramedRead, FramedWrite};

use crate::common::{connect, hello, measure_latency, udp_handle};
use crate::common::{connect, hello, measure_latency, udp_handle, LatencyResult};
use crate::discovery;
use crate::protocol::{codec, receive, send, ClientMessage, Ping, ServerMessage};

Expand Down Expand Up @@ -133,7 +133,12 @@ async fn test_async(

let mut ping_index = 0;

let (_, latency, mut server_time_offset, mut control_rx) = measure_latency(
let LatencyResult {
threshold: latency,
server_offset: mut server_time_offset,
mut control_rx,
..
} = measure_latency(
id,
&mut ping_index,
&mut control_tx,
Expand Down
62 changes: 50 additions & 12 deletions src/crusader-lib/src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::connect;
use crate::common::{connect, LatencyResult};
#[cfg(feature = "client")]
use crate::common::{Config, Msg};
#[cfg(feature = "client")]
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn connect_to_peer(
IpAddr::V6(ip) => ip,
}
.octets(),
port: config.port,
port: server.port(),
ping_interval: config.ping_interval.as_millis() as u64,
estimated_duration: estimated_duration.as_millis(),
},
Expand Down Expand Up @@ -174,7 +174,13 @@ pub async fn run_peer(

let mut ping_index = 0;

let (latency, _, server_time_offset, mut control_rx) = measure_latency(
let LatencyResult {
latency,
server_pong: pre_server_pong,
server_time: pre_server_time,
mut control_rx,
..
} = measure_latency(
id,
&mut ping_index,
&mut control_tx,
Expand Down Expand Up @@ -216,7 +222,7 @@ pub async fn run_peer(
};
}

Ok((latencies, overload_))
Ok((latencies, overload_, control_rx))
});

send(
Expand Down Expand Up @@ -268,13 +274,47 @@ pub async fn run_peer(

state_tx.send((TestState::EndPingRecv, Instant::now())).ok();

let pings_sent = ping_send.await??;
let (pings_sent, mut ping_index) = ping_send.await??;
let mut pongs = ping_recv.await??;

send(&mut control_tx, &ClientMessage::StopMeasurements).await?;

let (mut latencies, server_overload, control_rx) = measures.await??;

let LatencyResult {
server_pong: post_server_pong,
server_time: post_server_time,
..
} = measure_latency(
id,
&mut ping_index,
&mut control_tx,
control_rx,
server,
local_udp,
setup_start,
)
.await?;

send(&mut control_tx, &ClientMessage::Done).await?;

let mut pongs = ping_recv.await??;
let server_time = post_server_time.wrapping_sub(pre_server_time);
let peer_time = post_server_pong.saturating_sub(pre_server_pong);
let peer_time_micros = peer_time.as_micros() as f64;
let ratio = peer_time_micros / server_time as f64;
let inv_ratio = server_time as f64 / peer_time_micros;

let to_peer_time = |server_time: u64| -> u64 {
let time = server_time.wrapping_sub(pre_server_time);
let time = (time as f64 * ratio) as u64;
(pre_server_pong.as_micros() as u64).saturating_add(time)
};

let (mut latencies, server_overload) = measures.await??;
let to_server_time = |peer_time: Duration| -> u64 {
let time = peer_time.saturating_sub(pre_server_pong).as_micros() as u64;
let time = (time as f64 * inv_ratio) as u64;
pre_server_time.wrapping_add(time)
};

latencies.sort_by_key(|d| d.index);
pongs.sort_by_key(|d| d.0.index);
Expand All @@ -288,10 +328,8 @@ pub async fn run_peer(
.ok()
.map(|ping| RawLatency {
total: None,
up: Duration::from_micros(
latencies[ping].time.wrapping_add(server_time_offset),
)
.saturating_sub(sent),
up: Duration::from_micros(to_peer_time(latencies[ping].time))
.saturating_sub(sent),
});

latency.as_mut().map(|latency| {
Expand All @@ -304,7 +342,7 @@ pub async fn run_peer(
});

PeerLatency {
sent: (sent.as_micros() as u64).wrapping_sub(server_time_offset),
sent: to_server_time(sent),
latency,
}
})
Expand Down
61 changes: 48 additions & 13 deletions src/crusader-lib/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::common::{
connect, data, fresh_socket_addr, hello, measure_latency, ping_recv, ping_send, read_data,
wait_for_state, write_data, Config, Msg, TestState,
wait_for_state, write_data, Config, LatencyResult, Msg, TestState,
};
use crate::file_format::{
RawConfig, RawHeader, RawPing, RawPoint, RawResult, RawStream, RawStreamGroup, TestData,
Expand Down Expand Up @@ -153,7 +153,13 @@ pub(crate) async fn test_async(

let mut ping_index = 0;

let (latency, _, server_time_offset, mut control_rx) = measure_latency(
let LatencyResult {
latency,
server_pong: pre_server_pong,
server_time: pre_server_time,
mut control_rx,
..
} = measure_latency(
id,
&mut ping_index,
&mut control_tx,
Expand Down Expand Up @@ -311,7 +317,7 @@ pub(crate) async fn test_async(
};
}

Ok((latencies, throughput, overload_))
Ok((latencies, throughput, overload_, control_rx))
});

if let Some(peer) = peer.as_mut() {
Expand Down Expand Up @@ -472,20 +478,51 @@ pub(crate) async fn test_async(
state_tx.send((TestState::EndPingRecv, Instant::now()))?;

let peer = if let Some(peer) = peer {
Some(peer.complete().await?)
Some(
peer.complete()
.await
.context("Failed to wait for peer completion")?,
)
} else {
None
};

let duration = start.elapsed();

let pings_sent = ping_send.await??;
let (pings_sent, mut ping_index) = ping_send.await??;
let mut pongs = ping_recv.await??;

send(&mut control_tx, &ClientMessage::StopMeasurements).await?;

let (mut latencies, throughput, server_overload, control_rx) = measures.await??;

let LatencyResult {
server_pong: post_server_pong,
server_time: post_server_time,
..
} = measure_latency(
id,
&mut ping_index,
&mut control_tx,
control_rx,
server,
local_udp,
setup_start,
)
.await?;

send(&mut control_tx, &ClientMessage::Done).await?;

let mut pongs = ping_recv.await??;
let server_time = post_server_time.wrapping_sub(pre_server_time);
let client_time = post_server_pong.saturating_sub(pre_server_pong);
let client_time_micros = client_time.as_micros() as f64;
let ratio = client_time_micros / server_time as f64;

let (mut latencies, throughput, server_overload) = measures.await??;
let to_client_time = |server_time: u64| -> u64 {
let time = server_time.wrapping_sub(pre_server_time);
let time = (time as f64 * ratio) as u64;
(pre_server_pong.as_micros() as u64).saturating_add(time)
};

let server_overload = server_overload || peer.as_ref().map(|p| p.0).unwrap_or_default();

Expand All @@ -495,7 +532,7 @@ pub(crate) async fn test_async(
.enumerate()
.map(|(i, p)| RawPing {
index: i as u64,
sent: Duration::from_micros(p.sent.wrapping_add(server_time_offset)),
sent: Duration::from_micros(to_client_time(p.sent)),
latency: p.latency,
})
.collect::<Vec<_>>()
Expand All @@ -516,10 +553,8 @@ pub(crate) async fn test_async(
.ok()
.map(|ping| RawLatency {
total: None,
up: Duration::from_micros(
latencies[ping].time.wrapping_add(server_time_offset),
)
.saturating_sub(sent),
up: Duration::from_micros(to_client_time(latencies[ping].time))
.saturating_sub(sent),
});

latency.as_mut().map(|latency| {
Expand Down Expand Up @@ -573,7 +608,7 @@ pub(crate) async fn test_async(
throughput
.iter()
.filter(|e| e.0.group == group && e.0.id == id)
.map(|e| (e.1.wrapping_add(server_time_offset), e.2))
.map(|e| (to_client_time(e.1), e.2))
.collect()
};

Expand Down

0 comments on commit 3b617ea

Please sign in to comment.