Skip to content

Commit

Permalink
chore(core): Set up internal topology API (vectordotdev#18919)
Browse files Browse the repository at this point in the history
* Rename `Pieces` to `TopologyPieces`

* Move `topology::start_validated` into `impl RunningTopology`

* Move `topology::builder::build_pieces` into `impl TopologyPieces`

* Move `topology::build_or_log_errors` into `TopologyPieces`

* Add helper function to `RunningTopology` for initial startup

* Simplify topology start return type

* Add support for internal topologies

* Add demo of internal pipeline

* Create the internal config manually

* Revert the sample use of the interface
  • Loading branch information
bruceg authored Oct 25, 2023
1 parent 5c1707f commit c9c184e
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 204 deletions.
57 changes: 34 additions & 23 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
#![allow(missing_docs)]
use std::{
collections::HashMap, num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration,
};
use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration};

use exitcode::ExitCode;
use futures::StreamExt;
#[cfg(feature = "enterprise")]
use futures_util::future::BoxFuture;
use once_cell::race::OnceNonZeroUsize;
use openssl::provider::Provider;
use tokio::{
runtime::{self, Runtime},
sync::mpsc,
};
use tokio::runtime::{self, Runtime};
use tokio_stream::wrappers::UnboundedReceiverStream;

#[cfg(feature = "enterprise")]
Expand All @@ -28,9 +23,10 @@ use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
config::{self, Config, ConfigPath},
heartbeat,
signal::{ShutdownError, SignalHandler, SignalPair, SignalRx, SignalTo},
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
topology::{
self, ReloadOutcome, RunningTopology, SharedTopologyController, TopologyController,
ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
TopologyController,
},
trace,
};
Expand All @@ -50,8 +46,8 @@ use tokio::sync::broadcast::error::RecvError;
pub struct ApplicationConfig {
pub config_paths: Vec<config::ConfigPath>,
pub topology: RunningTopology,
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub internal_topologies: Vec<RunningTopology>,
#[cfg(feature = "api")]
pub api: config::api::Options,
#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -99,30 +95,33 @@ impl ApplicationConfig {
#[cfg(feature = "enterprise")]
let enterprise = build_enterprise(&mut config, config_paths.clone())?;

let diff = config::ConfigDiff::initial(&config);
let pieces = topology::build_or_log_errors(&config, &diff, HashMap::new())
.await
.ok_or(exitcode::CONFIG)?;

#[cfg(feature = "api")]
let api = config.api;

let result = topology::start_validated(config, diff, pieces).await;
let (topology, (graceful_crash_sender, graceful_crash_receiver)) =
result.ok_or(exitcode::CONFIG)?;
let (topology, graceful_crash_receiver) = RunningTopology::start_init_validated(config)
.await
.ok_or(exitcode::CONFIG)?;

Ok(Self {
config_paths,
topology,
graceful_crash_sender,
graceful_crash_receiver,
internal_topologies: Vec::new(),
#[cfg(feature = "api")]
api,
#[cfg(feature = "enterprise")]
enterprise,
})
}

pub async fn add_internal_config(&mut self, config: Config) -> Result<(), ExitCode> {
let Some((topology, _)) = RunningTopology::start_init_validated(config).await else {
return Err(exitcode::CONFIG);
};
self.internal_topologies.push(topology);
Ok(())
}

