Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(core): Add a CLI flag to allow for empty configs #19021

Merged
merged 7 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct ApplicationConfig {
}

pub struct Application {
pub require_healthy: Option<bool>,
pub root_opts: RootOpts,
pub config: ApplicationConfig,
pub signals: SignalPair,
pub openssl_providers: Option<Vec<Provider>>,
Expand All @@ -75,6 +75,7 @@ impl ApplicationConfig {
&config_paths,
opts.watch_config,
opts.require_healthy,
opts.allow_empty_config,
graceful_shutdown_duration,
signal_handler,
)
Expand Down Expand Up @@ -219,7 +220,7 @@ impl Application {
Ok((
runtime,
Self {
require_healthy: opts.root.require_healthy,
root_opts: opts.root,
config,
signals,
openssl_providers,
Expand All @@ -236,7 +237,7 @@ impl Application {
handle.spawn(heartbeat::heartbeat());

let Self {
require_healthy,
root_opts,
config,
signals,
openssl_providers,
Expand All @@ -247,7 +248,7 @@ impl Application {
api_server: config.setup_api(handle),
topology: config.topology,
config_paths: config.config_paths.clone(),
require_healthy,
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
});
Expand All @@ -259,6 +260,7 @@ impl Application {
signals,
topology_controller,
openssl_providers,
allow_empty_config: root_opts.allow_empty_config,
})
}
}
Expand All @@ -270,6 +272,7 @@ pub struct StartedApplication {
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
pub allow_empty_config: bool,
}

impl StartedApplication {
Expand All @@ -285,6 +288,7 @@ impl StartedApplication {
topology_controller,
openssl_providers,
internal_topologies,
allow_empty_config,
} = self;

let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
Expand All @@ -293,18 +297,20 @@ impl StartedApplication {
let mut signal_rx = signals.receiver;

let signal = loop {
let has_sources = !topology_controller.lock().await.topology.config.is_empty();
tokio::select! {
signal = signal_rx.recv() => if let Some(signal) = handle_signal(
signal,
&topology_controller,
&config_paths,
&mut signal_handler,
allow_empty_config,
).await {
break signal;
},
// Trigger graceful shutdown if a component crashed, or all sources have ended.
error = graceful_crash.next() => break SignalTo::Shutdown(error),
_ = TopologyController::sources_finished(topology_controller.clone()) => {
_ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
info!("All sources have finished.");
break SignalTo::Shutdown(None)
} ,
Expand All @@ -327,6 +333,7 @@ async fn handle_signal(
topology_controller: &SharedTopologyController,
config_paths: &[ConfigPath],
signal_handler: &mut SignalHandler,
allow_empty_config: bool,
) -> Option<SignalTo> {
match signal {
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
Expand All @@ -349,6 +356,7 @@ async fn handle_signal(
let new_config = config::load_from_paths_with_provider_and_secrets(
&topology_controller.config_paths,
signal_handler,
allow_empty_config,
)
.await
.map_err(handle_config_errors)
Expand Down Expand Up @@ -496,6 +504,7 @@ pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
require_healthy: Option<bool>,
allow_empty_config: bool,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
Expand All @@ -520,10 +529,13 @@ pub async fn load_configs(
#[cfg(not(feature = "enterprise-tests"))]
config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?;

let mut config =
config::load_from_paths_with_provider_and_secrets(&config_paths, signal_handler)
.await
.map_err(handle_config_errors)?;
let mut config = config::load_from_paths_with_provider_and_secrets(
&config_paths,
signal_handler,
allow_empty_config,
)
.await
.map_err(handle_config_errors)?;

config::init_telemetry(config.global.telemetry.clone(), true);

Expand Down
7 changes: 7 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ pub struct RootOpts {
/// default inherits the environment of the Vector process.
#[arg(long, env = "VECTOR_OPENSSL_NO_PROBE", default_value = "false")]
pub openssl_no_probe: bool,

/// Allow the configuration to run without any components. This is useful for loading in an
/// empty stub config that will later be replaced with actual components. Note that this is
/// likely not useful without also watching for config file changes as described for
/// `--watch-empty`.
#[arg(long, env = "VECTOR_ALLOW_EMPTY", default_value = "false")]
pub allow_empty_config: bool,
}

impl RootOpts {
Expand Down
6 changes: 6 additions & 0 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ pub struct ConfigBuilder {
#[serde(default, skip)]
#[doc(hidden)]
pub graceful_shutdown_duration: Option<Duration>,

/// Allow the configuration to be empty, resulting in a topology with no components.
#[serde(default, skip)]
#[doc(hidden)]
pub allow_empty: bool,
}

#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -232,6 +237,7 @@ impl From<Config> for ConfigBuilder {
tests,
secret,
graceful_shutdown_duration,
allow_empty: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
provider: _,
secret,
graceful_shutdown_duration,
allow_empty: _,
} = builder;

let graph = match Graph::new(&sources, &transforms, &sinks, schema) {
Expand Down
3 changes: 3 additions & 0 deletions src/config/loading/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub fn load_from_paths(config_paths: &[ConfigPath]) -> Result<Config, Vec<String
pub async fn load_from_paths_with_provider_and_secrets(
config_paths: &[ConfigPath],
signal_handler: &mut signal::SignalHandler,
allow_empty: bool,
) -> Result<Config, Vec<String>> {
// Load secret backends first
let (mut secrets_backends_loader, secrets_warning) =
Expand All @@ -149,6 +150,8 @@ pub async fn load_from_paths_with_provider_and_secrets(
load_builder_from_paths(config_paths)?
};

builder.allow_empty = allow_empty;

validation::check_provider(&builder)?;
signal_handler.clear();

Expand Down
4 changes: 4 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl Config {
Default::default()
}

pub fn is_empty(&self) -> bool {
self.sources.is_empty()
}

pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
self.sources.iter()
}
Expand Down
12 changes: 7 additions & 5 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result
pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let mut errors = vec![];

if config.sources.is_empty() {
errors.push("No sources defined in the config.".to_owned());
}
if !config.allow_empty {
if config.sources.is_empty() {
errors.push("No sources defined in the config.".to_owned());
}

if config.sinks.is_empty() {
errors.push("No sinks defined in the config.".to_owned());
if config.sinks.is_empty() {
errors.push("No sinks defined in the config.".to_owned());
}
}

// Helper for below
Expand Down
Loading