diff --git a/src/api/server.rs b/src/api/server.rs index e4786f6d108ca..935c0b03ecaa6 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -9,6 +9,7 @@ use async_graphql::{ Data, Request, Schema, }; use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket}; +use tokio::runtime::Handle; use tokio::sync::oneshot; use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply}; @@ -31,13 +32,13 @@ impl Server { config: &config::Config, watch_rx: topology::WatchRx, running: Arc, - runtime: &tokio::runtime::Runtime, + handle: &Handle, ) -> crate::Result { let routes = make_routes(config.api.playground, watch_rx, running); let (_shutdown, rx) = oneshot::channel(); // warp uses `tokio::spawn` and so needs us to enter the runtime context. - let _guard = runtime.enter(); + let _guard = handle.enter(); let (addr, server) = warp::serve(routes) .try_bind_with_graceful_shutdown( config.api.address.expect("No socket address"), @@ -57,7 +58,7 @@ impl Server { schema::components::update_config(config); // Spawn the server in the background. - runtime.spawn(server); + handle.spawn(server); Ok(Self { _shutdown, addr }) } diff --git a/src/app.rs b/src/app.rs index f306504d211c1..63d2a53bb1614 100644 --- a/src/app.rs +++ b/src/app.rs @@ -38,6 +38,7 @@ use crate::{ use std::os::unix::process::ExitStatusExt; #[cfg(windows)] use std::os::windows::process::ExitStatusExt; +use tokio::runtime::Handle; pub static WORKER_THREADS: OnceNonZeroUsize = OnceNonZeroUsize::new(); @@ -122,13 +123,13 @@ impl ApplicationConfig { /// Configure the API server, if applicable #[cfg(feature = "api")] - pub fn setup_api(&self, runtime: &Runtime) -> Option { + pub fn setup_api(&self, handle: &Handle) -> Option { if self.api.enabled { match api::Server::start( self.topology.config(), self.topology.watch(), std::sync::Arc::clone(&self.topology.running), - runtime, + handle, ) { Ok(api_server) => { emit!(ApiStarted { @@ -159,7 +160,8 @@ impl Application { } pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> { - Self::prepare().and_then(|(runtime, app)| app.start(&runtime).map(|app| (runtime, app))) + Self::prepare() + .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app))) } pub fn prepare() -> Result<(Runtime, Self), ExitCode> { @@ -208,13 +210,13 @@ impl Application { )) } - pub fn start(self, runtime: &Runtime) -> Result { + pub fn start(self, handle: &Handle) -> Result { // Any internal_logs sources will have grabbed a copy of the // early buffer by this point and set up a subscriber. crate::trace::stop_early_buffering(); emit!(VectorStarted); - runtime.spawn(heartbeat::heartbeat()); + handle.spawn(heartbeat::heartbeat()); let Self { require_healthy, @@ -224,7 +226,7 @@ impl Application { let topology_controller = SharedTopologyController::new(TopologyController { #[cfg(feature = "api")] - api_server: config.setup_api(runtime), + api_server: config.setup_api(handle), topology: config.topology, config_paths: config.config_paths.clone(), require_healthy, diff --git a/src/topology/controller.rs b/src/topology/controller.rs index 16d349fe014ed..9b64b7f050279 100644 --- a/src/topology/controller.rs +++ b/src/topology/controller.rs @@ -3,6 +3,7 @@ use std::sync::Arc; #[cfg(feature = "enterprise")] use futures_util::future::BoxFuture; use futures_util::FutureExt as _; + use tokio::sync::{Mutex, MutexGuard}; #[cfg(feature = "api")] @@ -14,6 +15,7 @@ use crate::config::enterprise::{ use crate::internal_events::{ VectorConfigLoadError, VectorRecoveryError, VectorReloadError, VectorReloaded, }; + use crate::{config, topology::RunningTopology}; #[derive(Clone, Debug)] @@ -93,6 +95,42 @@ impl TopologyController { } } + // Start the api server or disable it, if necessary + #[cfg(feature = "api")] + if !new_config.api.enabled { + if let Some(server) = self.api_server.take() { + debug!("Dropping api server."); + drop(server) + } + } else if self.api_server.is_none() { + use crate::internal_events::ApiStarted; + use crate::topology::ReloadOutcome::FatalError; + use std::sync::atomic::AtomicBool; + use tokio::runtime::Handle; + + debug!("Starting api server."); + + self.api_server = match api::Server::start( + self.topology.config(), + self.topology.watch(), + Arc::::clone(&self.topology.running), + &Handle::current(), + ) { + Ok(api_server) => { + emit!(ApiStarted { + addr: new_config.api.address.unwrap(), + playground: new_config.api.playground + }); + + Some(api_server) + } + Err(e) => { + error!("An error occurred that Vector couldn't handle: {}.", e); + return FatalError; + } + } + } + match self.topology.reload_config_and_respawn(new_config).await { Ok(true) => { #[cfg(feature = "api")]