/// Configure the API server, if applicable
#[cfg(feature = "api")]
pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
Expand All @@ -145,8 +144,9 @@ impl ApplicationConfig {
let error = error.to_string();
error!("An error occurred that Vector couldn't handle: {}.", error);
_ = self
.graceful_crash_sender
.send(ShutdownError::ApiFailed { error });
.topology
.abort_tx
.send(crate::signal::ShutdownError::ApiFailed { error });
None
}
}
Expand Down Expand Up @@ -254,6 +254,7 @@ impl Application {

Ok(StartedApplication {
config_paths: config.config_paths,
internal_topologies: config.internal_topologies,
graceful_crash_receiver: config.graceful_crash_receiver,
signals,
topology_controller,
Expand All @@ -264,7 +265,8 @@ impl Application {

pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub internal_topologies: Vec<RunningTopology>,
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
Expand All @@ -282,6 +284,7 @@ impl StartedApplication {
signals,
topology_controller,
openssl_providers,
internal_topologies,
} = self;

let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
Expand Down Expand Up @@ -314,6 +317,7 @@ impl StartedApplication {
signal_rx,
topology_controller,
openssl_providers,
internal_topologies,
}
}
}
Expand Down Expand Up @@ -369,6 +373,7 @@ pub struct FinishedApplication {
pub signal_rx: SignalRx,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
pub internal_topologies: Vec<RunningTopology>,
}

impl FinishedApplication {
Expand All @@ -378,6 +383,7 @@ impl FinishedApplication {
signal_rx,
topology_controller,
openssl_providers,
internal_topologies,
} = self;

// At this point, we'll have the only reference to the shared topology controller and can
Expand All @@ -392,6 +398,11 @@ impl FinishedApplication {
SignalTo::Quit => Self::quit(),
_ => unreachable!(),
};

for topology in internal_topologies {
topology.stop().await;
}

drop(openssl_providers);
status
}
Expand Down
14 changes: 4 additions & 10 deletions src/components/validation/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use vector_core::{event::Event, EstimatedJsonEncodedSizeOf};
use crate::{
codecs::Encoder,
components::validation::{RunnerMetrics, TestCase},
config::{ConfigBuilder, ConfigDiff},
topology,
config::ConfigBuilder,
topology::RunningTopology,
};

use super::{
Expand Down Expand Up @@ -471,7 +471,6 @@ fn spawn_component_topology(
.build()
.expect("config should not have any errors");
config.healthchecks.set_require_healthy(Some(true));
let config_diff = ConfigDiff::initial(&config);

_ = std::thread::spawn(move || {
let test_runtime = Builder::new_current_thread()
Expand All @@ -482,13 +481,8 @@ fn spawn_component_topology(
test_runtime.block_on(async move {
debug!("Building component topology...");

let pieces = topology::build_or_log_errors(&config, &config_diff, HashMap::new())
.await
.unwrap();
let (topology, (_, mut crash_rx)) =
topology::start_validated(config, config_diff, pieces)
.await
.unwrap();
let (topology, mut crash_rx) =
RunningTopology::start_init_validated(config).await.unwrap();

debug!("Component topology built and spawned.");
topology_started.mark_as_done();
Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ mod tests {
let c2 = config::load_from_str(config, format).unwrap();
match (
config::warnings(&c2),
topology::builder::build_pieces(&c, &diff, HashMap::new()).await,
topology::TopologyPieces::build(&c, &diff, HashMap::new()).await,
) {
(warnings, Ok(_pieces)) => Ok(warnings),
(_, Err(errors)) => Err(errors),
Expand Down
11 changes: 4 additions & 7 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@ use crate::{
},
event::{Event, LogEvent, Value},
signal,
topology::{
self,
builder::{self, Pieces},
},
topology::{builder::TopologyPieces, RunningTopology},
};

pub struct UnitTest {
pub name: String,
config: Config,
pieces: Pieces,
pieces: TopologyPieces,
test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}

Expand All @@ -49,7 +46,7 @@ pub struct UnitTestResult {
impl UnitTest {
pub async fn run(self) -> UnitTestResult {
let diff = config::ConfigDiff::initial(&self.config);
let (topology, _) = topology::start_validated(self.config, diff, self.pieces)
let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
.await
.unwrap();
topology.sources_finished().await;
Expand Down Expand Up @@ -422,7 +419,7 @@ async fn build_unit_test(
}
let config = config_builder.build()?;
let diff = config::ConfigDiff::initial(&config);
let pieces = builder::build_pieces(&config, &diff, HashMap::new()).await?;
let pieces = TopologyPieces::build(&config, &diff, HashMap::new()).await?;

Ok(UnitTest {
name: test.name,
Expand Down
11 changes: 2 additions & 9 deletions src/sinks/datadog/traces/apm_stats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ use tokio::time::{sleep, Duration};

use crate::{
config::ConfigBuilder,
signal::ShutdownError,
sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig},
sources::datadog_agent::DatadogAgentConfig,
test_util::{start_topology, trace_init},
topology::RunningTopology,
topology::{RunningTopology, ShutdownErrorReceiver},
};

/// The port on which the Agent will send traces to vector, and vector `datadog_agent` source will
Expand Down Expand Up @@ -320,13 +319,7 @@ fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) {
/// This creates a scenario where the stats payload that is output by the sink after processing the
/// *second* batch of events (the second event) *should* contain the aggregated statistics of both
/// of the trace events. i.e , the hit count for that bucket should be equal to "2" , not "1".
async fn start_vector() -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
async fn start_vector() -> (RunningTopology, ShutdownErrorReceiver) {
let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port());

let source_config = toml::from_str::<DatadogAgentConfig>(&format!(
Expand Down
21 changes: 4 additions & 17 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ use vector_core::event::{BatchNotifier, Event, EventArray, LogEvent};
use zstd::Decoder as ZstdDecoder;

use crate::{
config::{Config, ConfigDiff, GenerateConfig},
signal::ShutdownError,
topology::{self, RunningTopology},
config::{Config, GenerateConfig},
topology::{RunningTopology, ShutdownErrorReceiver},
trace,
};

Expand Down Expand Up @@ -681,21 +680,9 @@ impl CountReceiver<Event> {
pub async fn start_topology(
mut config: Config,
require_healthy: impl Into<Option<bool>>,
) -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
) -> (RunningTopology, ShutdownErrorReceiver) {
config.healthchecks.set_require_healthy(require_healthy);
let diff = ConfigDiff::initial(&config);
let pieces = topology::build_or_log_errors(&config, &diff, HashMap::new())
.await
.unwrap();
topology::start_validated(config, diff, pieces)
.await
.unwrap()
RunningTopology::start_init_validated(config).await.unwrap()
}

/// Collect the first `n` events from a stream while a future is spawned
Expand Down
46 changes: 32 additions & 14 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use super::{
};
use crate::{
config::{
ComponentKey, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig,
SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
},
event::{EventArray, EventContainer},
internal_events::EventsReceived,
Expand Down Expand Up @@ -73,15 +73,6 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {

const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

/// Builds only the new pieces, and doesn't check their topology.
pub async fn build_pieces(
config: &super::Config,
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
) -> Result<Pieces, Vec<String>> {
Builder::new(config, diff, buffers).build().await
}

struct Builder<'a> {
config: &'a super::Config,
diff: &'a ConfigDiff,
Expand Down Expand Up @@ -116,7 +107,7 @@ impl<'a> Builder<'a> {
}

/// Builds the new pieces of the topology found in `self.diff`.
async fn build(mut self) -> Result<Pieces, Vec<String>> {
async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
let enrichment_tables = self.load_enrichment_tables().await;
let source_tasks = self.build_sources().await;
self.build_transforms(enrichment_tables).await;
Expand All @@ -127,7 +118,7 @@ impl<'a> Builder<'a> {
enrichment_tables.finish_load();

if self.errors.is_empty() {
Ok(Pieces {
Ok(TopologyPieces {
inputs: self.inputs,
outputs: Self::finalize_outputs(self.outputs),
tasks: self.tasks,
Expand Down Expand Up @@ -677,7 +668,7 @@ impl<'a> Builder<'a> {
}
}

pub struct Pieces {
pub struct TopologyPieces {
pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
pub(super) tasks: HashMap<ComponentKey, Task>,
Expand All @@ -687,6 +678,33 @@ pub struct Pieces {
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
}

impl TopologyPieces {
pub async fn build_or_log_errors(
config: &Config,
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
) -> Option<Self> {
match TopologyPieces::build(config, diff, buffers).await {
Err(errors) => {
for error in errors {
error!(message = "Configuration error.", %error);
}
None
}
Ok(new_pieces) => Some(new_pieces),
}
}

/// Builds only the new pieces, and doesn't check their topology.
pub async fn build(
config: &super::Config,
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
) -> Result<Self, Vec<String>> {
Builder::new(config, diff, buffers).build().await
}
}

const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
match events {
EventArray::Logs(_) => data_type.contains(DataType::Log),
Expand Down
Loading

0 comments on commit c9c184e

Please sign in to comment.