Skip to content

Commit

Permalink
fix(component validation): make tests deterministic through absolute …
Browse files Browse the repository at this point in the history
…comparisons instead of bounds checks (vectordotdev#17956)

### Changes

- Fixes a bug which was occasionally circumvented by comparing against a
lower bound for some of the _bytes metrics- it seems the offered
reasoning for the lower bound (inability to synchronize with the
component's internal logic) was inaccurate.
- The bug itself was that we were converting the `EventData` read from
the yaml file, into a proper Event (and thus generating metadata for
it), in two places- first when sending the event as input in the input
runner, and again in the validator to construct the expected result.
Because the event construction adds a timestamp, and because our
determination of timestamp precision is based on whether there are
trailing zeros that can be trimmed, this created a scenario where the
actual event input to the component under test, and the event used to
calculated expected metrics, had a timestamp that varied in it's number
of bytes. This of course did not happen all of the time, which meant
that the tests sometimes passed and sometimes failed.
- The fix for this is to create the proper Event immediately after
parsing the EventData from the yaml file (thereby only creating it
once).
- Moves the calculation of expected metrics, out of the Validators and
up to the input/output runners. This removes the need for a lot of logic
that was previously being done in the Validators.
- Increases the static sleep we have when initializing the runner
topology from 1s to 2s, and additionally increased the number of
internal metric ticks to wait for before shutting down the telemetry
from 2 ticks to 3. I found through experimentation that occasionally we
would receive no events. The two attributes were increased independently
and it was only through increasing both that the pass rate became 100%.
- Extracts duplicated code in the sources validator functions into
helper functions.

### Testing Done

The tests were run in a loop, stopping on the first failure.
This method was used to calibrate the waits we have on the topology as
mentioned above.
Before the current settings, the maximum sequential passes was about
100.
With the current settings, the loop was manually stopped at about 1.2k
iterations.
  • Loading branch information
neuronull authored Jul 19, 2023
1 parent 7d0db6b commit 52a8036
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 444 deletions.
13 changes: 13 additions & 0 deletions src/components/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ macro_rules! register_validatable_component {
};
}

/// Input and Output runners populate this structure as they send and receive events.
/// The structure is passed into the validator to use as the expected values for the
/// metrics that the components under test actually output.
#[derive(Default)]
pub struct RunnerMetrics {
pub received_events_total: u64,
pub received_event_bytes_total: u64,
pub received_bytes_total: u64,
pub sent_bytes_total: u64, // a reciprocal for received_bytes_total
pub sent_event_bytes_total: u64,
pub sent_events_total: u64,
}

#[cfg(all(test, feature = "component-validation-tests"))]
mod tests {
use std::{
Expand Down
111 changes: 100 additions & 11 deletions src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use bytes::BytesMut;
use serde::Deserialize;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;

use crate::codecs::Encoder;
use codecs::{
encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
NewlineDelimitedEncoder,
};
use vector_core::event::{Event, LogEvent};

/// An event used in a test case.
/// A test case event for deserialization from yaml file.
/// This is an intermediary step to TestEvent.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum TestEvent {
pub enum RawTestEvent {
/// The event is used, as-is, without modification.
Passthrough(EventData),

Expand All @@ -20,15 +30,6 @@ pub enum TestEvent {
Modified { modified: bool, event: EventData },
}

impl TestEvent {
pub fn into_event(self) -> Event {
match self {
Self::Passthrough(event) => event.into_event(),
Self::Modified { event, .. } => event.into_event(),
}
}
}

#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum EventData {
Expand All @@ -44,3 +45,91 @@ impl EventData {
}
}
}

/// An event used in a test case.
/// It is important to have created the event with all fields, immediately after deserializing from the
/// test case definition yaml file. This ensures that the event data we are using in the expected/actual
/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in
/// the event which may or may not have the same millisecond precision as it's counterpart.
#[derive(Clone, Debug, Deserialize)]
#[serde(from = "RawTestEvent")]
#[serde(untagged)]
pub enum TestEvent {
/// The event is used, as-is, without modification.
Passthrough(Event),

/// The event is potentially modified by the external resource.
///
/// The modification made is dependent on the external resource, but this mode is made available
/// for when a test case wants to exercise the failure path, but cannot cause a failure simply
/// by constructing the event in a certain way i.e. adding an invalid field, or removing a
/// required field, or using an invalid field value, and so on.
///
/// For transforms and sinks, generally, the only way to cause an error is if the event itself
/// is malformed in some way, which can be achieved without this test event variant.
Modified { modified: bool, event: Event },
}

impl TestEvent {
#[allow(clippy::missing_const_for_fn)] // const cannot run destructor
pub fn into_event(self) -> Event {
match self {
Self::Passthrough(event) => event,
Self::Modified { event, .. } => event,
}
}
}

#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum RawTestEventParseError {}

impl From<RawTestEvent> for TestEvent {
fn from(other: RawTestEvent) -> Self {
match other {
RawTestEvent::Passthrough(event_data) => {
TestEvent::Passthrough(event_data.into_event())
}
RawTestEvent::Modified { modified, event } => TestEvent::Modified {
modified,
event: event.into_event(),
},
}
}
}

pub fn encode_test_event(
encoder: &mut Encoder<encoding::Framer>,
buf: &mut BytesMut,
event: TestEvent,
) {
match event {
TestEvent::Passthrough(event) => {
// Encode the event normally.
encoder
.encode(event, buf)
.expect("should not fail to encode input event");
}
TestEvent::Modified { event, .. } => {
// This is a little fragile, but we check what serializer this encoder uses, and based
// on `Serializer::supports_json`, we choose an opposing codec. For example, if the
// encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
// versa.
let mut alt_encoder = if encoder.serializer().supports_json() {
Encoder::<encoding::Framer>::new(
LengthDelimitedEncoder::new().into(),
LogfmtSerializer::new().into(),
)
} else {
Encoder::<encoding::Framer>::new(
NewlineDelimitedEncoder::new().into(),
JsonSerializer::new(MetricTagValues::default()).into(),
)
};

alt_encoder
.encode(event, buf)
.expect("should not fail to encode input event");
}
}
}
69 changes: 12 additions & 57 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::VecDeque,
future::Future,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
Expand All @@ -11,26 +12,18 @@ use axum::{
Router,
};
use bytes::BytesMut;
use codecs::{
encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
NewlineDelimitedEncoder,
};
use http::{Method, Request, StatusCode, Uri};
use hyper::{Body, Client, Server};
use std::future::Future;
use tokio::{
select,
sync::{mpsc, oneshot, Mutex, Notify},
};
use tokio_util::codec::{Decoder, Encoder as _};
use vector_core::event::Event;
use tokio_util::codec::Decoder;

use crate::{
codecs::Encoder,
components::validation::sync::{Configuring, TaskCoordinator},
};
use crate::components::validation::sync::{Configuring, TaskCoordinator};
use vector_core::event::Event;

use super::{ResourceCodec, ResourceDirection, TestEvent};
use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent};

/// An HTTP resource.
#[derive(Clone)]
Expand Down Expand Up @@ -67,7 +60,7 @@ impl HttpResourceConfig {
self,
direction: ResourceDirection,
codec: ResourceCodec,
output_tx: mpsc::Sender<Event>,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
match direction {
Expand Down Expand Up @@ -230,7 +223,7 @@ fn spawn_input_http_client(
fn spawn_output_http_server(
config: HttpResourceConfig,
codec: ResourceCodec,
output_tx: mpsc::Sender<Event>,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
Expand All @@ -252,12 +245,10 @@ fn spawn_output_http_server(
loop {
match decoder.decode_eof(&mut body) {
Ok(Some((events, _byte_size))) => {
for event in events {
output_tx
.send(event)
.await
.expect("should not fail to send output event");
}
output_tx
.send(events.to_vec())
.await
.expect("should not fail to send output event");
}
Ok(None) => return StatusCode::OK.into_response(),
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
Expand Down Expand Up @@ -290,7 +281,7 @@ fn spawn_output_http_server(
fn spawn_output_http_client(
_config: HttpResourceConfig,
_codec: ResourceCodec,
_output_tx: mpsc::Sender<Event>,
_output_tx: mpsc::Sender<Vec<Event>>,
_task_coordinator: &TaskCoordinator<Configuring>,
) {
// TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
Expand Down Expand Up @@ -400,39 +391,3 @@ fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {

SocketAddr::from((uri_host, uri_port))
}

pub fn encode_test_event(
encoder: &mut Encoder<encoding::Framer>,
buf: &mut BytesMut,
event: TestEvent,
) {
match event {
TestEvent::Passthrough(event) => {
// Encode the event normally.
encoder
.encode(event.into_event(), buf)
.expect("should not fail to encode input event");
}
TestEvent::Modified { event, .. } => {
// This is a little fragile, but we check what serializer this encoder uses, and based
// on `Serializer::supports_json`, we choose an opposing codec. For example, if the
// encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
// versa.
let mut alt_encoder = if encoder.serializer().supports_json() {
Encoder::<encoding::Framer>::new(
LengthDelimitedEncoder::new().into(),
LogfmtSerializer::new().into(),
)
} else {
Encoder::<encoding::Framer>::new(
NewlineDelimitedEncoder::new().into(),
JsonSerializer::new(MetricTagValues::default()).into(),
)
};

alt_encoder
.encode(event.into_event(), buf)
.expect("should not fail to encode input event");
}
}
}
6 changes: 3 additions & 3 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use vector_core::{config::DataType, event::Event};

use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};

pub use self::event::{EventData, TestEvent};
pub use self::http::{encode_test_event, HttpResourceConfig};
pub use self::event::{encode_test_event, TestEvent};
pub use self::http::HttpResourceConfig;

use super::sync::{Configuring, TaskCoordinator};

Expand Down Expand Up @@ -308,7 +308,7 @@ impl ExternalResource {
/// Spawns this resource for use as an output for a sink.
pub fn spawn_as_output(
self,
output_tx: mpsc::Sender<Event>,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
match self.definition {
Expand Down
31 changes: 17 additions & 14 deletions src/components/validation/runner/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use crate::{

#[derive(Clone)]
pub struct EventForwardService {
tx: mpsc::Sender<Event>,
tx: mpsc::Sender<Vec<Event>>,
}

impl From<mpsc::Sender<Event>> for EventForwardService {
fn from(tx: mpsc::Sender<Event>) -> Self {
impl From<mpsc::Sender<Vec<Event>>> for EventForwardService {
fn from(tx: mpsc::Sender<Vec<Event>>) -> Self {
Self { tx }
}
}
Expand All @@ -42,14 +42,17 @@ impl VectorService for EventForwardService {
&self,
request: tonic::Request<PushEventsRequest>,
) -> Result<tonic::Response<PushEventsResponse>, Status> {
let events = request.into_inner().events.into_iter().map(Event::from);

for event in events {
self.tx
.send(event)
.await
.expect("event forward rx should not close first");
}
let events = request
.into_inner()
.events
.into_iter()
.map(Event::from)
.collect();

self.tx
.send(events)
.await
.expect("event forward rx should not close first");

Ok(tonic::Response::new(PushEventsResponse {}))
}
Expand All @@ -74,7 +77,7 @@ pub struct InputEdge {
pub struct OutputEdge {
listen_addr: GrpcAddress,
service: VectorServer<EventForwardService>,
rx: mpsc::Receiver<Event>,
rx: mpsc::Receiver<Vec<Event>>,
}

impl InputEdge {
Expand Down Expand Up @@ -129,7 +132,7 @@ impl OutputEdge {
pub fn spawn_output_server(
self,
task_coordinator: &TaskCoordinator<Configuring>,
) -> mpsc::Receiver<Event> {
) -> mpsc::Receiver<Vec<Event>> {
spawn_grpc_server(self.listen_addr, self.service, task_coordinator);
self.rx
}
Expand Down Expand Up @@ -184,5 +187,5 @@ pub fn spawn_grpc_server<S>(

pub struct ControlledEdges {
pub input: Option<mpsc::Sender<TestEvent>>,
pub output: Option<mpsc::Receiver<Event>>,
pub output: Option<mpsc::Receiver<Vec<Event>>>,
}
Loading

0 comments on commit 52a8036

Please sign in to comment.