Skip to content

Commit

Permalink
Properly handle and smooth client ticks
Browse files Browse the repository at this point in the history
After long experimentation, I finally came to the conclusion that the client needs to properly estimate the client tick based on the round trip time. We then need to gently adjust the local tick rate so that no discolorations occur. We use the Cristian's algorithm for the synchronization itself.
  • Loading branch information
hasenbanck committed Dec 30, 2024
1 parent 18c2924 commit 356aa5e
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 96 deletions.
3 changes: 3 additions & 0 deletions korangar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,9 @@ impl Client {
let saved_login_data = self.saved_login_data.as_ref().unwrap();
self.networking_system.disconnect_from_character_server();
self.networking_system.connect_to_map_server(saved_login_data, login_data);
// Ask for the client tick right away, so that the player isn't de-synced when
// they spawn on the map.
let _ = self.networking_system.request_client_tick();

let character_information = self
.saved_characters
Expand Down
105 changes: 26 additions & 79 deletions korangar/src/system/timer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::time::Instant;

use chrono::prelude::*;
Expand All @@ -12,25 +11,13 @@ pub struct GameTimer {
frames_per_second: usize,
animation_timer: f32,
day_timer: f32,
last_client_tick: Instant,
last_packet_receive_time: Instant,
first_tick_received: bool,
base_client_tick: u32,
base_client_tick: f64,
frequency: f64,
last_update: Instant,
last_error: f64,
integral_error: f64,
error_history: VecDeque<(Instant, f64)>,
}

const TIME_FACTOR: f32 = 1000.0;
// PID constants
const KP: f64 = 0.0005;
const KI: f64 = 0.00005;
const KD: f64 = 0.00005;
// Gaussian filter constants
const GAUSSIAN_SIGMA: f64 = 4.0;
const GAUSSIAN_DENOMINATOR: f64 = 2.0 * GAUSSIAN_SIGMA * GAUSSIAN_SIGMA;
const GAUSSIAN_WINDOW_SIZE: usize = 15;

impl GameTimer {
pub fn new() -> Self {
Expand All @@ -45,87 +32,47 @@ impl GameTimer {
frames_per_second: Default::default(),
animation_timer: Default::default(),
day_timer,
last_client_tick: Instant::now(),
last_packet_receive_time: Instant::now(),
first_tick_received: false,
base_client_tick: 0,
base_client_tick: 0.0,
frequency: 0.0,
last_update: Instant::now(),
last_error: 0.0,
integral_error: 0.0,
error_history: VecDeque::with_capacity(GAUSSIAN_WINDOW_SIZE),
}
}

fn gaussian_filter(&self, packet_time: Instant) -> f64 {
if self.error_history.is_empty() {
return 0.0;
}

let mut weighted_sum = 0.0;
let mut weight_sum = 0.0;

for (time, error) in &self.error_history {
let dt = packet_time.duration_since(*time).as_secs_f64();
let weight = (-dt.powi(2) / GAUSSIAN_DENOMINATOR).exp();

weighted_sum += error * weight;
weight_sum += weight;
}

if weight_sum > 0.0 {
weighted_sum / weight_sum
} else {
0.0
}
}

/// Uses a simple PID regulator that uses a gaussian filter to be a bit more
/// resistant against network jitter to synchronize the client side tick and
/// the server tick.
pub fn set_client_tick(&mut self, server_tick: ClientTick, packet_receive_time: Instant) {
/// The networking system sends a request for the newest global tick rate
/// every 10 seconds and returns an update event that contains the estimated
/// server tick rate. We only need to gently adjust our frequency, so that
/// no discontinuation of the local client tick occur.
pub fn set_client_tick(&mut self, client_tick: ClientTick, packet_receive_time: Instant) {
if !self.first_tick_received {
self.first_tick_received = true;
self.base_client_tick = server_tick.0;
self.last_client_tick = packet_receive_time;
self.last_update = packet_receive_time;
self.last_error = 0.0;
self.integral_error = 0.0;
self.base_client_tick = client_tick.0 as f64;
self.last_packet_receive_time = packet_receive_time;
return;
}

let elapsed = packet_receive_time.duration_since(self.last_client_tick).as_secs_f64();
let adjustment = self.frequency * elapsed;
let tick_at_receive = self.base_client_tick as f64 + (elapsed * 1000.0) + adjustment;
let local_tick = self.get_client_tick_at(packet_receive_time);
let tick_difference = client_tick.0 as f64 - local_tick;

let error = server_tick.0 as f64 - tick_at_receive;
// Calculate frequency needed to make up the difference over the next 10
// seconds. We also clamp the difference, so that we are more resistant to
// cases, where the round trip time was highly asymmetric.
self.frequency = tick_difference.clamp(-5.0, 5.0) / 10000.0;

self.error_history.push_back((packet_receive_time, error));
while self.error_history.len() > GAUSSIAN_WINDOW_SIZE {
self.error_history.pop_front();
}

let filtered_error = self.gaussian_filter(packet_receive_time);

let dt = packet_receive_time.duration_since(self.last_update).as_secs_f64();

self.integral_error = (self.integral_error + filtered_error * dt).clamp(-10.0, 10.0);

let derivative = (filtered_error - self.last_error) / dt;

self.frequency = (KP * filtered_error + KI * self.integral_error + KD * derivative).clamp(-0.1, 0.1);
self.base_client_tick = local_tick;
self.last_packet_receive_time = packet_receive_time;
}

self.last_error = filtered_error;
self.base_client_tick = server_tick.0;
self.last_client_tick = packet_receive_time;
self.last_update = packet_receive_time;
#[cfg_attr(feature = "debug", korangar_debug::profile)]
pub fn get_client_tick_at(&self, time: Instant) -> f64 {
let elapsed = time.duration_since(self.last_packet_receive_time).as_secs_f64();
self.base_client_tick + (elapsed * 1000.0) + (elapsed * self.frequency * 1000.0)
}

#[cfg_attr(feature = "debug", korangar_debug::profile)]
pub fn get_client_tick(&self) -> ClientTick {
let elapsed = self.last_client_tick.elapsed().as_secs_f64();
let adjustment = self.frequency * elapsed;
let tick = self.base_client_tick as f64 + (elapsed * 1000.0) + adjustment;
ClientTick(tick as u32)
let tick = self.get_client_tick_at(Instant::now());
ClientTick(tick.round() as u32)
}

#[cfg(feature = "debug")]
Expand Down
96 changes: 79 additions & 17 deletions korangar_networking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(let_chains)]

mod entity;
mod event;
mod hotkey;
Expand All @@ -8,6 +10,7 @@ mod server;
use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use event::{
Expand Down Expand Up @@ -44,8 +47,44 @@ impl NetworkEventBuffer {
}
}

/// Simple time synchronization using the Cristian's algorithm.
struct TimeSynchronization {
request_send: Instant,
request_received: Instant,
client_tick: f64,
}

impl TimeSynchronization {
pub fn new() -> Self {
let now = Instant::now();
Self {
request_send: now,
request_received: now,
client_tick: 100.0,
}
}

/// Returns the client tick that must be used when sending the time
/// synchronization request immediately after calling this function.
fn request_client_tick(&mut self) -> u32 {
let request_send = Instant::now();
let elapsed = request_send.duration_since(self.request_received).as_secs_f64();
(self.client_tick + (elapsed * 1000.0)) as u32
}

/// Returns the estimated client tick using the Cristian's algorithm.
fn estimated_client_tick(&mut self, server_tick: u32, request_received: Instant) -> u32 {
self.request_received = request_received;
let round_trip_time = self.request_received.duration_since(self.request_send).as_secs_f64();
let tick_adjustment = (round_trip_time / 2.0) * 1000.0;
self.client_tick = f64::from(server_tick) + tick_adjustment;
self.client_tick as u32
}
}

pub struct NetworkingSystem<Callback> {
command_sender: UnboundedSender<ServerConnectCommand>,
time_context: Arc<Mutex<TimeSynchronization>>,
login_server_connection: ServerConnection,
character_server_connection: ServerConnection,
map_server_connection: ServerConnection,
Expand All @@ -54,19 +93,23 @@ pub struct NetworkingSystem<Callback> {

impl NetworkingSystem<NoPacketCallback> {
pub fn spawn() -> (Self, NetworkEventBuffer) {
let command_sender = Self::spawn_networking_thread(NoPacketCallback);

Self::inner_new(command_sender, NoPacketCallback)
let (command_sender, time_context) = Self::spawn_networking_thread(NoPacketCallback);
Self::inner_new(command_sender, time_context, NoPacketCallback)
}
}

impl<Callback> NetworkingSystem<Callback>
where
Callback: PacketCallback + Send,
{
fn inner_new(command_sender: UnboundedSender<ServerConnectCommand>, packet_callback: Callback) -> (Self, NetworkEventBuffer) {
fn inner_new(
command_sender: UnboundedSender<ServerConnectCommand>,
time_context: Arc<Mutex<TimeSynchronization>>,
packet_callback: Callback,
) -> (Self, NetworkEventBuffer) {
let networking_system = Self {
command_sender,
time_context,
login_server_connection: ServerConnection::Disconnected,
character_server_connection: ServerConnection::Disconnected,
map_server_connection: ServerConnection::Disconnected,
Expand All @@ -78,13 +121,14 @@ where
}

pub fn spawn_with_callback(packet_callback: Callback) -> (Self, NetworkEventBuffer) {
let command_sender = Self::spawn_networking_thread(packet_callback.clone());

Self::inner_new(command_sender, packet_callback)
let (command_sender, time_context) = Self::spawn_networking_thread(packet_callback.clone());
Self::inner_new(command_sender, time_context, packet_callback)
}

fn spawn_networking_thread(packet_callback: Callback) -> UnboundedSender<ServerConnectCommand> {
fn spawn_networking_thread(packet_callback: Callback) -> (UnboundedSender<ServerConnectCommand>, Arc<Mutex<TimeSynchronization>>) {
let (command_sender, mut command_receiver) = tokio::sync::mpsc::unbounded_channel::<ServerConnectCommand>();
let time_context = Arc::new(Mutex::new(TimeSynchronization::new()));
let thread_time_context = Arc::clone(&time_context);

std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
Expand Down Expand Up @@ -115,9 +159,10 @@ where
action_receiver,
event_sender,
packet_handler,
LoginServerKeepalivePacket::new,
|_| LoginServerKeepalivePacket::new(),
Duration::from_secs(58),
false,
thread_time_context.clone(),
));

login_server_task_handle = Some(handle);
Expand All @@ -138,9 +183,10 @@ where
action_receiver,
event_sender,
packet_handler,
CharacterServerKeepalivePacket::new,
|_| CharacterServerKeepalivePacket::new(),
Duration::from_secs(10),
true,
thread_time_context.clone(),
));

character_server_task_handle = Some(handle);
Expand All @@ -161,11 +207,16 @@ where
action_receiver,
event_sender,
packet_handler,
// Always passing 100 seems to work fine for now, but it might cause
// issues when connecting to something other than rAthena.
|| RequestServerTickPacket::new(ClientTick(100)),
Duration::from_secs(4),
|time_context| match time_context.lock() {
Ok(mut context) => {
let client_tick = context.request_client_tick();
RequestServerTickPacket::new(ClientTick(client_tick))
}
Err(_) => RequestServerTickPacket::new(ClientTick(100)),
},
Duration::from_secs(10),
false,
thread_time_context.clone(),
));

map_server_task_handle = Some(handle);
Expand All @@ -175,7 +226,7 @@ where
});
});

