Skip to content

Commit

Permalink
fix(reload): restart api server based on topology (vectordotdev#17958)
Browse files Browse the repository at this point in the history
This PR attempts to resolve:
vectordotdev#13508

I'm not married to switching out the `Runtime` parameters for `Handle`s,
it just seemed like the easiest way to get something that could spawn
tasks into the `TopologyController` was `Handle::current()`, and that
required shifting the parameter types to match. Let me know if another
route would be preferred.
  • Loading branch information
KH-Moogsoft authored Jul 18, 2023
1 parent 3b91662 commit b00727e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 9 deletions.
7 changes: 4 additions & 3 deletions src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use async_graphql::{
Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};

Expand All @@ -31,13 +32,13 @@ impl Server {
config: &config::Config,
watch_rx: topology::WatchRx,
running: Arc<AtomicBool>,
runtime: &tokio::runtime::Runtime,
handle: &Handle,
) -> crate::Result<Self> {
let routes = make_routes(config.api.playground, watch_rx, running);

let (_shutdown, rx) = oneshot::channel();
// warp uses `tokio::spawn` and so needs us to enter the runtime context.
let _guard = runtime.enter();
let _guard = handle.enter();
let (addr, server) = warp::serve(routes)
.try_bind_with_graceful_shutdown(
config.api.address.expect("No socket address"),
Expand All @@ -57,7 +58,7 @@ impl Server {
schema::components::update_config(config);

// Spawn the server in the background.
runtime.spawn(server);
handle.spawn(server);

Ok(Self { _shutdown, addr })
}
Expand Down
14 changes: 8 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::process::ExitStatusExt;
use tokio::runtime::Handle;

pub static WORKER_THREADS: OnceNonZeroUsize = OnceNonZeroUsize::new();

Expand Down Expand Up @@ -122,13 +123,13 @@ impl ApplicationConfig {

/// Configure the API server, if applicable
#[cfg(feature = "api")]
pub fn setup_api(&self, runtime: &Runtime) -> Option<api::Server> {
pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
if self.api.enabled {
match api::Server::start(
self.topology.config(),
self.topology.watch(),
std::sync::Arc::clone(&self.topology.running),
runtime,
handle,
) {
Ok(api_server) => {
emit!(ApiStarted {
Expand Down Expand Up @@ -159,7 +160,8 @@ impl Application {
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare().and_then(|(runtime, app)| app.start(&runtime).map(|app| (runtime, app)))
Self::prepare()
.and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
}

pub fn prepare() -> Result<(Runtime, Self), ExitCode> {
Expand Down Expand Up @@ -208,13 +210,13 @@ impl Application {
))
}

pub fn start(self, runtime: &Runtime) -> Result<StartedApplication, ExitCode> {
pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
// Any internal_logs sources will have grabbed a copy of the
// early buffer by this point and set up a subscriber.
crate::trace::stop_early_buffering();

emit!(VectorStarted);
runtime.spawn(heartbeat::heartbeat());
handle.spawn(heartbeat::heartbeat());

let Self {
require_healthy,
Expand All @@ -224,7 +226,7 @@ impl Application {

let topology_controller = SharedTopologyController::new(TopologyController {
#[cfg(feature = "api")]
api_server: config.setup_api(runtime),
api_server: config.setup_api(handle),
topology: config.topology,
config_paths: config.config_paths.clone(),
require_healthy,
Expand Down
38 changes: 38 additions & 0 deletions src/topology/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
#[cfg(feature = "enterprise")]
use futures_util::future::BoxFuture;
use futures_util::FutureExt as _;

use tokio::sync::{Mutex, MutexGuard};

#[cfg(feature = "api")]
Expand All @@ -14,6 +15,7 @@ use crate::config::enterprise::{
use crate::internal_events::{
VectorConfigLoadError, VectorRecoveryError, VectorReloadError, VectorReloaded,
};

use crate::{config, topology::RunningTopology};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -93,6 +95,42 @@ impl TopologyController {
}
}

// Start the api server or disable it, if necessary
#[cfg(feature = "api")]
if !new_config.api.enabled {
if let Some(server) = self.api_server.take() {
debug!("Dropping api server.");
drop(server)
}
} else if self.api_server.is_none() {
use crate::internal_events::ApiStarted;
use crate::topology::ReloadOutcome::FatalError;
use std::sync::atomic::AtomicBool;
use tokio::runtime::Handle;

debug!("Starting api server.");

self.api_server = match api::Server::start(
self.topology.config(),
self.topology.watch(),
Arc::<AtomicBool>::clone(&self.topology.running),
&Handle::current(),
) {
Ok(api_server) => {
emit!(ApiStarted {
addr: new_config.api.address.unwrap(),
playground: new_config.api.playground
});

Some(api_server)
}
Err(e) => {
error!("An error occurred that Vector couldn't handle: {}.", e);
return FatalError;
}
}
}

match self.topology.reload_config_and_respawn(new_config).await {
Ok(true) => {
#[cfg(feature = "api")]
Expand Down

0 comments on commit b00727e

Please sign in to comment.