Skip to content

Commit

Permalink
Add the ability to exit the varlink::listen loop.
Browse files Browse the repository at this point in the history
Factor out the `varlink::listen` parameters into a `varlink::ListenConfig` struct,
so we can use ..Default::default().

Add an `stop_listening: Option<Arc<AtomicBool>>` to the `varlink::ListenConfig` struct,
which can be set remotely to break the `varlink::listen` loop.

Stop the running server after 10 seconds

```rust
      use std::{thread, time};
      use std::sync::atomic::Ordering;
      let stop_listening = Arc::new(std::sync::atomic::AtomicBool::new(false));

      let child = {
          let stop_running = stop_listening.clone();
          thread::spawn(move || {
              thread::sleep(time::Duration::from_secs(10));
              // Stop the running server after 10 seconds
              stop_listening.store(true, Ordering::Relaxed);
          })
      };

      varlink::listen(
          service,
          &address,
          &varlink::ListenConfig {
              idle_timeout: timeout,
              stop_listening: Some(stop_listening),
              ..Default::default()
          },
      )?;

      child.join().expect("Error joining thread");
```

Obsoletes varlink/rust#26
Addresses rust-lang#25
  • Loading branch information
haraldh committed Nov 18, 2019
1 parent 81463ba commit 52433a0
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 41 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ members = [
"examples/example",
"examples/more",
"examples/ping",
]

]
9 changes: 8 additions & 1 deletion examples/example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ fn run_server<S: ?Sized + AsRef<str>>(address: &S, timeout: u64) -> varlink::Res
vec![Box::new(myinterface)],
);