command_sender
(command_sender, time_context)
}

fn handle_connection<Event>(connection: &mut ServerConnection, event_buffer: &mut NetworkEventBuffer)
Expand Down Expand Up @@ -219,17 +270,19 @@ where
Self::handle_connection::<MapServerDisconnectedEvent>(&mut self.map_server_connection, events);
}

#[allow(clippy::too_many_arguments)]
async fn handle_server_connection<PingPacket>(
address: SocketAddr,
mut action_receiver: UnboundedReceiver<Vec<u8>>,
event_sender: UnboundedSender<NetworkEvent>,
mut packet_handler: PacketHandler<NetworkEventList, (), Callback>,
ping_factory: impl Fn() -> PingPacket,
ping_factory: impl Fn(&Mutex<TimeSynchronization>) -> PingPacket,
ping_frequency: Duration,
// After logging in to the character server, it sends the account id without any packet.
// Since our packet handler has no way of working with this, we need to add some special
// logic.
mut read_account_id: bool,
time_context: Arc<Mutex<TimeSynchronization>>,
) -> Result<(), NetworkTaskError>
where
PingPacket: Packet + ClientPacket,
Expand Down Expand Up @@ -307,12 +360,16 @@ where
}

for event in events.drain(..) {
if let NetworkEvent::UpdateClientTick {client_tick,received_at} = &event && let Ok(mut context) = time_context.lock() {
context.estimated_client_tick(client_tick.0, *received_at);
}

event_sender.send(event).map_err(|_| NetworkTaskError::ConnectionClosed)?;
}
}
// Send a keep-alive packet to the server.
_ = interval.tick() => {
let packet_bytes = ping_factory().packet_to_bytes().unwrap();
let packet_bytes = ping_factory(&time_context).packet_to_bytes().unwrap();
stream.write_all(&packet_bytes).await.map_err(|_| NetworkTaskError::ConnectionClosed)?;
}
}
Expand Down Expand Up @@ -1097,6 +1154,11 @@ where
self.send_map_server_packet(&MapLoadedPacket::default())
}

pub fn request_client_tick(&mut self) -> Result<(), NotConnectedError> {
let client_tick = self.time_context.lock().map(|context| context.client_tick as u32).unwrap_or(100);
self.send_map_server_packet(&RequestServerTickPacket::new(ClientTick(client_tick)))
}

pub fn respawn(&mut self) -> Result<(), NotConnectedError> {
self.send_map_server_packet(&RestartPacket::new(RestartType::Respawn))
}
Expand Down

0 comments on commit 356aa5e

Please sign in to comment.