bugfix: residual sync clock thread #123

Merged (2 commits) on Apr 8, 2024
138 changes: 93 additions & 45 deletions vmm/sandbox/src/client.rs
@@ -15,13 +15,17 @@ limitations under the License.
*/

use std::{
os::unix::io::{IntoRawFd, RawFd},
os::fd::{IntoRawFd, RawFd},
sync::Arc,
time::Duration,
};

use anyhow::anyhow;
use containerd_sandbox::error::{Error, Result};
use log::{debug, error, warn};
use containerd_sandbox::{
error::{Error, Result},
signal::ExitSignal,
};
use log::{debug, error};
use nix::{
sys::{
socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr},
@@ -258,59 +262,91 @@ pub(crate) async fn client_update_routes(
Ok(())
}

pub(crate) async fn client_sync_clock(client: &SandboxServiceClient, id: &str) {
pub(crate) fn client_sync_clock(
client: &SandboxServiceClient,
id: &str,
exit_signal: Arc<ExitSignal>,
) {
let id = id.to_string();
let client = client.clone();
let tolerance_nanos = Duration::from_millis(TIME_DIFF_TOLERANCE_IN_MS).as_nanos() as i64;
let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
let tolerance_nanos = Duration::from_millis(TIME_DIFF_TOLERANCE_IN_MS);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(TIME_SYNC_PERIOD)).await;
debug!("sync_clock {}: start sync clock from host to guest", id);

let mut req = SyncClockPacket::new();
match clock_gettime(clock_id) {
Ok(ts) => req.ClientSendTime = ts.num_nanoseconds(),
Err(e) => {
warn!("sync_clock {}: failed to get current clock: {}", id, e);
continue;
}
}
match client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &req)
.await
{
Ok(mut p) => {
match clock_gettime(clock_id) {
Ok(ts) => p.ServerArriveTime = ts.num_nanoseconds(),
Err(e) => {
warn!("sync_clock {}: failed to get current clock: {}", id, e);
continue;
}
}
p.Delta = ((p.ClientSendTime - p.ClientArriveTime)
+ (p.ServerArriveTime - p.ServerSendTime))
/ 2;
if p.Delta.abs() > tolerance_nanos {
if let Err(e) = client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &p)
.await
{
error!("sync_clock {}: sync clock set delta failed: {:?}", id, e);
}
}
}
Err(e) => {
error!("sync_clock {}: get error: {:?}", id, e);
let fut = async {
loop {
tokio::time::sleep(Duration::from_secs(TIME_SYNC_PERIOD)).await;
if let Err(e) = do_once_sync_clock(&client, tolerance_nanos).await {
debug!("sync_clock {}: {:?}", id, e);
}
}
};

tokio::select! {
_ = fut => (),
_ = exit_signal.wait() => {},
}
});
}

// A mechanism based on the Precision Time Protocol (PTP) that keeps the guest clock
// periodically synchronized with the host clock.
async fn do_once_sync_clock(
client: &SandboxServiceClient,
tolerance_nanos: Duration,
) -> Result<()> {
let mut req = SyncClockPacket::new();
let clock_id = ClockId::from_raw(nix::libc::CLOCK_REALTIME);
req.ClientSendTime = clock_gettime(clock_id)
.map_err(|e| anyhow!("get current clock: {}", e))?
.num_nanoseconds();

let mut p = client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &req)
.await
.map_err(|e| anyhow!("get guest clock packet: {:?}", e))?;

p.ServerArriveTime = clock_gettime(clock_id)
.map_err(|e| anyhow!("get current clock: {}", e))?
.num_nanoseconds();

p.Delta = checked_compute_delta(
p.ClientSendTime,
p.ClientArriveTime,
p.ServerSendTime,
p.ServerArriveTime,
)?;
if p.Delta.abs() > tolerance_nanos.as_nanos() as i64 {
client
.sync_clock(with_timeout(Duration::from_secs(1).as_nanos() as i64), &p)
.await
.map_err(|e| anyhow!("set delta: {:?}", e))?;
}
Ok(())
}

// delta = ((c_send - c_arrive) + (s_arrive - s_send)) / 2
fn checked_compute_delta(c_send: i64, c_arrive: i64, s_send: i64, s_arrive: i64) -> Result<i64> {
let delta_client = c_send
.checked_sub(c_arrive)
.ok_or_else(|| anyhow!("integer overflow {} - {}", c_send, c_arrive))?;

let delta_server = s_arrive
.checked_sub(s_send)
.ok_or_else(|| anyhow!("integer overflow {} - {}", s_arrive, s_send))?;

let delta_sum = delta_client
.checked_add(delta_server)
.ok_or_else(|| anyhow!("integer overflow {} + {}", delta_client, delta_server))?;

let delta = delta_sum
.checked_div(2)
.ok_or_else(|| anyhow!("integer overflow {} / 2", delta_sum))?;

Ok(delta)
}

#[cfg(test)]
mod tests {
use crate::client::new_ttrpc_client_with_timeout;
use crate::client::{checked_compute_delta, new_ttrpc_client_with_timeout};

#[tokio::test]
async fn test_new_ttrpc_client_timeout() {
@@ -319,4 +355,16 @@ mod tests {
.await
.is_err());
}

#[test]
fn test_checked_compute_delta() {
let c_send = 231;
let c_arrive = 135;
let s_send = 137;
let s_arrive = 298;

let expect_delta = 128;
let actual_delta = checked_compute_delta(c_send, c_arrive, s_send, s_arrive).unwrap();
assert_eq!(expect_delta, actual_delta);
}
}
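
Note on the client.rs change: the spawned sync-clock task now races its periodic loop against the sandbox's exit signal inside tokio::select!, so it is cancelled on shutdown instead of lingering as the residual thread this PR fixes. A minimal, self-contained sketch of that pattern, using tokio::sync::Notify as a stand-in for the containerd_sandbox ExitSignal and placeholder timings:

```rust
use std::{sync::Arc, time::Duration};

use tokio::sync::Notify;

// Cancellation pattern from client_sync_clock: the periodic loop only runs
// inside tokio::select!, so once the exit branch completes the loop future
// is dropped and the spawned task returns instead of outliving the sandbox.
fn spawn_sync_loop(exit: Arc<Notify>) {
    tokio::spawn(async move {
        let work = async {
            loop {
                tokio::time::sleep(Duration::from_secs(60)).await;
                // do_once_sync_clock(&client, tolerance).await would go here.
            }
        };
        tokio::select! {
            _ = work => (),
            _ = exit.notified() => (), // exit wins: no residual sync loop
        }
    });
}

#[tokio::main]
async fn main() {
    let exit = Arc::new(Notify::new());
    spawn_sync_loop(exit.clone());
    exit.notify_one(); // simulate sandbox shutdown
    tokio::time::sleep(Duration::from_millis(50)).await; // let the task observe it
}
```

The offset formula itself is unchanged, only moved behind checked arithmetic: delta = ((ClientSendTime - ClientArriveTime) + (ServerArriveTime - ServerSendTime)) / 2. With the test values above that is ((231 - 135) + (298 - 137)) / 2 = 257 / 2 = 128 after integer truncation.
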
31 changes: 17 additions & 14 deletions vmm/sandbox/src/cloud_hypervisor/mod.rs
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::{os::unix::io::RawFd, process::Stdio, time::Duration};
use std::{os::fd::OwnedFd, process::Stdio, time::Duration};

use anyhow::anyhow;
use async_trait::async_trait;
@@ -72,7 +72,8 @@ pub struct CloudHypervisorVM {
wait_chan: Option<Receiver<(u32, i128)>>,
#[serde(skip)]
client: Option<ChClient>,
fds: Vec<RawFd>,
#[serde(skip)]
fds: Vec<OwnedFd>,
pids: Pids,
}

@@ -142,7 +143,7 @@ impl CloudHypervisorVM {
Ok(pid)
}

fn append_fd(&mut self, fd: RawFd) -> usize {
fn append_fd(&mut self, fd: OwnedFd) -> usize {
self.fds.push(fd);
self.fds.len() - 1 + 3
}
@@ -175,17 +176,19 @@ impl VM for CloudHypervisorVM {
params.push("-vv".to_string());
}

let mut cmd = tokio::process::Command::new(&self.config.path);
cmd.args(params.as_slice());

set_cmd_fd(&mut cmd, self.fds.to_vec())?;
set_cmd_netns(&mut cmd, self.netns.to_string())?;
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
info!("start cloud hypervisor with cmdline: {:?}", cmd);
let child = cmd
.spawn()
.map_err(|e| anyhow!("failed to spawn cloud hypervisor command: {}", e))?;
// Drop cmd immediately to let the fds in pre_exec be closed.
let child = {
let mut cmd = tokio::process::Command::new(&self.config.path);
cmd.args(params.as_slice());

set_cmd_fd(&mut cmd, self.fds.drain(..).collect())?;
set_cmd_netns(&mut cmd, self.netns.to_string())?;
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
info!("start cloud hypervisor with cmdline: {:?}", cmd);
cmd.spawn()
.map_err(|e| anyhow!("failed to spawn cloud hypervisor command: {}", e))?
};
let pid = child.id();
self.pids.vmm_pid = pid;
let pid_file = format!("{}/pid", self.base_dir);
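
Background for the fds change: OwnedFd closes its descriptor when dropped, which is what the scoped cmd block above relies on to release the parent's copies once the child has been spawned. A tiny sketch of that drop behaviour (the /dev/null handle is only for illustration):

```rust
use std::fs::File;
use std::os::fd::{AsRawFd, OwnedFd};

fn main() -> std::io::Result<()> {
    let fd = OwnedFd::from(File::open("/dev/null")?);
    let raw = fd.as_raw_fd();
    drop(fd); // close(2) happens right here, deterministically
    println!("descriptor {raw} was closed on drop");
    Ok(())
}
```
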
20 changes: 8 additions & 12 deletions vmm/sandbox/src/device.rs
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::os::unix::io::RawFd;
use std::os::fd::OwnedFd;

use containerd_sandbox::error::{Error, Result};

@@ -182,49 +182,46 @@ impl Transport {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum DeviceInfo {
Block(BlockDeviceInfo),
#[allow(dead_code)]
Tap(TapDeviceInfo),
#[allow(dead_code)]
Physical(PhysicalDeviceInfo),
#[allow(dead_code)]
VhostUser(VhostUserDeviceInfo),
Char(CharDeviceInfo),
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct BlockDeviceInfo {
pub id: String,
pub path: String,
pub read_only: bool,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct TapDeviceInfo {
pub id: String,
pub index: u32,
pub name: String,
pub mac_address: String,
pub fds: Vec<RawFd>,
pub fds: Vec<OwnedFd>,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PhysicalDeviceInfo {
pub id: String,
pub bdf: String,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct VhostUserDeviceInfo {
pub id: String,
pub socket_path: String,
pub mac_address: String,
pub r#type: String,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct CharDeviceInfo {
pub id: String,
pub chardev_id: String,
@@ -235,6 +232,5 @@ pub struct CharDeviceInfo
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CharBackendType {
Pipe(String),
#[allow(dead_code)]
Socket(String),
}
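
The dropped Clone derives in device.rs follow directly from the new field type: OwnedFd is intentionally not Clone, so a struct holding Vec<OwnedFd> cannot derive it either. Duplicating a descriptor now has to be explicit and fallible, as in this small sketch:

```rust
use std::fs::File;
use std::os::fd::OwnedFd;

fn main() -> std::io::Result<()> {
    let fd = OwnedFd::from(File::open("/dev/null")?);
    let dup = fd.try_clone()?; // dup(2) under the hood; both fds close on drop
    drop((fd, dup));
    Ok(())
}
```
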
7 changes: 4 additions & 3 deletions vmm/sandbox/src/network/link.rs
@@ -324,19 +324,19 @@ impl NetworkInterface {
Ok(())
}

pub async fn attach_to<V: VM>(&self, sandbox: &mut KuasarSandbox<V>) -> Result<()> {
pub async fn attach_to<V: VM>(&mut self, sandbox: &mut KuasarSandbox<V>) -> Result<()> {
let id = format!("intf-{}", self.index);
match &self.r#type {
LinkType::Veth => {
if let Some(intf) = &self.twin {
if let Some(intf) = self.twin.as_mut() {
sandbox
.vm
.attach(DeviceInfo::Tap(TapDeviceInfo {
id,
index: self.index,
name: intf.name.to_string(),
mac_address: self.mac_address.to_string(),
fds: intf.fds.iter().map(|fd| fd.as_raw_fd()).collect(),
fds: intf.fds.drain(..).collect(),
}))
.await?;
} else {
@@ -470,6 +470,7 @@ fn get_bdf_for_eth(if_name: &str) -> Result<String> {
e
)
})?;
close(sock).unwrap_or_default();
Ok(bdf.to_string())
}

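
The attach_to change above follows the same ownership model: rather than copying raw descriptor numbers with as_raw_fd(), the OwnedFds are drained out of the twin interface so the VM side becomes their sole owner. A minimal sketch of that handoff, with hypothetical trimmed-down stand-ins for the interface and TapDeviceInfo:

```rust
use std::os::fd::OwnedFd;

// Hypothetical stand-ins: only the fds fields matter for the handoff.
struct Iface { fds: Vec<OwnedFd> }
struct TapFds { fds: Vec<OwnedFd> }

impl Iface {
    // drain(..) moves every OwnedFd out, so exactly one owner remains and
    // each descriptor is closed exactly once.
    fn hand_over(&mut self) -> TapFds {
        TapFds { fds: self.fds.drain(..).collect() }
    }
}

fn main() {
    let mut iface = Iface { fds: Vec::new() };
    let taken = iface.hand_over();
    assert!(iface.fds.is_empty() && taken.fds.is_empty());
}
```
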
8 changes: 4 additions & 4 deletions vmm/sandbox/src/qemu/devices/vsock.rs
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::os::unix::io::{IntoRawFd, RawFd};
use std::os::fd::{AsRawFd, OwnedFd};

use anyhow::anyhow;
use containerd_sandbox::error::Result;
@@ -60,18 +60,18 @@ impl VSockDevice {
}
}

pub async fn find_context_id() -> Result<(RawFd, u64)> {
pub async fn find_context_id() -> Result<(OwnedFd, u64)> {
// TODO: check whether thread_rng is sufficient, or whether a new seedable RNG should be created every time.
let vsock_file = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.mode(0o666)
.open(VHOST_VSOCK_DEV_PATH)
.await?;
let vsockfd = vsock_file.into_std().await.into_raw_fd();
let vsockfd = OwnedFd::from(vsock_file.into_std().await);
for _i in 0..IOCTL_TRY_TIMES {
let cid = thread_rng().gen_range(3..i32::MAX as u64);
let res = unsafe { set_vhost_guest_cid(vsockfd, &cid) };
let res = unsafe { set_vhost_guest_cid(vsockfd.as_raw_fd(), &cid) };
match res {
Ok(_) => return Ok((vsockfd, cid)),
Err(_) => continue,
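
For the vsock change, the conversion chain is worth spelling out: the tokio File is first turned into a std File (into_std is an async method) and then into an OwnedFd, so the descriptor's lifetime is explicit while the raw value is only borrowed for the ioctl. A sketch of that chain, using /dev/null instead of the vhost device:

```rust
use std::os::fd::{AsRawFd, OwnedFd};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let file = tokio::fs::File::open("/dev/null").await?;
    let fd: OwnedFd = OwnedFd::from(file.into_std().await);
    let raw = fd.as_raw_fd(); // borrowed only for the syscall/ioctl call site
    println!("fd {raw} stays open until the OwnedFd is dropped");
    Ok(())
}
```
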