diff --git a/korangar/src/main.rs b/korangar/src/main.rs index fc5511d9..5766dde7 100644 --- a/korangar/src/main.rs +++ b/korangar/src/main.rs @@ -931,6 +931,9 @@ impl Client { let saved_login_data = self.saved_login_data.as_ref().unwrap(); self.networking_system.disconnect_from_character_server(); self.networking_system.connect_to_map_server(saved_login_data, login_data); + // Ask for the client tick right away, so that the player isn't de-synced when + // they spawn on the map. + let _ = self.networking_system.request_client_tick(); let character_information = self .saved_characters diff --git a/korangar/src/system/timer.rs b/korangar/src/system/timer.rs index 8feffc3c..d8e18fd9 100644 --- a/korangar/src/system/timer.rs +++ b/korangar/src/system/timer.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::time::Instant; use chrono::prelude::*; @@ -12,25 +11,13 @@ pub struct GameTimer { frames_per_second: usize, animation_timer: f32, day_timer: f32, - last_client_tick: Instant, + last_packet_receive_time: Instant, first_tick_received: bool, - base_client_tick: u32, + base_client_tick: f64, frequency: f64, - last_update: Instant, - last_error: f64, - integral_error: f64, - error_history: VecDeque<(Instant, f64)>, } const TIME_FACTOR: f32 = 1000.0; -// PID constants -const KP: f64 = 0.0005; -const KI: f64 = 0.00005; -const KD: f64 = 0.00005; -// Gaussian filter constants -const GAUSSIAN_SIGMA: f64 = 4.0; -const GAUSSIAN_DENOMINATOR: f64 = 2.0 * GAUSSIAN_SIGMA * GAUSSIAN_SIGMA; -const GAUSSIAN_WINDOW_SIZE: usize = 15; impl GameTimer { pub fn new() -> Self { @@ -45,87 +32,47 @@ impl GameTimer { frames_per_second: Default::default(), animation_timer: Default::default(), day_timer, - last_client_tick: Instant::now(), + last_packet_receive_time: Instant::now(), first_tick_received: false, - base_client_tick: 0, + base_client_tick: 0.0, frequency: 0.0, - last_update: Instant::now(), - last_error: 0.0, - integral_error: 0.0, - error_history: VecDeque::with_capacity(GAUSSIAN_WINDOW_SIZE), } } - fn gaussian_filter(&self, packet_time: Instant) -> f64 { - if self.error_history.is_empty() { - return 0.0; - } - - let mut weighted_sum = 0.0; - let mut weight_sum = 0.0; - - for (time, error) in &self.error_history { - let dt = packet_time.duration_since(*time).as_secs_f64(); - let weight = (-dt.powi(2) / GAUSSIAN_DENOMINATOR).exp(); - - weighted_sum += error * weight; - weight_sum += weight; - } - - if weight_sum > 0.0 { - weighted_sum / weight_sum - } else { - 0.0 - } - } - - /// Uses a simple PID regulator that uses a gaussian filter to be a bit more - /// resistant against network jitter to synchronize the client side tick and - /// the server tick. - pub fn set_client_tick(&mut self, server_tick: ClientTick, packet_receive_time: Instant) { + /// The networking system sends a request for the newest global tick rate + /// every 10 seconds and returns an update event that contains the estimated + /// server tick rate. We only need to gently adjust our frequency, so that + /// no discontinuation of the local client tick occur. + pub fn set_client_tick(&mut self, client_tick: ClientTick, packet_receive_time: Instant) { if !self.first_tick_received { self.first_tick_received = true; - self.base_client_tick = server_tick.0; - self.last_client_tick = packet_receive_time; - self.last_update = packet_receive_time; - self.last_error = 0.0; - self.integral_error = 0.0; + self.base_client_tick = client_tick.0 as f64; + self.last_packet_receive_time = packet_receive_time; return; } - let elapsed = packet_receive_time.duration_since(self.last_client_tick).as_secs_f64(); - let adjustment = self.frequency * elapsed; - let tick_at_receive = self.base_client_tick as f64 + (elapsed * 1000.0) + adjustment; + let local_tick = self.get_client_tick_at(packet_receive_time); + let tick_difference = client_tick.0 as f64 - local_tick; - let error = server_tick.0 as f64 - tick_at_receive; + // Calculate frequency needed to make up the difference over the next 10 + // seconds. We also clamp the difference, so that we are more resistant to + // cases, where the round trip time was highly asymmetric. + self.frequency = tick_difference.clamp(-5.0, 5.0) / 10000.0; - self.error_history.push_back((packet_receive_time, error)); - while self.error_history.len() > GAUSSIAN_WINDOW_SIZE { - self.error_history.pop_front(); - } - - let filtered_error = self.gaussian_filter(packet_receive_time); - - let dt = packet_receive_time.duration_since(self.last_update).as_secs_f64(); - - self.integral_error = (self.integral_error + filtered_error * dt).clamp(-10.0, 10.0); - - let derivative = (filtered_error - self.last_error) / dt; - - self.frequency = (KP * filtered_error + KI * self.integral_error + KD * derivative).clamp(-0.1, 0.1); + self.base_client_tick = local_tick; + self.last_packet_receive_time = packet_receive_time; + } - self.last_error = filtered_error; - self.base_client_tick = server_tick.0; - self.last_client_tick = packet_receive_time; - self.last_update = packet_receive_time; + #[cfg_attr(feature = "debug", korangar_debug::profile)] + pub fn get_client_tick_at(&self, time: Instant) -> f64 { + let elapsed = time.duration_since(self.last_packet_receive_time).as_secs_f64(); + self.base_client_tick + (elapsed * 1000.0) + (elapsed * self.frequency * 1000.0) } #[cfg_attr(feature = "debug", korangar_debug::profile)] pub fn get_client_tick(&self) -> ClientTick { - let elapsed = self.last_client_tick.elapsed().as_secs_f64(); - let adjustment = self.frequency * elapsed; - let tick = self.base_client_tick as f64 + (elapsed * 1000.0) + adjustment; - ClientTick(tick as u32) + let tick = self.get_client_tick_at(Instant::now()); + ClientTick(tick.round() as u32) } #[cfg(feature = "debug")] diff --git a/korangar_networking/src/lib.rs b/korangar_networking/src/lib.rs index b8ab10da..35e22177 100644 --- a/korangar_networking/src/lib.rs +++ b/korangar_networking/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(let_chains)] + mod entity; mod event; mod hotkey; @@ -8,6 +10,7 @@ mod server; use std::cell::RefCell; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use event::{ @@ -44,8 +47,44 @@ impl NetworkEventBuffer { } } +/// Simple time synchronization using the Cristian's algorithm. +struct TimeSynchronization { + request_send: Instant, + request_received: Instant, + client_tick: f64, +} + +impl TimeSynchronization { + pub fn new() -> Self { + let now = Instant::now(); + Self { + request_send: now, + request_received: now, + client_tick: 100.0, + } + } + + /// Returns the client tick that must be used when sending the time + /// synchronization request immediately after calling this function. + fn request_client_tick(&mut self) -> u32 { + let request_send = Instant::now(); + let elapsed = request_send.duration_since(self.request_received).as_secs_f64(); + (self.client_tick + (elapsed * 1000.0)) as u32 + } + + /// Returns the estimated client tick using the Cristian's algorithm. + fn estimated_client_tick(&mut self, server_tick: u32, request_received: Instant) -> u32 { + self.request_received = request_received; + let round_trip_time = self.request_received.duration_since(self.request_send).as_secs_f64(); + let tick_adjustment = (round_trip_time / 2.0) * 1000.0; + self.client_tick = f64::from(server_tick) + tick_adjustment; + self.client_tick as u32 + } +} + pub struct NetworkingSystem { command_sender: UnboundedSender, + time_context: Arc>, login_server_connection: ServerConnection, character_server_connection: ServerConnection, map_server_connection: ServerConnection, @@ -54,9 +93,8 @@ pub struct NetworkingSystem { impl NetworkingSystem { pub fn spawn() -> (Self, NetworkEventBuffer) { - let command_sender = Self::spawn_networking_thread(NoPacketCallback); - - Self::inner_new(command_sender, NoPacketCallback) + let (command_sender, time_context) = Self::spawn_networking_thread(NoPacketCallback); + Self::inner_new(command_sender, time_context, NoPacketCallback) } } @@ -64,9 +102,14 @@ impl NetworkingSystem where Callback: PacketCallback + Send, { - fn inner_new(command_sender: UnboundedSender, packet_callback: Callback) -> (Self, NetworkEventBuffer) { + fn inner_new( + command_sender: UnboundedSender, + time_context: Arc>, + packet_callback: Callback, + ) -> (Self, NetworkEventBuffer) { let networking_system = Self { command_sender, + time_context, login_server_connection: ServerConnection::Disconnected, character_server_connection: ServerConnection::Disconnected, map_server_connection: ServerConnection::Disconnected, @@ -78,13 +121,14 @@ where } pub fn spawn_with_callback(packet_callback: Callback) -> (Self, NetworkEventBuffer) { - let command_sender = Self::spawn_networking_thread(packet_callback.clone()); - - Self::inner_new(command_sender, packet_callback) + let (command_sender, time_context) = Self::spawn_networking_thread(packet_callback.clone()); + Self::inner_new(command_sender, time_context, packet_callback) } - fn spawn_networking_thread(packet_callback: Callback) -> UnboundedSender { + fn spawn_networking_thread(packet_callback: Callback) -> (UnboundedSender, Arc>) { let (command_sender, mut command_receiver) = tokio::sync::mpsc::unbounded_channel::(); + let time_context = Arc::new(Mutex::new(TimeSynchronization::new())); + let thread_time_context = Arc::clone(&time_context); std::thread::spawn(move || { let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); @@ -115,9 +159,10 @@ where action_receiver, event_sender, packet_handler, - LoginServerKeepalivePacket::new, + |_| LoginServerKeepalivePacket::new(), Duration::from_secs(58), false, + thread_time_context.clone(), )); login_server_task_handle = Some(handle); @@ -138,9 +183,10 @@ where action_receiver, event_sender, packet_handler, - CharacterServerKeepalivePacket::new, + |_| CharacterServerKeepalivePacket::new(), Duration::from_secs(10), true, + thread_time_context.clone(), )); character_server_task_handle = Some(handle); @@ -161,11 +207,16 @@ where action_receiver, event_sender, packet_handler, - // Always passing 100 seems to work fine for now, but it might cause - // issues when connecting to something other than rAthena. - || RequestServerTickPacket::new(ClientTick(100)), - Duration::from_secs(4), + |time_context| match time_context.lock() { + Ok(mut context) => { + let client_tick = context.request_client_tick(); + RequestServerTickPacket::new(ClientTick(client_tick)) + } + Err(_) => RequestServerTickPacket::new(ClientTick(100)), + }, + Duration::from_secs(10), false, + thread_time_context.clone(), )); map_server_task_handle = Some(handle); @@ -175,7 +226,7 @@ where }); }); - command_sender + (command_sender, time_context) } fn handle_connection(connection: &mut ServerConnection, event_buffer: &mut NetworkEventBuffer) @@ -219,17 +270,19 @@ where Self::handle_connection::(&mut self.map_server_connection, events); } + #[allow(clippy::too_many_arguments)] async fn handle_server_connection( address: SocketAddr, mut action_receiver: UnboundedReceiver>, event_sender: UnboundedSender, mut packet_handler: PacketHandler, - ping_factory: impl Fn() -> PingPacket, + ping_factory: impl Fn(&Mutex) -> PingPacket, ping_frequency: Duration, // After logging in to the character server, it sends the account id without any packet. // Since our packet handler has no way of working with this, we need to add some special // logic. mut read_account_id: bool, + time_context: Arc>, ) -> Result<(), NetworkTaskError> where PingPacket: Packet + ClientPacket, @@ -307,12 +360,16 @@ where } for event in events.drain(..) { + if let NetworkEvent::UpdateClientTick {client_tick,received_at} = &event && let Ok(mut context) = time_context.lock() { + context.estimated_client_tick(client_tick.0, *received_at); + } + event_sender.send(event).map_err(|_| NetworkTaskError::ConnectionClosed)?; } } // Send a keep-alive packet to the server. _ = interval.tick() => { - let packet_bytes = ping_factory().packet_to_bytes().unwrap(); + let packet_bytes = ping_factory(&time_context).packet_to_bytes().unwrap(); stream.write_all(&packet_bytes).await.map_err(|_| NetworkTaskError::ConnectionClosed)?; } } @@ -1097,6 +1154,11 @@ where self.send_map_server_packet(&MapLoadedPacket::default()) } + pub fn request_client_tick(&mut self) -> Result<(), NotConnectedError> { + let client_tick = self.time_context.lock().map(|context| context.client_tick as u32).unwrap_or(100); + self.send_map_server_packet(&RequestServerTickPacket::new(ClientTick(client_tick))) + } + pub fn respawn(&mut self) -> Result<(), NotConnectedError> { self.send_map_server_packet(&RestartPacket::new(RestartType::Respawn)) }