varlink::listen(service, address, 1, 10, timeout)?;
varlink::listen(
service,
address,
&varlink::ListenConfig {
idle_timeout: timeout,
..Default::default()
},
)?;
Ok(())
}
9 changes: 8 additions & 1 deletion examples/more/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ fn run_server(address: &str, timeout: u64, sleep_duration: u64) -> varlink::Resu
"http://varlink.org",
vec![Box::new(myinterface)],
);
varlink::listen(service, &address, 1, 10, timeout)?;
varlink::listen(
service,
&address,
&varlink::ListenConfig {
idle_timeout: timeout,
..Default::default()
},
)?;
Ok(())
}
40 changes: 36 additions & 4 deletions examples/ping/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn main() {
opts.optflag("", "client", "run in client mode");
opts.optopt("", "bridge", "bridge", "<bridge>");
opts.optflag("m", "multiplex", "run in multiplex mode");
opts.optopt("t", "timeout", "server timeout", "<seconds>");
opts.optflag("h", "help", "print this help menu");

let matches = match opts.parse(&args[1..]) {
Expand Down Expand Up @@ -62,9 +63,17 @@ fn main() {
};
run_client(&connection).map_err(|e| e.into())
} else if let Some(address) = matches.opt_str("varlink") {
run_server(&address, 1000, matches.opt_present("m"))
// .map_err(mstrerr!("running server with address {}", address))
.map_err(|e| e.into())
let timeout = matches
.opt_str("timeout")
.unwrap_or("0".to_string())
.parse::<u64>()
.map_err(From::from);

timeout.and_then(|timeout| {
run_server(&address, timeout, matches.opt_present("m"))
// .map_err(mstrerr!("running server with address {}", address))
.map_err(From::from)
})
} else {
print_usage(&program, &opts);
eprintln!("Need varlink address in server mode.");
Expand Down Expand Up @@ -510,7 +519,30 @@ fn run_server(address: &str, timeout: u64, multiplex: bool) -> varlink::Result<(
// Demonstrate a single process, single-threaded service
multiplex::listen_multiplex(service, &address, timeout)?;
} else {
varlink::listen(service, &address, 1, 10, timeout)?;
/*
use std::sync::atomic::Ordering;
use std::{thread, time};
let stop_listening = Arc::new(std::sync::atomic::AtomicBool::new(false));
let child = {
let stop_running = stop_listening.clone();
thread::spawn(move || {
thread::sleep(time::Duration::from_secs(10));
stop_running.store(true, Ordering::Relaxed);
})
};
*/
varlink::listen(
service,
&address,
&varlink::ListenConfig {
idle_timeout: timeout,
// stop_listening: Some(stop_listening),
..Default::default()
},
)?;

//child.join().expect("Error joining thread");
}
}
Ok(())
Expand Down
9 changes: 8 additions & 1 deletion varlink-certification/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,14 @@ pub fn run_server(address: &str, timeout: u64) -> varlink::Result<()> {
vec![Box::new(myinterface)],
);

if let Err(e) = varlink::listen(service, &address, 1, 10, timeout) {
if let Err(e) = varlink::listen(
service,
&address,
&varlink::ListenConfig {
idle_timeout: timeout,
..Default::default()
},
) {
match e.kind() {
::varlink::ErrorKind::Timeout => {}
_ => Err(e)?,
Expand Down
9 changes: 7 additions & 2 deletions varlink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,12 @@
//! ],
//! );
//!
//! varlink::listen(service, &args[1], 1, 10, 0);
//! varlink::listen(service, &args[1],
//! &varlink::ListenConfig {
//! idle_timeout: 1,
//! ..Default::default()
//! },
//! );
//! # }
//! # fn main() {}
//! ```
Expand Down Expand Up @@ -242,7 +247,7 @@ pub use crate::stream::Stream;
pub type VarlinkStream = Box<dyn Stream>;
pub type ServerStream = Box<dyn Stream>;

pub use crate::server::{listen, Listener};
pub use crate::server::{listen, ListenConfig, Listener};

#[macro_use]
pub mod error;
Expand Down
125 changes: 98 additions & 27 deletions varlink/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use std::os::unix::net::{UnixListener, UnixStream};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use std::process;
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc, Mutex, RwLock,
};

#[cfg(windows)]
use uds_windows::UnixListener;
Expand Down Expand Up @@ -166,6 +169,11 @@ impl Listener {
self.as_raw_socket()
.ok_or_else(|| context!(ErrorKind::ConnectionClosed))? as usize;

let mut timeout = timeval {
tv_sec: (timeout / 1000u64) as _,
tv_usec: ((timeout % 1000u64) * 1000u64) as _,
};

unsafe {
let mut readfs: fd_set = mem::MaybeUninit::zeroed().assume_init();
loop {
Expand All @@ -174,10 +182,6 @@ impl Listener {

let mut writefds = mem::MaybeUninit::zeroed();
let mut errorfds = mem::MaybeUninit::zeroed();
let mut timeout = timeval {
tv_sec: timeout as i32,
tv_usec: 0,
};

let ret = select(
0,
Expand Down Expand Up @@ -212,13 +216,18 @@ impl Listener {

#[cfg(unix)]
pub fn accept(&self, timeout: u64) -> Result<Box<dyn Stream>> {
use libc::{fd_set, select, time_t, timeval, EAGAIN, EINTR, FD_ISSET, FD_SET, FD_ZERO};
use libc::{fd_set, select, timeval, EAGAIN, EINTR, FD_ISSET, FD_SET, FD_ZERO};

if timeout > 0 {
let fd = self
.as_raw_fd()
.ok_or_else(|| context!(ErrorKind::ConnectionClosed))?;

let mut timeout = timeval {
tv_sec: (timeout / 1000u64) as _,
tv_usec: ((timeout % 1000u64) * 1000u64) as _,
};

unsafe {
let mut readfs = mem::MaybeUninit::<fd_set>::uninit();
loop {
Expand All @@ -230,10 +239,6 @@ impl Listener {
let mut errorfds = mem::MaybeUninit::<fd_set>::uninit();
FD_ZERO(errorfds.as_mut_ptr());
errorfds.assume_init();
let mut timeout = timeval {
tv_sec: timeout as time_t,
tv_usec: 0,
};

FD_SET(fd, readfs.as_mut_ptr());
let ret = select(
Expand Down Expand Up @@ -458,9 +463,44 @@ impl Worker {
}
}

/// `ListenConfig` specifies the configuration parameters for [`varlink::listen`]
///
/// Examples:
///
/// ```rust
/// let l = varlink::ListenConfig::default();
/// assert_eq!(l.initial_worker_threads, 1);
/// assert_eq!(l.max_worker_threads, 100);
/// assert_eq!(l.idle_timeout, 0);
/// assert!(l.stop_listening.is_none());
/// ```
///
/// [`varlink::listen`]: fn.listen.html
pub struct ListenConfig {
/// The amount of initial worker threads
pub initial_worker_threads: usize,
/// The maximum amount of worker threads
pub max_worker_threads: usize,
/// Time in seconds for the server to quit, when it is idle
pub idle_timeout: u64,
/// An optional AtomicBool as a global flag, which lets the server stop accepting new connections, when set to `true`
pub stop_listening: Option<Arc<AtomicBool>>,
}

impl Default for ListenConfig {
fn default() -> Self {
ListenConfig {
initial_worker_threads: 1,
max_worker_threads: 100,
idle_timeout: 0,
stop_listening: None,
}
}
}

/// `listen` creates a server, with `num_worker` threads listening on `varlink_uri`.
///
/// If an `idle_timeout` != 0 is specified, this function returns after the specified
/// If an `listen_config.idle_timeout` != 0 is specified, this function returns after the specified
/// amount of seconds, if no new connection is made in that time frame. It still waits for
/// all pending connections to finish.
///
Expand All @@ -477,7 +517,14 @@ impl Worker {
/// vec![/* Your varlink interfaces go here */],
/// );
///
/// if let Err(e) = varlink::listen(service, "unix:test_listen_timeout", 1, 10, 1) {
/// if let Err(e) = varlink::listen(
/// service,
/// "unix:test_listen_timeout",
/// &varlink::ListenConfig {
/// idle_timeout: 1,
/// ..Default::default()
/// },
/// ) {
/// if *e.kind() != varlink::ErrorKind::Timeout {
/// panic!("Error listen: {:?}", e);
/// }
Expand All @@ -489,31 +536,55 @@ impl Worker {
pub fn listen<S: ?Sized + AsRef<str>, H: crate::ConnectionHandler + Send + Sync + 'static>(
handler: H,
address: &S,
initial_worker_threads: usize,
max_worker_threads: usize,
idle_timeout: u64,
listen_config: &ListenConfig,
) -> Result<()> {
let handler = Arc::new(handler);
let listener = Listener::new(address)?;

listener.set_nonblocking(false)?;

let mut pool = ThreadPool::new(initial_worker_threads, max_worker_threads);
let mut pool = ThreadPool::new(
listen_config.initial_worker_threads,
listen_config.max_worker_threads,
);

loop {
let mut stream = match listener.accept(idle_timeout) {
Err(e) => match e.kind() {
ErrorKind::Timeout => {
if pool.num_busy() == 0 {
let mut to_wait = listen_config.idle_timeout * 1000;
let wait_time = listen_config
.stop_listening
.as_ref()
.map(|_| 100)
.unwrap_or(to_wait);
let mut stream = loop {
match listener.accept(wait_time) {
Err(e) => match e.kind() {
ErrorKind::Timeout => {
if let Some(stop) = listen_config.stop_listening.as_ref() {
if stop.load(Ordering::SeqCst) {
return Ok(());
}
if listen_config.idle_timeout == 0 {
continue;
}
}

if to_wait <= wait_time {
if pool.num_busy() == 0 {
return Err(e);
}
to_wait = listen_config.idle_timeout * 1000;
} else {
to_wait -= wait_time;
}

continue;
}
_ => {
return Err(e);
}
continue;
}
_ => {
return Err(e);
}
},
r => r?,
},
r => break r?,
}
};
let handler = handler.clone();

Expand Down
12 changes: 9 additions & 3 deletions varlink/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ fn test_listen() -> Result<()> {
vec![], // Your varlink interfaces go here
);

if let Err(e) = listen(service, &address, 1, 10, timeout) {
if let Err(e) = listen(
service,
&address,
&ListenConfig {
idle_timeout: timeout,
..Default::default()
},
) {
if *e.kind() != ErrorKind::Timeout {
panic!("Error listen: {:#?}", e);
}
Expand Down Expand Up @@ -187,8 +194,7 @@ fn test_handle() -> Result<()> {

let reply = from_slice::<Reply>(&w).unwrap();

let si =
from_value::<ServiceInfo>(reply.parameters.unwrap()).map_err(map_context!())?;
let si = from_value::<ServiceInfo>(reply.parameters.unwrap()).map_err(map_context!())?;

assert_eq!(
si,
Expand Down

0 comments on commit 52433a0

Please sign in to comment.