diff --git a/benches/batch.rs b/benches/batch.rs
index d1ca6bb557535..2caa5a2c11cd5 100644
--- a/benches/batch.rs
+++ b/benches/batch.rs
@@ -6,7 +6,7 @@ use vector::{
     buffers::Acker,
     sinks::util::{
         batch::{Batch, BatchConfig, BatchError, BatchSettings, BatchSize, PushResult},
-        BatchSink, Buffer, Compression, Partition, PartitionBatchSink,
+        BatchSink, Buffer, Compression, EncodedEvent, Partition, PartitionBatchSink,
     },
     test_util::{random_lines, runtime},
 };
@@ -55,7 +55,7 @@ fn benchmark_batch(c: &mut Criterion) {
                     inner: b,
                     key: Bytes::from("key"),
                 }))
-                .map(Ok),
+                .map(|item| Ok(EncodedEvent::new(item))),
                 batch_sink,
             )
         },
@@ -83,7 +83,11 @@ fn benchmark_batch(c: &mut Criterion) {
                 )
                 .sink_map_err(|error| panic!("{}", error));

-                (rt, stream::iter(input.clone()).map(Ok), batch_sink)
+                (
+                    rt,
+                    stream::iter(input.clone()).map(|item| Ok(EncodedEvent::new(item))),
+                    batch_sink,
+                )
             },
             |(rt, input, batch_sink)| rt.block_on(input.forward(batch_sink)).unwrap(),
             criterion::BatchSize::LargeInput,
diff --git a/benches/isolated_buffer.rs b/benches/isolated_buffer.rs
index 94fce697c1e29..78ebf273e001e 100644
--- a/benches/isolated_buffer.rs
+++ b/benches/isolated_buffer.rs
@@ -1,7 +1,6 @@
 use async_trait::async_trait;
 use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
 use futures::{
-    compat::{Future01CompatExt, Sink01CompatExt},
     stream::{self, BoxStream},
     Sink, SinkExt, Stream, StreamExt,
 };
diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs
index bb41096c7d6a7..e57fc2d2a1ad3 100644
--- a/lib/vector-core/src/event/mod.rs
+++ b/lib/vector-core/src/event/mod.rs
@@ -592,6 +592,35 @@ impl From for Event {
     }
 }

+/// A wrapper for references to inner event types, where reconstituting
+/// a full `Event` from a `LogEvent` or `Metric` might be inconvenient.
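+///
+/// A minimal usage sketch (illustrative only, not part of the original
+/// change; assumes some existing `event: Event`):
+///
+/// ```ignore
+/// let event_ref: EventRef = (&event).into();
+/// match event_ref {
+///     EventRef::Log(log) => println!("borrowed log: {:?}", log),
+///     EventRef::Metric(metric) => println!("borrowed metric: {:?}", metric),
+/// }
+/// ```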
+#[derive(Clone, Copy, Debug)]
+pub enum EventRef<'a> {
+    Log(&'a LogEvent),
+    Metric(&'a Metric),
+}
+
+impl<'a> From<&'a Event> for EventRef<'a> {
+    fn from(event: &'a Event) -> Self {
+        match event {
+            Event::Log(log) => log.into(),
+            Event::Metric(metric) => metric.into(),
+        }
+    }
+}
+
+impl<'a> From<&'a LogEvent> for EventRef<'a> {
+    fn from(log: &'a LogEvent) -> Self {
+        Self::Log(log)
+    }
+}
+
+impl<'a> From<&'a Metric> for EventRef<'a> {
+    fn from(metric: &'a Metric) -> Self {
+        Self::Metric(metric)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs b/src/sinks/aws_cloudwatch_logs/mod.rs
index 3e90b9cfd32da..a6eb6afa5594f 100644
--- a/src/sinks/aws_cloudwatch_logs/mod.rs
+++ b/src/sinks/aws_cloudwatch_logs/mod.rs
@@ -6,10 +6,11 @@ use crate::{
     internal_events::TemplateRenderingFailed,
     rusoto::{self, AwsAuthentication, RegionOrEndpoint},
     sinks::util::{
+        batch::{BatchConfig, BatchSettings},
         encoding::{EncodingConfig, EncodingConfiguration},
         retries::{FixedRetryPolicy, RetryLogic},
-        BatchConfig, BatchSettings, Compression, EncodedLength, PartitionBatchSink,
-        PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer,
+        Compression, EncodedEvent, EncodedLength, PartitionBatchSink, PartitionBuffer,
+        PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer,
     },
     template::Template,
 };
@@ -440,7 +441,7 @@ fn partition_encode(
     event: Event,
     encoding: &EncodingConfig<Encoding>,
     group: &Template,
     stream: &Template,
-) -> Option<PartitionInnerBuffer<InputLogEvent, CloudwatchKey>> {
+) -> Option<EncodedEvent<PartitionInnerBuffer<InputLogEvent, CloudwatchKey>>> {
     let group = match group.render_string(&event) {
         Ok(b) => b,
         Err(error) => {
@@ -474,7 +475,7 @@
     )
     .ok()?;

-    Some(PartitionInnerBuffer::new(event, key))
+    Some(EncodedEvent::new(PartitionInnerBuffer::new(event, key)))
 }

 #[derive(Debug, Snafu)]
@@ -693,9 +694,8 @@ mod tests {
         let group = "group".try_into().unwrap();
         let encoding = Encoding::Text.into();

-        let (_event, key) = partition_encode(event, &encoding, &group, &stream)
-            .unwrap()
-            .into_parts();
+        let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
+        let (_event, key) = encoded.item.into_parts();

         let expected = CloudwatchKey {
             stream: "stream".into(),
@@ -715,9 +715,8 @@
         let group = "group".try_into().unwrap();
         let encoding = Encoding::Text.into();

-        let (_event, key) = partition_encode(event, &encoding, &group, &stream)
-            .unwrap()
-            .into_parts();
+        let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
+        let (_event, key) = encoded.item.into_parts();

         let expected = CloudwatchKey {
             stream: "stream".into(),
@@ -737,9 +736,8 @@
         let group = "group".try_into().unwrap();
         let encoding = Encoding::Text.into();

-        let (_event, key) = partition_encode(event, &encoding, &group, &stream)
-            .unwrap()
-            .into_parts();
+        let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
+        let (_event, key) = encoded.item.into_parts();

         let expected = CloudwatchKey {
             stream: "abcd-stream".into(),
@@ -759,9 +757,8 @@
         let group = "group".try_into().unwrap();
         let encoding = Encoding::Text.into();

-        let (_event, key) = partition_encode(event, &encoding, &group, &stream)
-            .unwrap()
-            .into_parts();
+        let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
+        let (_event, key) = encoded.item.into_parts();

         let expected = CloudwatchKey {
             stream: "stream-abcd".into(),
@@ -896,7 +893,7 @@ mod integration_tests {
         let timestamp = chrono::Utc::now();

-        let (input_lines, events) = random_lines_with_stream(100, 11);
+        let (input_lines, events) = random_lines_with_stream(100, 11, None);
         sink.run(events).await.unwrap();

         let request = GetLogEventsRequest {
@@ -943,7 +940,7 @@
         let timestamp = chrono::Utc::now() - chrono::Duration::days(1);

-        let (mut input_lines, events) = random_lines_with_stream(100, 11);
+        let (mut input_lines, events) = random_lines_with_stream(100, 11, None);

         // add a historical timestamp to all but the first event, to simulate
         // out-of-order timestamps.
@@ -1081,7 +1078,7 @@
         let timestamp = chrono::Utc::now();

-        let (input_lines, events) = random_lines_with_stream(100, 11);
+        let (input_lines, events) = random_lines_with_stream(100, 11, None);
         sink.run(events).await.unwrap();

         let request = GetLogEventsRequest {
@@ -1133,7 +1130,7 @@
         let timestamp = chrono::Utc::now();

-        let (input_lines, events) = random_lines_with_stream(100, 11);
+        let (input_lines, events) = random_lines_with_stream(100, 11, None);
         let mut events = events.map(Ok);

         let _ = sink.into_sink().send_all(&mut events).await.unwrap();
@@ -1181,7 +1178,7 @@
         let timestamp = chrono::Utc::now();

-        let (input_lines, _events) = random_lines_with_stream(100, 10);
+        let (input_lines, _events) = random_lines_with_stream(100, 10, None);

         let events = input_lines
             .clone()
diff --git a/src/sinks/aws_cloudwatch_metrics.rs b/src/sinks/aws_cloudwatch_metrics.rs
index a31d3c9bd5ae8..fc3b5e29b22db 100644
--- a/src/sinks/aws_cloudwatch_metrics.rs
+++ b/src/sinks/aws_cloudwatch_metrics.rs
@@ -6,10 +6,11 @@ use crate::{
     },
     rusoto::{self, AwsAuthentication, RegionOrEndpoint},
     sinks::util::{
+        batch::{BatchConfig, BatchSettings},
         buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
         retries::RetryLogic,
-        BatchConfig, BatchSettings, Compression, PartitionBatchSink, PartitionBuffer,
-        PartitionInnerBuffer, TowerRequestConfig,
+        Compression, EncodedEvent, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer,
+        TowerRequestConfig,
     },
 };
 use chrono::{DateTime, SecondsFormat, Utc};
@@ -149,15 +150,16 @@ impl CloudWatchMetricsSvc {
         let sink = PartitionBatchSink::new(svc, buffer, batch.timeout, cx.acker())
             .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error))
             .with_flat_map(move |event: Event| {
-                stream::iter(normalizer.apply(event).map(|mut event| {
-                    let namespace = event
-                        .as_mut_metric()
+                stream::iter(normalizer.apply(event).map(|mut metric| {
+                    let namespace = metric
                         .series
                         .name
                         .namespace
                         .take()
                         .unwrap_or_else(|| default_namespace.clone());
-                    Ok(PartitionInnerBuffer::new(event, namespace))
+                    Ok(EncodedEvent::new(PartitionInnerBuffer::new(
+                        metric, namespace,
+                    )))
                 }))
             });
diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs
index 3a4a7bd1e28d6..ba00c0258a944 100644
--- a/src/sinks/aws_kinesis_firehose.rs
+++ b/src/sinks/aws_kinesis_firehose.rs
@@ -6,7 +6,8 @@ use crate::{
         encoding::{EncodingConfig, EncodingConfiguration},
         retries::RetryLogic,
         sink::Response,
-        BatchConfig, BatchSettings, Compression, EncodedLength, TowerRequestConfig, VecBuffer,
+        BatchConfig, BatchSettings, Compression, EncodedEvent, EncodedLength, TowerRequestConfig,
+        VecBuffer,
     },
 };
 use bytes::Bytes;
@@ -240,7 +241,7 @@ enum HealthcheckError {
     StreamNamesMismatch { name: String, stream_name: String },
 }

-fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Record {
+fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> EncodedEvent<Record> {
     encoding.apply_rules(&mut event);
     let log = event.into_log();
     let data = match encoding.codec() {
@@ -254,7 +255,7 @@

     let data = Bytes::from(data);

-    Record { data }
+    EncodedEvent::new(Record { data })
 }

 #[cfg(test)]
@@ -272,7 +273,7 @@ mod tests {
         let message = "hello world".to_string();
         let event = encode_event(message.clone().into(), &Encoding::Text.into());

-        assert_eq!(&event.data[..], message.as_bytes());
+        assert_eq!(&event.item.data[..], message.as_bytes());
     }

     #[test]
@@ -282,7 +283,7 @@
         event.as_mut_log().insert("key", "value");
         let event = encode_event(event, &Encoding::Json.into());

-        let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
+        let map: BTreeMap<String, String> = serde_json::from_slice(&event.item.data[..]).unwrap();

         assert_eq!(
             map[&crate::config::log_schema().message_key().to_string()],
diff --git a/src/sinks/aws_kinesis_streams.rs b/src/sinks/aws_kinesis_streams.rs
index af616577d336e..529e649e8439f 100644
--- a/src/sinks/aws_kinesis_streams.rs
+++ b/src/sinks/aws_kinesis_streams.rs
@@ -7,7 +7,8 @@ use crate::{
         encoding::{EncodingConfig, EncodingConfiguration},
         retries::RetryLogic,
         sink::Response,
-        BatchConfig, BatchSettings, Compression, EncodedLength, TowerRequestConfig, VecBuffer,
+        BatchConfig, BatchSettings, Compression, EncodedEvent, EncodedLength, TowerRequestConfig,
+        VecBuffer,
     },
 };
 use bytes::Bytes;
@@ -269,7 +270,7 @@ fn encode_event(
     mut event: Event,
     partition_key_field: &Option<String>,
     encoding: &EncodingConfig<Encoding>,
-) -> Option<PutRecordsRequestEntry> {
+) -> Option<EncodedEvent<PutRecordsRequestEntry>> {
     let partition_key = if let Some(partition_key_field) = partition_key_field {
         if let Some(v) = event.as_log().get(&partition_key_field) {
             v.to_string_lossy()
@@ -302,11 +303,11 @@
             .unwrap_or_default(),
     };

-    Some(PutRecordsRequestEntry {
+    Some(EncodedEvent::new(PutRecordsRequestEntry {
         data: Bytes::from(data),
         partition_key,
         ..Default::default()
-    })
+    }))
 }

 fn gen_partition_key() -> String {
@@ -334,7 +335,7 @@ mod tests {
         let message = "hello world".to_string();
         let event = encode_event(message.clone().into(), &None, &Encoding::Text.into()).unwrap();

-        assert_eq!(&event.data[..], message.as_bytes());
+        assert_eq!(&event.item.data[..], message.as_bytes());
     }

     #[test]
@@ -344,7 +345,7 @@
         event.as_mut_log().insert("key", "value");
         let event = encode_event(event, &None, &Encoding::Json.into()).unwrap();

-        let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
+        let map: BTreeMap<String, String> = serde_json::from_slice(&event.item.data[..]).unwrap();

         assert_eq!(map[&log_schema().message_key().to_string()], message);
         assert_eq!(map["key"], "value".to_string());
@@ -356,8 +357,8 @@
         event.as_mut_log().insert("key", "some_key");
         let event = encode_event(event, &Some("key".into()), &Encoding::Text.into()).unwrap();

-        assert_eq!(&event.data[..], b"hello world");
-        assert_eq!(&event.partition_key, &"some_key".to_string());
+        assert_eq!(&event.item.data[..], b"hello world");
+        assert_eq!(&event.item.partition_key, &"some_key".to_string());
     }

     #[test]
@@ -366,8 +367,8 @@
         event.as_mut_log().insert("key", random_string(300));
         let event = encode_event(event, &Some("key".into()), &Encoding::Text.into()).unwrap();

-        assert_eq!(&event.data[..], b"hello world");
-        assert_eq!(event.partition_key.len(), 256);
+        assert_eq!(&event.item.data[..], b"hello world");
+        assert_eq!(event.item.partition_key.len(), 256);
     }

     #[test]
@@ -379,9 +380,9 @@
         encoding.except_fields = Some(vec!["key".into()]);

         let event = encode_event(event, &Some("key".into()), &encoding).unwrap();
-        let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
+        let map: BTreeMap<String, String> = serde_json::from_slice(&event.item.data[..]).unwrap();

-        assert_eq!(&event.partition_key, &"some_key".to_string());
+        assert_eq!(&event.item.partition_key, &"some_key".to_string());
         assert!(!map.contains_key("key"));
     }
 }
@@ -433,7 +434,7 @@ mod integration_tests {
         let timestamp = chrono::Utc::now().timestamp_millis();

-        let (mut input_lines, events) = random_lines_with_stream(100, 11);
+        let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
         let mut events = events.map(Ok);

         let _ = sink.send_all(&mut events).await.unwrap();
diff --git a/src/sinks/aws_s3.rs b/src/sinks/aws_s3.rs
index 7c6aaf1040b18..5a90a2bf99e3c 100644
--- a/src/sinks/aws_s3.rs
+++ b/src/sinks/aws_s3.rs
@@ -5,11 +5,12 @@ use crate::{
     rusoto::{self, AwsAuthentication, RegionOrEndpoint},
     serde::to_string,
     sinks::util::{
+        batch::{BatchConfig, BatchSettings},
         encoding::{EncodingConfig, EncodingConfiguration},
         retries::RetryLogic,
         sink::Response,
-        BatchConfig, BatchSettings, Buffer, Compression, Concurrency, PartitionBatchSink,
-        PartitionBuffer, PartitionInnerBuffer, ServiceBuilderExt, TowerRequestConfig,
+        Buffer, Compression, Concurrency, EncodedEvent, PartitionBatchSink, PartitionBuffer,
+        PartitionInnerBuffer, ServiceBuilderExt, TowerRequestConfig,
     },
     template::Template,
 };
@@ -394,7 +395,7 @@ fn encode_event(
     mut event: Event,
     key_prefix: &Template,
     encoding: &EncodingConfig<Encoding>,
-) -> Option<PartitionInnerBuffer<Vec<u8>, Bytes>> {
+) -> Option<EncodedEvent<PartitionInnerBuffer<Vec<u8>, Bytes>>> {
     let key = key_prefix
         .render_string(&event)
         .map_err(|error| {
@@ -426,7 +427,10 @@
         }
     };

-    Some(PartitionInnerBuffer::new(bytes, key.into()))
+    Some(EncodedEvent::new(PartitionInnerBuffer::new(
+        bytes,
+        key.into(),
+    )))
 }

 #[cfg(test)]
@@ -442,7 +446,7 @@ mod tests {
     fn s3_encode_event_text() {
         let message = "hello world".to_string();
         let batch_time_format = Template::try_from("date=%F").unwrap();
-        let bytes = encode_event(
+        let encoded = encode_event(
             message.clone().into(),
             &batch_time_format,
             &Encoding::Text.into(),
@@ -450,7 +454,7 @@
         .unwrap();

         let encoded_message = message + "\n";
-        let (bytes, _) = bytes.into_parts();
+        let (bytes, _) = encoded.item.into_parts();
         assert_eq!(&bytes[..], encoded_message.as_bytes());
     }

@@ -461,9 +465,9 @@
         event.as_mut_log().insert("key", "value");

         let batch_time_format = Template::try_from("date=%F").unwrap();
-        let bytes = encode_event(event, &batch_time_format, &Encoding::Ndjson.into()).unwrap();
+        let encoded = encode_event(event, &batch_time_format, &Encoding::Ndjson.into()).unwrap();

-        let (bytes, _) = bytes.into_parts();
+        let (bytes, _) = encoded.item.into_parts();
         let map: BTreeMap<String, String> = serde_json::from_slice(&bytes[..]).unwrap();

         assert_eq!(map[&log_schema().message_key().to_string()], message);
@@ -486,9 +490,9 @@
             timestamp_format: None,
         };

-        let bytes = encode_event(event, &key_prefix, &encoding_config).unwrap();
+        let encoded = encode_event(event, &key_prefix, &encoding_config).unwrap();

-        let (bytes, _) = bytes.into_parts();
+        let (bytes, _) = encoded.item.into_parts();
         let map: BTreeMap<String, String> = serde_json::from_slice(&bytes[..]).unwrap();

         assert_eq!(map[&log_schema().message_key().to_string()], message);
@@ -590,7 +594,7 @@ mod integration_tests {
         let client = config.create_client().unwrap();
         let sink = config.new(client, cx).unwrap();

-        let (lines, events) = random_lines_with_stream(100, 10);
+        let (lines, events) = random_lines_with_stream(100, 10, None);
         sink.run(events).await.unwrap();

         let keys = get_keys(prefix.unwrap()).await;
@@ -620,7 +624,7 @@
         let client = config.create_client().unwrap();
         let sink = config.new(client, cx).unwrap();

-        let (lines, _events) = random_lines_with_stream(100, 30);
+        let (lines, _events) = random_lines_with_stream(100, 30, None);

         let events = lines.clone().into_iter().enumerate().map(|(i, line)| {
             let mut e = Event::from(line);
@@ -665,7 +669,7 @@
         let client = config.create_client().unwrap();
         let sink = config.new(client, cx).unwrap();

-        let (lines, events) = random_lines_with_stream(100, 500);
+        let (lines, events) = random_lines_with_stream(100, 500, None);

         sink.run(events).await.unwrap();

         let keys = get_keys(prefix.unwrap()).await;
diff --git a/src/sinks/aws_sqs.rs b/src/sinks/aws_sqs.rs
index 35ad8ab1b5e64..6bada5885ef1c 100644
--- a/src/sinks/aws_sqs.rs
+++ b/src/sinks/aws_sqs.rs
@@ -7,7 +7,7 @@ use crate::{
         encoding::{EncodingConfig, EncodingConfiguration},
         retries::RetryLogic,
         sink::Response,
-        BatchSettings, EncodedLength, TowerRequestConfig, VecBuffer,
+        BatchSettings, EncodedEvent, EncodedLength, TowerRequestConfig, VecBuffer,
     },
     template::{Template, TemplateParseError},
 };
@@ -252,7 +252,7 @@ fn encode_event(
     mut event: Event,
     encoding: &EncodingConfig<Encoding>,
     message_group_id: Option<&Template>,
-) -> Option<SendMessageEntry> {
+) -> Option<EncodedEvent<SendMessageEntry>> {
     encoding.apply_rules(&mut event);

     let message_group_id = match message_group_id {
@@ -279,10 +279,10 @@
         Encoding::Json => serde_json::to_string(&log).expect("Error encoding event as json."),
     };

-    Some(SendMessageEntry {
+    Some(EncodedEvent::new(SendMessageEntry {
         message_body,
         message_group_id,
-    })
+    }))
 }

 #[cfg(test)]
@@ -295,7 +295,7 @@ mod tests {
         let message = "hello world".to_string();
         let event = encode_event(message.clone().into(), &Encoding::Text.into(), None).unwrap();

-        assert_eq!(&event.message_body, &message);
+        assert_eq!(&event.item.message_body, &message);
     }

     #[test]
@@ -305,7 +305,7 @@
         event.as_mut_log().insert("key", "value");
         let event = encode_event(event, &Encoding::Json.into(), None).unwrap();

-        let map: BTreeMap<String, String> = serde_json::from_str(&event.message_body).unwrap();
+        let map: BTreeMap<String, String> = serde_json::from_str(&event.item.message_body).unwrap();

         assert_eq!(map[&log_schema().message_key().to_string()], message);
         assert_eq!(map["key"], "value".to_string());
@@ -351,7 +351,7 @@ mod integration_tests {

         let mut sink = SqsSink::new(config, cx, client.clone()).unwrap();

-        let (mut input_lines, events) = random_lines_with_stream(100, 10);
+        let (mut input_lines, events) = random_lines_with_stream(100, 10, None);
         sink.send_all(&mut events.map(Ok)).await.unwrap();

         sleep(Duration::from_secs(1)).await;
diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs
index 6ac975cc9084f..8a17a7ae04753 100644
--- a/src/sinks/azure_monitor_logs.rs
+++ b/src/sinks/azure_monitor_logs.rs
@@ -6,7 +6,8 @@ use crate::{
         util::{
             encoding::{EncodingConfigWithDefault, EncodingConfiguration},
             http::{BatchedHttpSink, HttpSink},
-            BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig,
+            BatchConfig, BatchSettings, BoxedRawValue, EncodedEvent, JsonArrayBuffer,
+            TowerRequestConfig,
         },
         Healthcheck, VectorSink,
     },
@@ -153,7 +154,7 @@ impl HttpSink for AzureMonitorLogsSink {
     type Input = serde_json::Value;
     type Output = Vec<BoxedRawValue>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         self.encoding.apply_rules(&mut event);

         // it seems like Azure Monitor doesn't support full 9-digit nanosecond precision
@@ -174,7 +175,7 @@
             JsonValue::String(timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)),
         );

-        Some(entry)
+        Some(EncodedEvent::new(entry))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -343,7 +344,7 @@ mod tests {
         let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log);

         let event = Event::from(log);
-        let json = sink.encode_event(event).unwrap();
+        let json = sink.encode_event(event).unwrap().item;

         let expected_json = serde_json::json!({
             timestamp_key: timestamp_value,
             "message": "hello world"
@@ -372,8 +373,8 @@
         let mut log2 = LogEvent::from_iter([("message", "world")].iter().copied());
         let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2);

-        let event1 = sink.encode_event(Event::from(log1)).unwrap();
-        let event2 = sink.encode_event(Event::from(log2)).unwrap();
+        let event1 = sink.encode_event(Event::from(log1)).unwrap().item;
+        let event2 = sink.encode_event(Event::from(log2)).unwrap().item;

         let json1 = serde_json::to_string(&event1).unwrap();
         let json2 = serde_json::to_string(&event2).unwrap();
diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs
index 1ebec3ff57cc6..89f85a296cff0 100644
--- a/src/sinks/clickhouse.rs
+++ b/src/sinks/clickhouse.rs
@@ -6,7 +6,8 @@ use crate::{
         encoding::{EncodingConfigWithDefault, EncodingConfiguration},
         http::{BatchedHttpSink, HttpRetryLogic, HttpSink},
         retries::{RetryAction, RetryLogic},
-        BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig, UriSerde,
+        BatchConfig, BatchSettings, Buffer, Compression, EncodedEvent, TowerRequestConfig,
+        UriSerde,
     },
     tls::{TlsOptions, TlsSettings},
 };
@@ -111,13 +112,13 @@ impl HttpSink for ClickhouseConfig {
     type Input = Vec<u8>;
     type Output = Vec<u8>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         self.encoding.apply_rules(&mut event);

         let mut body = serde_json::to_vec(&event.as_log()).expect("Events should be valid json!");
         body.push(b'\n');

-        Some(body)
+        Some(EncodedEvent::new(body))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
diff --git a/src/sinks/datadog/logs.rs b/src/sinks/datadog/logs.rs
index 2a7e1b7ce671e..78019e8ca6e19 100644
--- a/src/sinks/datadog/logs.rs
+++ b/src/sinks/datadog/logs.rs
@@ -9,8 +9,8 @@ use crate::{
             encode_event,
             encoding::{EncodingConfig, EncodingConfiguration},
             http::{BatchedHttpSink, HttpSink},
-            BatchConfig, BatchSettings, BoxedRawValue, Compression, Encoding, JsonArrayBuffer,
-            TowerRequestConfig, VecBuffer,
+            BatchConfig, BatchSettings, BoxedRawValue, Compression, EncodedEvent, Encoding,
+            JsonArrayBuffer, TowerRequestConfig, VecBuffer,
         },
         Healthcheck, VectorSink,
     },
@@ -227,7 +227,7 @@ impl HttpSink for DatadogLogsJsonService {
     type Input = serde_json::Value;
     type Output = Vec<BoxedRawValue>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         let log = event.as_mut_log();

         if let Some(message) = log.remove(log_schema().message_key()) {
@@ -244,7 +244,7 @@

         self.config.encoding.apply_rules(&mut event);

-        Some(json!(event.into_log()))
+        Some(EncodedEvent::new(json!(event.into_log())))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -266,13 +266,13 @@ impl HttpSink for DatadogLogsTextService {
     type Input = Bytes;
     type Output = Vec<Bytes>;

-    fn encode_event(&self, event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, event: Event) -> Option<EncodedEvent<Self::Input>> {
         encode_event(event, &self.config.encoding).map(|e| {
             emit!(DatadogLogEventProcessed {
-                byte_size: e.len(),
+                byte_size: e.item.len(),
                 count: 1,
             });
-            e
+            EncodedEvent::new(e.item)
         })
     }

@@ -359,7 +359,7 @@ mod tests {
         let (rx, _trigger, server) = build_test_server(addr);
         tokio::spawn(server);

-        let (expected, events) = random_lines_with_stream(100, 10);
+        let (expected, events) = random_lines_with_stream(100, 10, None);

         let _ = sink.run(events).await.unwrap();

@@ -392,7 +392,7 @@
         let (rx, _trigger, server) = build_test_server(addr);
         tokio::spawn(server);

-        let (expected, events) = random_lines_with_stream(100, 10);
+        let (expected, events) = random_lines_with_stream(100, 10, None);

         let _ = sink.run(events).await.unwrap();
diff --git a/src/sinks/datadog/metrics.rs b/src/sinks/datadog/metrics.rs
index 7a950b04da9a3..1d6168433f499 100644
--- a/src/sinks/datadog/metrics.rs
+++ b/src/sinks/datadog/metrics.rs
@@ -5,10 +5,11 @@ use crate::{
     http::HttpClient,
     sinks::{
         util::{
+            batch::{BatchConfig, BatchSettings},
             buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
             encode_namespace,
             http::{HttpBatchService, HttpRetryLogic},
-            BatchConfig, BatchSettings, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer,
+            EncodedEvent, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer,
             TowerRequestConfig,
         },
         Healthcheck, HealthcheckError, UriParseError, VectorSink,
@@ -159,8 +160,8 @@ impl DatadogEndpoint {
         ])
     }

-    fn from_metric(event: &Event) -> Self {
-        match event.as_metric().data.value {
+    fn from_metric(metric: &Metric) -> Self {
+        match metric.data.value {
             MetricValue::Distribution {
                 statistic: StatisticKind::Summary,
                 ..
@@ -213,7 +214,9 @@ impl SinkConfig for DatadogConfig {
             .with_flat_map(move |event: Event| {
                 stream::iter(normalizer.apply(event).map(|event| {
                     let endpoint = DatadogEndpoint::from_metric(&event);
-                    Ok(PartitionInnerBuffer::new(event, endpoint))
+                    Ok(EncodedEvent::new(PartitionInnerBuffer::new(
+                        event, endpoint,
+                    )))
                 }))
             });
diff --git a/src/sinks/elasticsearch.rs b/src/sinks/elasticsearch.rs
index 7c367d2c7fa01..a45a3e165debd 100644
--- a/src/sinks/elasticsearch.rs
+++ b/src/sinks/elasticsearch.rs
@@ -9,7 +9,8 @@ use crate::{
         encoding::{EncodingConfigWithDefault, EncodingConfiguration},
         http::{BatchedHttpSink, HttpSink, RequestConfig},
         retries::{RetryAction, RetryLogic},
-        BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig, UriSerde,
+        BatchConfig, BatchSettings, Buffer, Compression, EncodedEvent, TowerRequestConfig,
+        UriSerde,
     },
     template::{Template, TemplateParseError},
     tls::{TlsOptions, TlsSettings},
@@ -195,7 +196,7 @@ impl HttpSink for ElasticSearchCommon {
     type Input = Vec<u8>;
     type Output = Vec<u8>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         let index = self
             .index
             .render_string(&event)
@@ -235,7 +236,7 @@
             index
         });

-        Some(body)
+        Some(EncodedEvent::new(body))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -612,7 +613,7 @@ mod tests {
             log_schema().timestamp_key(),
             Utc.ymd(2020, 12, 1).and_hms(1, 2, 3),
         );
-        let encoded = es.encode_event(event).unwrap();
+        let encoded = es.encode_event(event).unwrap().item;
         let expected = r#"{"create":{"_index":"vector","_type":"_doc"}}
{"message":"hello there","timestamp":"2020-12-01T01:02:03Z"}
"#;
@@ -664,7 +665,7 @@
         event.as_mut_log().insert("foo", "bar");
         event.as_mut_log().insert("idx", "purple");

-        let encoded = es.encode_event(event).unwrap();
+        let encoded = es.encode_event(event).unwrap().item;
         let expected = r#"{"index":{"_index":"purple","_type":"_doc"}}
{"foo":"bar","message":"hello there"}
"#;
diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs
index d99c1407e6d95..2d1ea667ec948 100644
--- a/src/sinks/file/mod.rs
+++ b/src/sinks/file/mod.rs
@@ -371,7 +371,7 @@ mod tests {
         };

         let mut sink = FileSink::new(&config, Acker::Null);
-        let (input, _events) = random_lines_with_stream(100, 64);
+        let (input, _events) = random_lines_with_stream(100, 64, None);
         let events = Box::pin(stream::iter(input.clone().into_iter().map(Event::from)));

         sink.run(events).await.unwrap();
@@ -396,7 +396,7 @@
         };

         let mut sink = FileSink::new(&config, Acker::Null);
-        let (input, _) = random_lines_with_stream(100, 64);
+        let (input, _) = random_lines_with_stream(100, 64, None);
         let events = Box::pin(stream::iter(input.clone().into_iter().map(Event::from)));

         sink.run(events).await.unwrap();
@@ -507,7 +507,7 @@
         };

         let mut sink = FileSink::new(&config, Acker::Null);
-        let (mut input, _events) = random_lines_with_stream(10, 64);
+        let (mut input, _events) = random_lines_with_stream(10, 64, None);

         let (mut tx, rx) = futures::channel::mpsc::channel(0);
diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs
index 5c46a5fa52fc3..bbe3d5f44cc5b 100644
--- a/src/sinks/gcp/cloud_storage.rs
+++ b/src/sinks/gcp/cloud_storage.rs
@@ -7,10 +7,11 @@ use crate::{
     serde::to_string,
     sinks::{
         util::{
+            batch::{BatchConfig, BatchSettings},
             encoding::{EncodingConfig, EncodingConfiguration},
             retries::{RetryAction, RetryLogic},
-            BatchConfig, BatchSettings, Buffer, Compression, Concurrency, PartitionBatchSink,
-            PartitionBuffer, PartitionInnerBuffer, ServiceBuilderExt, TowerRequestConfig,
+            Buffer, Compression, Concurrency, EncodedEvent, PartitionBatchSink, PartitionBuffer,
+            PartitionInnerBuffer, ServiceBuilderExt, TowerRequestConfig,
         },
         Healthcheck, VectorSink,
     },
@@ -230,7 +231,9 @@ impl GcsSink {

         let sink = PartitionBatchSink::new(svc, buffer, batch.timeout, cx.acker())
             .sink_map_err(|error| error!(message = "Fatal gcp_cloud_storage error.", %error))
-            .with_flat_map(move |e| stream::iter(encode_event(e, &key_prefix, &encoding)).map(Ok));
+            .with_flat_map(move |event| {
+                stream::iter(encode_event(event, &key_prefix, &encoding)).map(Ok)
+            });

         Ok(VectorSink::Sink(Box::new(sink)))
     }
@@ -402,7 +405,7 @@ fn encode_event(
     mut event: Event,
     key_prefix: &Template,
     encoding: &EncodingConfig<Encoding>,
-) -> Option<PartitionInnerBuffer<Vec<u8>, Bytes>> {
+) -> Option<EncodedEvent<PartitionInnerBuffer<Vec<u8>, Bytes>>> {
     let key = key_prefix
         .render_string(&event)
         .map_err(|error| {
@@ -432,7 +435,10 @@
         }
     };

-    Some(PartitionInnerBuffer::new(bytes, key.into()))
+    Some(EncodedEvent::new(PartitionInnerBuffer::new(
+        bytes,
+        key.into(),
+    )))
 }

 #[derive(Clone)]
@@ -475,7 +481,7 @@ mod tests {
     fn gcs_encode_event_text() {
         let message = "hello world".to_string();
         let batch_time_format = Template::try_from("date=%F").unwrap();
-        let bytes = encode_event(
+        let encoded = encode_event(
             message.clone().into(),
             &batch_time_format,
             &Encoding::Text.into(),
@@ -483,7 +489,7 @@
         .unwrap();

         let encoded_message = message + "\n";
-        let (bytes, _) = bytes.into_parts();
+        let (bytes, _) = encoded.item.into_parts();
         assert_eq!(&bytes[..], encoded_message.as_bytes());
     }

@@ -494,9 +500,9 @@
         event.as_mut_log().insert("key", "value");

         let batch_time_format = Template::try_from("date=%F").unwrap();
-        let bytes = encode_event(event, &batch_time_format, &Encoding::Ndjson.into()).unwrap();
+        let encoded = encode_event(event, &batch_time_format, &Encoding::Ndjson.into()).unwrap();

-        let (bytes, _) = bytes.into_parts();
+        let (bytes, _) = encoded.item.into_parts();
         let map: HashMap<String, String> = serde_json::from_slice(&bytes[..]).unwrap();

         assert_eq!(
@@ -515,9 +521,9 @@
         event.as_mut_log().insert("key", "value");

         let key_format = Template::try_from("key: {{ key }}").unwrap();
-        let bytes = encode_event(event, &key_format, &Encoding::Text.into()).unwrap();
+        let encoded = encode_event(event, &key_format, &Encoding::Text.into()).unwrap();

-        let (_, key) = bytes.into_parts();
+        let (_, key) = encoded.item.into_parts();

         assert_eq!(key, "key: value");
     }
diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs
index f6a8668b62c37..dec1f020cf18a 100644
--- a/src/sinks/gcp/pubsub.rs
+++ b/src/sinks/gcp/pubsub.rs
@@ -7,7 +7,8 @@ use crate::{
         util::{
             encoding::{EncodingConfigWithDefault, EncodingConfiguration},
             http::{BatchedHttpSink, HttpSink},
-            BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig,
+            BatchConfig, BatchSettings, BoxedRawValue, EncodedEvent, JsonArrayBuffer,
+            TowerRequestConfig,
         },
         Healthcheck, UriParseError, VectorSink,
     },
@@ -162,12 +163,12 @@ impl HttpSink for PubsubSink {
     type Input = Value;
     type Output = Vec<BoxedRawValue>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         self.encoding.apply_rules(&mut event);

         // Each event needs to be base64 encoded, and put into a JSON object
         // as the `data` item.
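        // Illustrative sketch (not part of the original change): for an event
        // whose JSON serialization is `{"message":"hello"}`, the value built
        // below would come out roughly as
        //     { "data": "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" }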
         let json = serde_json::to_string(&event.into_log()).unwrap();
-        Some(json!({ "data": base64::encode(&json) }))
+        Some(EncodedEvent::new(json!({ "data": base64::encode(&json) })))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs
index f522eaa693031..68385754aae74 100644
--- a/src/sinks/gcp/stackdriver_logs.rs
+++ b/src/sinks/gcp/stackdriver_logs.rs
@@ -7,7 +7,8 @@ use crate::{
         util::{
             encoding::{EncodingConfigWithDefault, EncodingConfiguration},
             http::{BatchedHttpSink, HttpSink},
-            BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig,
+            BatchConfig, BatchSettings, BoxedRawValue, EncodedEvent, JsonArrayBuffer,
+            TowerRequestConfig,
         },
         Healthcheck, VectorSink,
     },
@@ -157,7 +158,7 @@ impl HttpSink for StackdriverSink {
     type Input = serde_json::Value;
     type Output = Vec<BoxedRawValue>;

-    fn encode_event(&self, event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, event: Event) -> Option<EncodedEvent<Self::Input>> {
         let mut log = event.into_log();
         let severity = self
             .severity_key
@@ -180,7 +181,7 @@
             entry.insert("timestamp".into(), json!(timestamp));
         }

-        Some(json!(entry))
+        Some(EncodedEvent::new(json!(entry)))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -303,7 +304,7 @@ mod tests {
             .iter()
             .copied(),
         );
-        let json = sink.encode_event(Event::from(log)).unwrap();
+        let json = sink.encode_event(Event::from(log)).unwrap().item;
         let body = serde_json::to_string(&json).unwrap();
         assert_eq!(
             body,
@@ -335,7 +336,7 @@
             Value::Timestamp(Utc.ymd(2020, 1, 1).and_hms(12, 30, 0)),
         );

-        let json = sink.encode_event(Event::from(log)).unwrap();
+        let json = sink.encode_event(Event::from(log)).unwrap().item;
         let body = serde_json::to_string(&json).unwrap();
         assert_eq!(
             body,
@@ -389,8 +390,8 @@
         let log1 = LogEvent::from_iter([("message", "hello")].iter().copied());
         let log2 = LogEvent::from_iter([("message", "world")].iter().copied());

-        let event1 = sink.encode_event(Event::from(log1)).unwrap();
-        let event2 = sink.encode_event(Event::from(log2)).unwrap();
+        let event1 = sink.encode_event(Event::from(log1)).unwrap().item;
+        let event2 = sink.encode_event(Event::from(log2)).unwrap().item;

         let json1 = serde_json::to_string(&event1).unwrap();
         let json2 = serde_json::to_string(&event2).unwrap();
diff --git a/src/sinks/gcp/stackdriver_metrics.rs b/src/sinks/gcp/stackdriver_metrics.rs
index 6767c11c5835b..c5d1f57aa16d0 100644
--- a/src/sinks/gcp/stackdriver_metrics.rs
+++ b/src/sinks/gcp/stackdriver_metrics.rs
@@ -4,12 +4,11 @@ use crate::http::HttpClient;
 use crate::sinks::gcp;
 use crate::sinks::util::buffer::metrics::MetricsBuffer;
 use crate::sinks::util::http::{BatchedHttpSink, HttpSink};
-use crate::sinks::util::{BatchConfig, BatchSettings, TowerRequestConfig};
+use crate::sinks::util::{BatchConfig, BatchSettings, EncodedEvent, TowerRequestConfig};
 use crate::sinks::{Healthcheck, VectorSink};
 use crate::tls::{TlsOptions, TlsSettings};
 use chrono::{DateTime, Utc};
-use futures::sink::SinkExt;
-use futures::FutureExt;
+use futures::{sink::SinkExt, FutureExt};
 use http::header::AUTHORIZATION;
 use http::{HeaderValue, Uri};
 use lazy_static::lazy_static;
@@ -102,20 +101,21 @@ struct HttpEventSink {

 #[async_trait::async_trait]
 impl HttpSink for HttpEventSink {
-    type Input = Event;
+    type Input = Metric;
     type Output = Vec<Metric>;

-    fn encode_event(&self, event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, event: Event) -> Option<EncodedEvent<Self::Input>> {
         let metric = event.into_metric();
         match &metric.data.value {
-            &MetricValue::Counter { .. } => Some(Event::Metric(metric)),
-            &MetricValue::Gauge { .. } => Some(Event::Metric(metric)),
+            &MetricValue::Counter { .. } => Some(metric),
+            &MetricValue::Gauge { .. } => Some(metric),
             not_supported => {
                 warn!("Unsupported metric type: {:?}.", not_supported);
                 None
             }
         }
+        .map(EncodedEvent::new)
     }

     async fn build_request(
diff --git a/src/sinks/honeycomb.rs b/src/sinks/honeycomb.rs
index 09968c1cb632e..d6a7b5411d996 100644
--- a/src/sinks/honeycomb.rs
+++ b/src/sinks/honeycomb.rs
@@ -4,7 +4,8 @@ use crate::{
     http::HttpClient,
     sinks::util::{
         http::{BatchedHttpSink, HttpSink},
-        BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig,
+        BatchConfig, BatchSettings, BoxedRawValue, EncodedEvent, JsonArrayBuffer,
+        TowerRequestConfig,
     },
 };
 use futures::{FutureExt, SinkExt};
@@ -89,7 +90,7 @@ impl HttpSink for HoneycombConfig {
     type Input = serde_json::Value;
     type Output = Vec<BoxedRawValue>;

-    fn encode_event(&self, event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, event: Event) -> Option<EncodedEvent<Self::Input>> {
         let mut log = event.into_log();

         let timestamp = if let Some(Value::Timestamp(ts)) = log.remove(log_schema().timestamp_key())
@@ -99,10 +100,10 @@
             chrono::Utc::now()
         };

-        Some(json!({
+        Some(EncodedEvent::new(json!({
             "timestamp": timestamp.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
             "data": log.all_fields(),
-        }))
+        })))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
diff --git a/src/sinks/http.rs b/src/sinks/http.rs
index 18b74cfa1e422..771393bba9091 100644
--- a/src/sinks/http.rs
+++ b/src/sinks/http.rs
@@ -7,7 +7,8 @@ use crate::{
         buffer::compression::GZIP_DEFAULT,
         encoding::{EncodingConfig, EncodingConfiguration},
         http::{BatchedHttpSink, HttpSink, RequestConfig},
-        BatchConfig, BatchSettings, Buffer, Compression, Concurrency, TowerRequestConfig, UriSerde,
+        BatchConfig, BatchSettings, Buffer, Compression, Concurrency, EncodedEvent,
+        TowerRequestConfig, UriSerde,
     },
     tls::{TlsOptions, TlsSettings},
 };
@@ -178,7 +179,7 @@ impl HttpSink for HttpSinkConfig {
     type Input = Vec<u8>;
     type Output = Vec<u8>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         self.encoding.apply_rules(&mut event);
         let event = event.into_log();
@@ -215,7 +216,11 @@
             byte_size: body.len(),
         });

-        Some(body)
+        let (_, metadata) = event.into_parts();
+        Some(EncodedEvent {
+            item: body,
+            metadata: Some(metadata),
+        })
     }

     async fn build_request(&self, mut body: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -311,7 +316,10 @@ mod tests {
     config::SinkContext,
     sinks::{
         http::HttpSinkConfig,
-        util::{http::HttpSink, test::build_test_server},
+        util::{
+            http::HttpSink,
+            test::{build_test_server, build_test_server_generic},
+        },
     },
     test_util::{next_addr, random_lines_with_stream},
 };
@@ -320,9 +328,11 @@
 use futures::{channel::mpsc, stream, StreamExt};
 use headers::{Authorization, HeaderMapExt};
 use http::request::Parts;
-use hyper::Method;
+use hyper::{Method, Response, StatusCode};
 use serde::Deserialize;
 use std::io::{BufRead, BufReader};
+use std::sync::{atomic, Arc};
+use vector_core::event::{BatchNotifier, BatchStatus};

 #[test]
 fn generate_config() {
@@ -336,7 +346,7 @@
     let mut config = default_config(Encoding::Text);
     config.encoding = encoding;

-    let bytes = config.encode_event(event).unwrap();
+    let bytes = config.encode_event(event).unwrap().item;

     assert_eq!(bytes, Vec::from("hello world\n"));
Vec::from("hello world\n")); } @@ -348,7 +358,7 @@ mod tests { let mut config = default_config(Encoding::Json); config.encoding = encoding; - let bytes = config.encode_event(event).unwrap(); + let bytes = config.encode_event(event).unwrap().item; #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] @@ -417,90 +427,98 @@ mod tests { #[tokio::test] async fn http_happy_path_post() { - let num_lines = 1000; - - let in_addr = next_addr(); - - let config = r#" - uri = "http://$IN_ADDR/frames" - compression = "gzip" - encoding = "ndjson" - + run_sink( + r#" [auth] strategy = "basic" user = "waldo" password = "hunter2" - "# - .replace("$IN_ADDR", &format!("{}", in_addr)); - let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - - let cx = SinkContext::new_test(); - - let (sink, _) = config.build(cx).await.unwrap(); - let (rx, trigger, server) = build_test_server(in_addr); - - let (input_lines, events) = random_lines_with_stream(100, num_lines); - let pump = sink.run(events); - - tokio::spawn(server); - - pump.await.unwrap(); - drop(trigger); - - let output_lines = get_received(rx, |parts| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - assert_eq!( - Some(Authorization::basic("waldo", "hunter2")), - parts.headers.typed_get() - ); - }) + "#, + |parts| { + assert_eq!(Method::POST, parts.method); + assert_eq!("/frames", parts.uri.path()); + assert_eq!( + Some(Authorization::basic("waldo", "hunter2")), + parts.headers.typed_get() + ); + }, + ) .await; - - assert_eq!(num_lines, output_lines.len()); - assert_eq!(input_lines, output_lines); } #[tokio::test] async fn http_happy_path_put() { - let num_lines = 1000; - - let in_addr = next_addr(); - - let config = r#" - uri = "http://$IN_ADDR/frames" + run_sink( + r#" method = "put" - compression = "gzip" - encoding = "ndjson" [auth] strategy = "basic" user = "waldo" password = "hunter2" - "# - .replace("$IN_ADDR", &format!("{}", in_addr)); - let config: HttpSinkConfig = toml::from_str(&config).unwrap(); + "#, + |parts| { + assert_eq!(Method::PUT, parts.method); + assert_eq!("/frames", parts.uri.path()); + assert_eq!( + Some(Authorization::basic("waldo", "hunter2")), + parts.headers.typed_get() + ); + }, + ) + .await; + } - let cx = SinkContext::new_test(); + #[tokio::test] + async fn http_passes_custom_headers() { + run_sink( + r#" + [request.headers] + foo = "bar" + baz = "quux" + "#, + |parts| { + assert_eq!(Method::POST, parts.method); + assert_eq!("/frames", parts.uri.path()); + assert_eq!( + Some("bar"), + parts.headers.get("foo").map(|v| v.to_str().unwrap()) + ); + assert_eq!( + Some("quux"), + parts.headers.get("baz").map(|v| v.to_str().unwrap()) + ); + }, + ) + .await; + } - let (sink, _) = config.build(cx).await.unwrap(); - let (rx, trigger, server) = build_test_server(in_addr); + #[tokio::test] + async fn retries_on_no_connection() { + let num_lines = 10; - let (input_lines, events) = random_lines_with_stream(100, num_lines); - let pump = sink.run(events); + let (in_addr, sink) = build_sink("").await; + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); + let pump = tokio::spawn(sink.run(events)); + + // This ordering starts the sender before the server has built + // its accepting socket. The delay below ensures that the sink + // attempts to connect at least once before creating the + // listening socket. 
+        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+        let (rx, trigger, server) = build_test_server(in_addr);
         tokio::spawn(server);

-        pump.await.unwrap();
+        pump.await.unwrap().unwrap();
         drop(trigger);

+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
         let output_lines = get_received(rx, |parts| {
-            assert_eq!(Method::PUT, parts.method);
+            assert_eq!(Method::POST, parts.method);
             assert_eq!("/frames", parts.uri.path());
-            assert_eq!(
-                Some(Authorization::basic("waldo", "hunter2")),
-                parts.headers.typed_get()
-            );
         })
         .await;
@@ -509,28 +527,29 @@
     }

     #[tokio::test]
-    async fn http_passes_custom_headers() {
-        let num_lines = 1000;
-
-        let in_addr = next_addr();
-
-        let config = r#"
-        uri = "http://$IN_ADDR/frames"
-        encoding = "ndjson"
-        compression = "gzip"
-        [request.headers]
-        foo = "bar"
-        baz = "quux"
-        "#
-        .replace("$IN_ADDR", &format!("{}", in_addr));
-        let config: HttpSinkConfig = toml::from_str(&config).unwrap();
-
-        let cx = SinkContext::new_test();
-
-        let (sink, _) = config.build(cx).await.unwrap();
-        let (rx, trigger, server) = build_test_server(in_addr);
+    async fn retries_on_temporary_error() {
+        const NUM_LINES: usize = 1000;
+        const NUM_FAILURES: usize = 2;
+
+        let (in_addr, sink) = build_sink("").await;
+
+        let counter = Arc::new(atomic::AtomicUsize::new(0));
+        let in_counter = Arc::clone(&counter);
+        let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
+            let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed);
+            if count < NUM_FAILURES {
+                // Send a temporary error for the first two responses
+                Response::builder()
+                    .status(StatusCode::INTERNAL_SERVER_ERROR)
+                    .body(Body::empty())
+                    .unwrap_or_else(|_| unreachable!())
+            } else {
+                Response::new(Body::empty())
+            }
+        });

-        let (input_lines, events) = random_lines_with_stream(100, num_lines);
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch));
         let pump = sink.run(events);

         tokio::spawn(server);
@@ -538,73 +557,46 @@
         pump.await.unwrap();
         drop(trigger);

+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
         let output_lines = get_received(rx, |parts| {
             assert_eq!(Method::POST, parts.method);
             assert_eq!("/frames", parts.uri.path());
-            assert_eq!(
-                Some("bar"),
-                parts.headers.get("foo").map(|v| v.to_str().unwrap())
-            );
-            assert_eq!(
-                Some("quux"),
-                parts.headers.get("baz").map(|v| v.to_str().unwrap())
-            );
         })
         .await;

-        assert_eq!(num_lines, output_lines.len());
+        let tries = counter.load(atomic::Ordering::Relaxed);
+        assert!(tries > NUM_FAILURES);
+        assert_eq!(NUM_LINES, output_lines.len());
         assert_eq!(input_lines, output_lines);
     }

     #[tokio::test]
-    async fn retries_on_no_connection() {
-        let num_lines = 10;
-
-        let in_addr = next_addr();
-
-        let config = r#"
-        uri = "http://$IN_ADDR/frames"
-        compression = "gzip"
-        encoding = "ndjson"
-
-        [auth]
-        strategy = "basic"
-        user = "waldo"
-        password = "hunter2"
-        "#
-        .replace("$IN_ADDR", &format!("{}", in_addr));
-        let config: HttpSinkConfig = toml::from_str(&config).unwrap();
+    async fn fails_on_permanent_error() {
+        let num_lines = 1000;

-        let cx = SinkContext::new_test();
+        let (in_addr, sink) = build_sink("").await;

-        let (sink, _) = config.build(cx).await.unwrap();
+        let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
+            Response::builder()
+                .status(StatusCode::FORBIDDEN)
+                .body(Body::empty())
+                .unwrap_or_else(|_| unreachable!())
+        });

-        let (input_lines, events) = random_lines_with_stream(100, num_lines);
-        let pump = tokio::spawn(sink.run(events));
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+        let pump = sink.run(events);

-        // This ordering starts the sender before the server has built
-        // its accepting socket. The delay below ensures that the sink
-        // attempts to connect at least once before creating the
-        // listening socket.
-        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
-        let (rx, trigger, server) = build_test_server(in_addr);
         tokio::spawn(server);

-        pump.await.unwrap().unwrap();
+        pump.await.unwrap();
         drop(trigger);

-        let output_lines = get_received(rx, |parts| {
-            assert_eq!(Method::POST, parts.method);
-            assert_eq!("/frames", parts.uri.path());
-            assert_eq!(
-                Some(Authorization::basic("waldo", "hunter2")),
-                parts.headers.typed_get()
-            );
-        })
-        .await;
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Failed));

-        assert_eq!(num_lines, output_lines.len());
-        assert_eq!(input_lines, output_lines);
+        let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await;
+        assert!(output_lines.is_empty());
     }

     #[tokio::test]
@@ -631,7 +623,8 @@
         let (sink, _) = config.build(cx).await.unwrap();
         let (rx, trigger, server) = build_test_server(in_addr);

-        let (input_lines, events) = random_lines_with_stream(100, num_lines);
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
         let pump = sink.run(events);

         tokio::spawn(server);
@@ -639,6 +632,8 @@
         pump.await.unwrap();
         drop(trigger);

+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
         let output_lines = rx
             .flat_map(|(parts, body)| {
                 assert_eq!(Method::POST, parts.method);
@@ -676,4 +671,47 @@
             .collect::<Vec<_>>()
             .await
     }
+
+    async fn run_sink(extra_config: &str, assert_parts: impl Fn(http::request::Parts)) {
+        let num_lines = 1000;
+
+        let (in_addr, sink) = build_sink(extra_config).await;
+
+        let (rx, trigger, server) = build_test_server(in_addr);
+
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+        let pump = sink.run(events);
+
+        tokio::spawn(server);
+        pump.await.unwrap();
+        drop(trigger);
+
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+        let output_lines = get_received(rx, assert_parts).await;
+
+        assert_eq!(num_lines, output_lines.len());
+        assert_eq!(input_lines, output_lines);
+    }
+
+    async fn build_sink(extra_config: &str) -> (std::net::SocketAddr, crate::sinks::VectorSink) {
+        let in_addr = next_addr();
+        let config = format!(
+            r#"
+            uri = "http://{addr}/frames"
+            compression = "gzip"
+            encoding = "ndjson"
+            {extras}
+            "#,
+            addr = in_addr,
+            extras = extra_config
+        );
+        let config: HttpSinkConfig = toml::from_str(&config).unwrap();
+
+        let cx = SinkContext::new_test();
+
+        let (sink, _) = config.build(cx).await.unwrap();
+        (in_addr, sink)
+    }
 }
diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs
index 88653d83187bd..656f24a0689e4 100644
--- a/src/sinks/humio/logs.rs
+++ b/src/sinks/humio/logs.rs
@@ -128,7 +128,7 @@ mod tests {
             .unwrap();
         let config = config.build_hec_config();

-        let bytes = config.encode_event(event).unwrap();
+        let bytes = config.encode_event(event).unwrap().item;
         let hec_event = serde_json::from_slice::(&bytes[..]).unwrap();

         let now = Utc::now().timestamp_millis() as f64 / 1000f64;
diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs
index 2b02306bfd731..dccf5c520efec 100644
--- a/src/sinks/influxdb/logs.rs
+++ b/src/sinks/influxdb/logs.rs
@@ -11,7 +11,7 @@ use crate::{
         encode_namespace,
         encoding::{EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration},
         http::{BatchedHttpSink, HttpSink},
-        BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig,
+        BatchConfig, BatchSettings, Buffer, Compression, EncodedEvent, TowerRequestConfig,
     },
     Healthcheck, VectorSink,
 },
@@ -157,7 +157,7 @@ impl HttpSink for InfluxDbLogsSink {
     type Input = Vec<u8>;
     type Output = Vec<u8>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         self.encoding.apply_rules(&mut event);
         let mut event = event.into_log();
@@ -200,7 +200,7 @@
             return None;
         };

-        Some(output.into_bytes())
+        Some(EncodedEvent::new(output.into_bytes()))
     }

     async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -285,7 +285,7 @@ mod tests {
         );
         sink.encoding.except_fields = Some(vec!["host".into()]);

-        let bytes = sink.encode_event(event).unwrap();
+        let bytes = sink.encode_event(event).unwrap().item;
         let string = std::str::from_utf8(&bytes).unwrap();

         let line_protocol = split_line_protocol(&string);
@@ -315,7 +315,7 @@
             ["source_type", "host"].to_vec(),
         );

-        let bytes = sink.encode_event(event).unwrap();
+        let bytes = sink.encode_event(event).unwrap().item;
         let string = std::str::from_utf8(&bytes).unwrap();

         let line_protocol = split_line_protocol(&string);
@@ -359,7 +359,7 @@
             ["source_type", "host"].to_vec(),
         );

-        let bytes = sink.encode_event(event).unwrap();
+        let bytes = sink.encode_event(event).unwrap().item;
         let string = std::str::from_utf8(&bytes).unwrap();

         let line_protocol = split_line_protocol(&string);
@@ -398,7 +398,7 @@
             [].to_vec(),
         );

-        let bytes = sink.encode_event(event).unwrap();
+        let bytes = sink.encode_event(event).unwrap().item;
         let string = std::str::from_utf8(&bytes).unwrap();

         let line_protocol = split_line_protocol(&string);
@@ -435,7 +435,7 @@
             [].to_vec(),
         );

-        let bytes = sink.encode_event(event).unwrap();
+        let bytes = sink.encode_event(event).unwrap().item;
         let string = std::str::from_utf8(&bytes).unwrap();

         let line_protocol = split_line_protocol(&string);
@@ -472,7 +472,7 @@
             ["as_a_tag", "not_exists_field", "source_type"].to_vec(),
         );

-        let bytes = sink.encode_event(event).unwrap();
+        let bytes = sink.encode_event(event).unwrap().item;
         let string = std::str::from_utf8(&bytes).unwrap();

         let line_protocol = split_line_protocol(&string);
diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs
index 2addf0b693c62..c7a4aca6d3c7d 100644
--- a/src/sinks/influxdb/metrics.rs
+++ b/src/sinks/influxdb/metrics.rs
@@ -15,7 +15,7 @@ use crate::{
         encode_namespace,
         http::{HttpBatchService, HttpRetryLogic},
         statistic::{validate_quantiles, DistributionStatistic},
-        BatchConfig, BatchSettings, TowerRequestConfig,
+        BatchConfig, BatchSettings, EncodedEvent, TowerRequestConfig,
     },
     Healthcheck, VectorSink,
 },
@@ -148,7 +148,13 @@ impl InfluxDbSvc {
             batch.timeout,
             cx.acker(),
         )
-        .with_flat_map(move |event: Event| stream::iter(normalizer.apply(event).map(Ok)))
+        .with_flat_map(move |event: Event| {
+            stream::iter(
+                normalizer
+                    .apply(event)
+                    .map(|metric| Ok(EncodedEvent::new(metric))),
+            )
+        })
        .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error));
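        // Note (illustrative reading, not part of this change):
        // `EncodedEvent::new(metric)` above is assumed to attach no metadata to
        // the wrapped item; sinks that do carry event metadata build the
        // wrapper by hand, as the http sink does:
        //     EncodedEvent { item: body, metadata: Some(metadata) }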
         Ok(VectorSink::Sink(Box::new(sink)))
diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs
index d1a18c515d015..8524f6cb3bf66 100644
--- a/src/sinks/kafka.rs
+++ b/src/sinks/kafka.rs
@@ -790,7 +790,7 @@ mod integration_test {
         let sink = KafkaSink::new(config, acker).unwrap();

         let num_events = 1000;
-        let (input, events) = random_lines_with_stream(100, num_events);
+        let (input, events) = random_lines_with_stream(100, num_events, None);

         events.map(Ok).forward(sink).await.unwrap();

         // read back everything from the beginning
diff --git a/src/sinks/logdna.rs b/src/sinks/logdna.rs
index c60709b3df0dd..2d231468e7bed 100644
--- a/src/sinks/logdna.rs
+++ b/src/sinks/logdna.rs
@@ -6,7 +6,7 @@ use crate::{
     sinks::util::{
         encoding::{EncodingConfigWithDefault, EncodingConfiguration},
         http::{HttpSink, PartitionHttpSink},
-        BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, PartitionBuffer,
+        BatchConfig, BatchSettings, BoxedRawValue, EncodedEvent, JsonArrayBuffer, PartitionBuffer,
         PartitionInnerBuffer, TowerRequestConfig, UriSerde,
     },
     template::{Template, TemplateRenderingError},
@@ -122,7 +122,7 @@ impl HttpSink for LogdnaConfig {
     type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
     type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         let key = self
             .render_key(&event)
             .map_err(|(field, error)| {
@@ -180,7 +180,10 @@
             map.insert("meta".into(), json!(&log));
         }

-        Some(PartitionInnerBuffer::new(map.into(), key))
+        Some(EncodedEvent::new(PartitionInnerBuffer::new(
+            map.into(),
+            key,
+        )))
     }

     async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -255,7 +258,7 @@ impl LogdnaConfig {
     ) -> Result<PartitionKey, (Option<&'static str>, TemplateRenderingError)> {
         let hostname = self
             .hostname
-            .render_string(&event)
+            .render_string(event)
             .map_err(|e| (Some("hostname"), e))?;
         let tags = self
             .tags
@@ -331,13 +334,13 @@ mod tests {
         let mut event4 = Event::from("hello world");
         event4.as_mut_log().insert("env", "staging");

-        let event1_out = config.encode_event(event1).unwrap().into_parts().0;
+        let event1_out = config.encode_event(event1).unwrap().item.into_parts().0;
         let event1_out = event1_out.as_object().unwrap();
-        let event2_out = config.encode_event(event2).unwrap().into_parts().0;
+        let event2_out = config.encode_event(event2).unwrap().item.into_parts().0;
         let event2_out = event2_out.as_object().unwrap();
-        let event3_out = config.encode_event(event3).unwrap().into_parts().0;
+        let event3_out = config.encode_event(event3).unwrap().item.into_parts().0;
         let event3_out = event3_out.as_object().unwrap();
-        let event4_out = config.encode_event(event4).unwrap().into_parts().0;
+        let event4_out = config.encode_event(event4).unwrap().item.into_parts().0;
         let event4_out = event4_out.as_object().unwrap();

         assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
diff --git a/src/sinks/loki.rs b/src/sinks/loki.rs
index 7d47d51b9ffac..b0317efb6d620 100644
--- a/src/sinks/loki.rs
+++ b/src/sinks/loki.rs
@@ -21,8 +21,8 @@ use crate::{
         encoding::{EncodingConfig, EncodingConfiguration},
         http::{HttpSink, PartitionHttpSink},
         service::ConcurrencyOption,
-        BatchConfig, BatchSettings, PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig,
-        UriSerde,
+        BatchConfig, BatchSettings, EncodedEvent, PartitionBuffer, PartitionInnerBuffer,
+        TowerRequestConfig, UriSerde,
     },
     template::Template,
     tls::{TlsOptions, TlsSettings},
@@ -181,7 +181,7 @@ impl HttpSink for LokiSink {
     type Input = PartitionInnerBuffer<LokiRecord, PartitionKey>;
     type Output = PartitionInnerBuffer;

-    fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
+    fn encode_event(&self, mut event: Event) -> Option<EncodedEvent<Self::Input>> {
         let tenant_id = self.tenant_id.as_ref().and_then(|t| {
             t.render_string(&event)
                 .map_err(|missing| {
@@ -242,14 +242,14 @@
         }

         let event = LokiEvent { timestamp, event };
-        Some(PartitionInnerBuffer::new(
+        Some(EncodedEvent::new(PartitionInnerBuffer::new(
             LokiRecord {
                 labels,
                 event,
                 partition: key.clone(),
             },
             key,
-        ))
+        )))
     }

     async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
@@ -325,7 +325,7 @@ mod tests {

         e1.as_mut_log().insert("foo", "bar");

-        let mut record = sink.encode_event(e1).unwrap().into_parts().0;
+        let mut record = sink.encode_event(e1).unwrap().item.into_parts().0;

         // HashMap -> Vec doesn't like keeping ordering
         record.labels.sort();
@@ -364,7 +364,7 @@

         e1.as_mut_log().insert("foo", "bar");

-        let record = sink.encode_event(e1).unwrap().into_parts().0;
+        let record = sink.encode_event(e1).unwrap().item.into_parts().0;

         let expected_line = serde_json::to_string(&serde_json::json!({
             "message": "hello world",
diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs
index 717adde948bc6..65d1338d82113 100644
--- a/src/sinks/nats.rs
+++ b/src/sinks/nats.rs
@@ -272,7 +272,7 @@ mod integration_tests {
         let (acker, ack_counter) = Acker::new_for_testing();
         let mut sink = NatsSink::new(cnf.clone(), acker).unwrap();
         let num_events = 1_000;
-        let (input, events) = random_lines_with_stream(100, num_events);
+        let (input, events) = random_lines_with_stream(100, num_events, None);

         let _ = sink.run(Box::pin(events)).await.unwrap();
diff --git a/src/sinks/papertrail.rs b/src/sinks/papertrail.rs
index 89a1ce973afb5..dcdd272796096 100644
--- a/src/sinks/papertrail.rs
+++ b/src/sinks/papertrail.rs
@@ -4,7 +4,7 @@ use crate::{
     sinks::util::{
         encoding::{EncodingConfig, EncodingConfiguration},
         tcp::TcpSinkConfig,
-        Encoding, UriSerde,
+        EncodedEvent, Encoding, UriSerde,
     },
     tcp::TcpKeepaliveConfig,
     tls::TlsConfig,
@@ -77,7 +77,11 @@ impl SinkConfig for PapertrailConfig {
     }
 }

-fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig<Encoding>) -> Bytes {
+fn encode_event(
+    mut event: Event,
+    pid: u32,
+    encoding: &EncodingConfig<Encoding>,
+) -> EncodedEvent<Bytes> {
     let host = event
         .as_mut_log()
         .remove(log_schema().host_key())
@@ -109,7 +113,7 @@

     s.push(b'\n');

-    Bytes::from(s)
+    EncodedEvent::new(Bytes::from(s))
 }

 #[cfg(test)]
@@ -136,7 +140,8 @@ mod tests {
             except_fields: Some(vec!["magic".into()]),
             timestamp_format: None,
         },
-    );
+    )
+    .item;

     let msg =
         bytes.slice(String::from_utf8_lossy(&bytes).find(": ").unwrap() + 2..bytes.len() - 1);
diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs
index fbc15da31f582..4ec378e8cb103 100644
--- a/src/sinks/prometheus/remote_write.rs
+++ b/src/sinks/prometheus/remote_write.rs
@@ -7,9 +7,10 @@ use crate::{
     sinks::{
         self,
         util::{
+            batch::{BatchConfig, BatchSettings},
             buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
             http::HttpRetryLogic,
-            BatchConfig, BatchSettings, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer,
+            EncodedEvent, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer,
             TowerRequestConfig,
         },
     },
@@ -119,7 +120,7 @@ impl SinkConfig for RemoteWriteConfig {
                         .ok()
                 });
                 let key = PartitionKey { tenant_id };
-                Ok(PartitionInnerBuffer::new(event, key))
+                Ok(EncodedEvent::new(PartitionInnerBuffer::new(event, key)))
             }))
}) .sink_map_err( diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 6d05a975d368a..86f81e72f09bd 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -403,7 +403,7 @@ mod integration_tests { trace_init(); let num_events = 1_000; - let (_input, events) = random_lines_with_stream(100, num_events); + let (_input, events) = random_lines_with_stream(100, num_events, None); let topic = format!("test-{}", random_string(10)); let cnf = PulsarSinkConfig { diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index b78a817c0bd5f..4f9e36b9e5f3e 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -148,7 +148,7 @@ mod tests { let (mut rx, _trigger, server) = build_test_server(addr); tokio::spawn(server); - let (expected, events) = random_lines_with_stream(100, 10); + let (expected, events) = random_lines_with_stream(100, 10, None); sink.run(events).await.unwrap(); let output = rx.next().await.unwrap(); diff --git a/src/sinks/sematext/metrics.rs b/src/sinks/sematext/metrics.rs index 2271925e02560..c1ee2c64cae6c 100644 --- a/src/sinks/sematext/metrics.rs +++ b/src/sinks/sematext/metrics.rs @@ -11,7 +11,7 @@ use crate::{ sinks::util::{ buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer}, http::{HttpBatchService, HttpRetryLogic}, - BatchConfig, BatchSettings, TowerRequestConfig, + BatchConfig, BatchSettings, EncodedEvent, TowerRequestConfig, }, sinks::{Healthcheck, HealthcheckError, VectorSink}, vector_version, Result, @@ -156,7 +156,13 @@ impl SematextMetricsService { batch.timeout, cx.acker(), ) - .with_flat_map(move |event: Event| stream::iter(normalizer.apply(event).map(Ok))) + .with_flat_map(move |event: Event| { + stream::iter( + normalizer + .apply(event) + .map(|item| Ok(EncodedEvent::new(item))), + ) + }) .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error)); Ok(VectorSink::Sink(Box::new(sink))) @@ -177,7 +183,7 @@ impl Service> for SematextMetricsService { fn call(&mut self, items: Vec) -> Self::Future { let input = encode_events(&self.config.token, &self.config.default_namespace, items); - let body: Vec = input.into_bytes(); + let body: Vec = input.item.into_bytes(); self.inner.call(body) } @@ -211,7 +217,11 @@ fn create_build_request( } } -fn encode_events(token: &str, default_namespace: &str, events: Vec) -> String { +fn encode_events( + token: &str, + default_namespace: &str, + events: Vec, +) -> EncodedEvent { let mut output = String::new(); for event in events.into_iter() { let namespace = event @@ -246,7 +256,7 @@ fn encode_events(token: &str, default_namespace: &str, events: Vec) -> S } output.pop(); - output + EncodedEvent::new(output) } fn to_fields(label: String, value: f64) -> HashMap { @@ -284,7 +294,7 @@ mod tests { assert_eq!( "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000", - encode_events("aaa", "ns", events) + encode_events("aaa", "ns", events).item ); } @@ -299,7 +309,7 @@ mod tests { assert_eq!( "ns,metric_type=counter,token=aaa used=42 1597784400000000000", - encode_events("aaa", "ns", events) + encode_events("aaa", "ns", events).item ); } @@ -325,7 +335,7 @@ mod tests { assert_eq!( "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\ jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001", - encode_events("aaa", "ns", events) + encode_events("aaa", "ns", events).item ); } diff --git a/src/sinks/socket.rs b/src/sinks/socket.rs index a3110c28dec03..3761c23c1fa9c 100644 --- a/src/sinks/socket.rs +++ 
b/src/sinks/socket.rs @@ -161,7 +161,7 @@ mod test { let mut receiver = CountReceiver::receive_lines(addr); - let (lines, events) = random_lines_with_stream(10, 100); + let (lines, events) = random_lines_with_stream(10, 100, None); sink.run(events).await.unwrap(); // Wait for output to connect @@ -295,7 +295,7 @@ mod test { .await; }); - let (_, mut events) = random_lines_with_stream(10, 10); + let (_, mut events) = random_lines_with_stream(10, 10, None); while let Some(event) = events.next().await { let _ = sender.send(Some(event)).await.unwrap(); } @@ -317,7 +317,7 @@ mod test { assert_eq!(conn_counter.load(Ordering::SeqCst), 1); // Send another 10 events - let (_, mut events) = random_lines_with_stream(10, 10); + let (_, mut events) = random_lines_with_stream(10, 10, None); while let Some(event) = events.next().await { let _ = sender.send(Some(event)).await.unwrap(); } @@ -346,7 +346,7 @@ mod test { let context = SinkContext::new_test(); let (sink, _healthcheck) = config.build(context).await.unwrap(); - let (_, events) = random_lines_with_stream(1000, 10000); + let (_, events) = random_lines_with_stream(1000, 10000, None); let _ = tokio::spawn(sink.run(events)); // First listener diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index 1348416aa1d45..0838e69e03f16 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -6,7 +6,8 @@ use crate::{ sinks::util::{ encoding::{EncodingConfig, EncodingConfiguration}, http::{BatchedHttpSink, HttpSink}, - BatchConfig, BatchSettings, Buffer, Compression, Concurrency, TowerRequestConfig, + BatchConfig, BatchSettings, Buffer, Compression, Concurrency, EncodedEvent, + TowerRequestConfig, }, template::Template, tls::{TlsOptions, TlsSettings}, @@ -139,7 +140,7 @@ impl HttpSink for HecSinkConfig { type Input = Vec; type Output = Vec; - fn encode_event(&self, event: Event) -> Option { + fn encode_event(&self, event: Event) -> Option> { let sourcetype = self.sourcetype.as_ref().and_then(|sourcetype| { sourcetype .render_string(&event) @@ -235,7 +236,7 @@ impl HttpSink for HecSinkConfig { emit!(SplunkEventSent { byte_size: value.len() }); - Some(value) + Some(EncodedEvent::new(value)) } Err(error) => { emit!(SplunkEventEncodeError { error }); @@ -349,7 +350,7 @@ mod tests { ) .unwrap(); - let bytes = config.encode_event(event).unwrap(); + let bytes = config.encode_event(event).unwrap().item; let hec_event = serde_json::from_slice::(&bytes[..]).unwrap(); @@ -401,7 +402,7 @@ mod tests { ) .unwrap(); - let bytes = config.encode_event(event).unwrap(); + let bytes = config.encode_event(event).unwrap().item; let hec_event = serde_json::from_slice::(&bytes[..]).unwrap(); @@ -559,7 +560,7 @@ mod integration_tests { let config = config(Encoding::Text, vec![]).await; let (sink, _) = config.build(cx).await.unwrap(); - let (messages, events) = random_lines_with_stream(100, 10); + let (messages, events) = random_lines_with_stream(100, 10, None); sink.run(events).await.unwrap(); let mut found_all = false; diff --git a/src/sinks/statsd.rs b/src/sinks/statsd.rs index a2a647e45ec38..40449bfbce506 100644 --- a/src/sinks/statsd.rs +++ b/src/sinks/statsd.rs @@ -9,7 +9,7 @@ use crate::{ encode_namespace, tcp::TcpSinkConfig, udp::{UdpService, UdpSinkConfig}, - BatchConfig, BatchSettings, BatchSink, Buffer, Compression, + BatchConfig, BatchSettings, BatchSink, Buffer, Compression, EncodedEvent, }, }; use futures::{future, stream, FutureExt, SinkExt, StreamExt, TryFutureExt}; @@ -84,8 +84,9 @@ impl SinkConfig for StatsdSinkConfig { let default_namespace 
= self.default_namespace.clone(); match &self.mode { Mode::Tcp(config) => { - let encode_event = - move |event| encode_event(event, default_namespace.as_deref()).map(Into::into); + let encode_event = move |event| { + encode_event(event, default_namespace.as_deref()).map(EncodedEvent::from) + }; config.build(cx, encode_event) } Mode::Udp(config) => { @@ -116,8 +117,9 @@ impl SinkConfig for StatsdSinkConfig { } #[cfg(unix)] Mode::Unix(config) => { - let encode_event = - move |event| encode_event(event, default_namespace.as_deref()).map(Into::into); + let encode_event = move |event| { + encode_event(event, default_namespace.as_deref()).map(EncodedEvent::from) + }; config.build(cx, encode_event) } } @@ -167,7 +169,7 @@ fn push_event( }; } -fn encode_event(event: Event, default_namespace: Option<&str>) -> Option> { +fn encode_event(event: Event, default_namespace: Option<&str>) -> Option>> { let mut buf = Vec::new(); let metric = event.as_metric(); @@ -218,7 +220,7 @@ fn encode_event(event: Event, default_namespace: Option<&str>) -> Option let mut body: Vec = message.into_bytes(); body.push(b'\n'); - Some(body) + Some(EncodedEvent::new(body)) } impl Service> for StatsdSvc { @@ -300,7 +302,7 @@ mod test { .with_tags(Some(tags())); let event = Event::Metric(metric1.clone()); let frame = &encode_event(event, None).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); + let metric2 = parse(from_utf8(&frame.item).unwrap().trim()).unwrap(); shared::assert_event_data_eq!(metric1, metric2); } @@ -316,7 +318,7 @@ mod test { let frame = &encode_event(event, None).unwrap(); // The statsd parser will parse the counter as Incremental, // so we can't compare it with the parsed value. - assert_eq!("counter:1.5|c\n", from_utf8(&frame).unwrap()); + assert_eq!("counter:1.5|c\n", from_utf8(&frame.item).unwrap()); } #[cfg(feature = "sources-statsd")] @@ -330,7 +332,7 @@ mod test { .with_tags(Some(tags())); let event = Event::Metric(metric1.clone()); let frame = &encode_event(event, None).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); + let metric2 = parse(from_utf8(&frame.item).unwrap().trim()).unwrap(); shared::assert_event_data_eq!(metric1, metric2); } @@ -345,7 +347,7 @@ mod test { .with_tags(Some(tags())); let event = Event::Metric(metric1.clone()); let frame = &encode_event(event, None).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); + let metric2 = parse(from_utf8(&frame.item).unwrap().trim()).unwrap(); shared::assert_event_data_eq!(metric1, metric2); } @@ -363,7 +365,7 @@ mod test { .with_tags(Some(tags())); let event = Event::Metric(metric1.clone()); let frame = &encode_event(event, None).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); + let metric2 = parse(from_utf8(&frame.item).unwrap().trim()).unwrap(); shared::assert_event_data_eq!(metric1, metric2); } @@ -380,7 +382,7 @@ mod test { .with_tags(Some(tags())); let event = Event::Metric(metric1.clone()); let frame = &encode_event(event, None).unwrap(); - let metric2 = parse(from_utf8(&frame).unwrap().trim()).unwrap(); + let metric2 = parse(from_utf8(&frame.item).unwrap().trim()).unwrap(); shared::assert_event_data_eq!(metric1, metric2); } diff --git a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index ca932df976a01..8eec8562cc710 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -7,8 +7,8 @@ use crate::{ metrics::{self, capture_metrics, 
get_controller},
     sinks::{
         util::{
-            retries::RetryLogic, BatchSettings, Concurrency, EncodedLength, TowerRequestConfig,
-            VecBuffer,
+            retries::RetryLogic, BatchSettings, Concurrency, EncodedEvent, EncodedLength,
+            TowerRequestConfig, VecBuffer,
         },
         Healthcheck, VectorSink,
     },
@@ -21,7 +21,7 @@ use crate::{
 use core::task::Context;
 use futures::{
     future::{self, BoxFuture},
-    FutureExt, SinkExt,
+    stream, FutureExt, SinkExt,
 };
 use rand::{thread_rng, Rng};
 use rand_distr::Exp1;
@@ -155,6 +155,7 @@ impl SinkConfig for TestConfig {
             batch.timeout,
             cx.acker(),
         )
+        .with_flat_map(|event| stream::iter(Some(Ok(EncodedEvent::new(event)))))
         .sink_map_err(|error| panic!("Fatal test sink error: {}", error));
 
         let healthcheck = future::ok(()).boxed();
@@ -165,6 +166,7 @@
             .get_ref()
             .get_ref()
             .get_ref()
+            .get_ref()
             .controller
             .stats,
         );
diff --git a/src/sinks/util/batch.rs b/src/sinks/util/batch.rs
index 253d85dc51877..f037608699b80 100644
--- a/src/sinks/util/batch.rs
+++ b/src/sinks/util/batch.rs
@@ -1,3 +1,5 @@
+use super::EncodedEvent;
+use crate::event::EventMetadata;
 use derivative::Derivative;
 use serde::{Deserialize, Serialize};
 use snafu::Snafu;
@@ -190,13 +192,73 @@ pub trait Batch: Sized {
     fn num_items(&self) -> usize;
 }
 
+/// This is a batch construct that stores a vector of metadata alongside the batch itself.
+#[derive(Clone, Debug)]
+pub struct MetadataBatch<B> {
+    inner: B,
+    metadata: Vec<EventMetadata>,
+}
+
+impl<B: Batch> From<B> for MetadataBatch<B> {
+    fn from(inner: B) -> Self {
+        Self {
+            inner,
+            metadata: Vec::new(),
+        }
+    }
+}
+
+impl<B: Batch> Batch for MetadataBatch<B> {
+    type Input = EncodedEvent<B::Input>;
+    type Output = (B::Output, Vec<EventMetadata>);
+
+    fn get_settings_defaults(
+        config: BatchConfig,
+        defaults: BatchSettings<Self>,
+    ) -> Result<BatchSettings<Self>, BatchError> {
+        Ok(B::get_settings_defaults(config, defaults.into())?.into())
+    }
+
+    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
+        let EncodedEvent { item, metadata } = item;
+        match self.inner.push(item) {
+            PushResult::Ok(full) => {
+                if let Some(metadata) = metadata {
+                    self.metadata.push(metadata);
+                }
+                PushResult::Ok(full)
+            }
+            PushResult::Overflow(item) => PushResult::Overflow(EncodedEvent { item, metadata }),
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    fn fresh(&self) -> Self {
+        Self {
+            inner: self.inner.fresh(),
+            metadata: Vec::new(),
+        }
+    }
+
+    fn finish(self) -> Self::Output {
+        (self.inner.finish(), self.metadata)
+    }
+
+    fn num_items(&self) -> usize {
+        self.inner.num_items()
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct StatefulBatch<B> {
     inner: B,
     was_full: bool,
 }
 
-impl<B> From<B> for StatefulBatch<B> {
+impl<B: Batch> From<B> for StatefulBatch<B> {
     fn from(inner: B) -> Self {
         Self {
             inner,
@@ -215,10 +277,7 @@ impl<B> StatefulBatch<B> {
     }
 }
 
-impl<B> Batch for StatefulBatch<B>
-where
-    B: Batch,
-{
+impl<B: Batch> Batch for StatefulBatch<B> {
     type Input = B::Input;
     type Output = B::Output;
 
diff --git a/src/sinks/util/buffer/metrics.rs b/src/sinks/util/buffer/metrics.rs
index 40c1eadaef10a..2529cac349e43 100644
--- a/src/sinks/util/buffer/metrics.rs
+++ b/src/sinks/util/buffer/metrics.rs
@@ -1,6 +1,8 @@
 use crate::{
-    event::metric::{Metric, MetricKind, MetricValue, Sample},
-    event::Event,
+    event::{
+        metric::{Metric, MetricKind, MetricValue, Sample},
+        Event,
+    },
     sinks::util::batch::{Batch, BatchConfig, BatchError, BatchSettings, BatchSize, PushResult},
 };
 use std::{
@@ -127,7 +129,7 @@ impl MetricsBuffer {
 }
 
 impl Batch for MetricsBuffer {
-    type Input = Event;
+    type Input = Metric;
     type Output = Vec<Metric>;
 
     fn get_settings_defaults(
@@ -144,7 +146,6 @@ impl
Batch for MetricsBuffer { if self.num_items() >= self.max_events { PushResult::Overflow(item) } else { - let item = item.into_metric(); let max_events = self.max_events; let metrics = self .metrics @@ -210,9 +211,9 @@ impl MetricNormalizer { } /// This wraps `MetricNormalize::apply_state`, converting to/from - /// the `Event` type wrapper. See that function for return values. - pub fn apply(&mut self, event: Event) -> Option { - N::apply_state(&mut self.state, event.into_metric()).map(Into::into) + /// the `Metric` type wrapper. See that function for return values. + pub fn apply(&mut self, event: Event) -> Option { + N::apply_state(&mut self.state, event.into_metric()) } } diff --git a/src/sinks/util/buffer/mod.rs b/src/sinks/util/buffer/mod.rs index 500eaf5e6c346..231de51c135fb 100644 --- a/src/sinks/util/buffer/mod.rs +++ b/src/sinks/util/buffer/mod.rs @@ -141,7 +141,7 @@ mod test { use super::{Buffer, Compression}; use crate::{ buffers::Acker, - sinks::util::{BatchSettings, BatchSink}, + sinks::util::{BatchSettings, BatchSink, EncodedEvent}, }; use futures::{future, stream, SinkExt, StreamExt}; use std::{ @@ -179,7 +179,7 @@ mod test { let _ = buffered .sink_map_err(drop) - .send_all(&mut stream::iter(input).map(Ok)) + .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item)))) .await .unwrap(); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 14968efca5aaa..14cc1c6b2c93f 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -1,6 +1,6 @@ use super::{ retries::{RetryAction, RetryLogic}, - sink, Batch, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig, + sink, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig, TowerRequestSettings, }; use crate::{ @@ -31,7 +31,7 @@ pub trait HttpSink: Send + Sync + 'static { type Input; type Output; - fn encode_event(&self, event: Event) -> Option; + fn encode_event(&self, event: Event) -> Option>; async fn build_request(&self, events: Self::Output) -> crate::Result>>; } @@ -65,7 +65,7 @@ where // An empty slot is needed to buffer an item where we encoded it but // the inner sink is applying back pressure. This trick is used in the `WithFlatMap` // sink combinator. https://docs.rs/futures/0.1.29/src/futures/sink/with_flat_map.rs.html#20 - slot: Option, + slot: Option>, } impl BatchedHttpSink @@ -197,7 +197,7 @@ where L, K, >, - slot: Option, + slot: Option>, } impl PartitionHttpSink diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 26c8af74bd28a..9ed9433224381 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -16,7 +16,7 @@ pub mod udp; pub mod unix; pub mod uri; -use crate::event::Event; +use crate::event::{Event, EventMetadata}; use bytes::Bytes; use encoding::{EncodingConfig, EncodingConfiguration}; use serde::{Deserialize, Serialize}; @@ -44,6 +44,38 @@ enum SinkBuildError { MissingPort, } +#[derive(Debug)] +pub struct EncodedEvent { + pub item: I, + pub metadata: Option, +} + +impl EncodedEvent { + /// Create a trivial input with no metadata. This method will be + /// removed when all sinks are converted. 
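The `from` method added in the next hunk is an inherent method rather than a `From` impl because the natural blanket impl `impl<F, I: From<F>> From<EncodedEvent<F>> for EncodedEvent<I>` overlaps with core's reflexive `impl<T> From<T> for T` (take `F = I` and both apply), which coherence rejects. A standalone sketch of that workaround and the payload-type conversion it enables, with metadata collapsed to `Option<()>` for brevity:

```rust
struct EncodedEvent<I> {
    item: I,
    metadata: Option<()>, // stand-in for `Option<EventMetadata>`
}

impl<I> EncodedEvent<I> {
    // Inherent `from` sidesteps the coherence conflict, at the cost of
    // shadowing the `From` trait's naming convention.
    fn from<F>(that: EncodedEvent<F>) -> Self
    where
        I: From<F>,
    {
        Self {
            item: I::from(that.item),
            metadata: that.metadata,
        }
    }
}

fn main() {
    let s = EncodedEvent { item: String::from("payload"), metadata: None };
    // Convert the payload type (String -> Vec<u8>) while keeping metadata,
    // as the statsd sink's `.map(EncodedEvent::from)` closures do.
    let b: EncodedEvent<Vec<u8>> = EncodedEvent::from(s);
    assert_eq!(b.item, b"payload");
}
```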
+    pub fn new(item: I) -> Self {
+        Self {
+            item,
+            metadata: None,
+        }
+    }
+
+    // This should be:
+    // ```impl<F, I: From<F>> From<EncodedEvent<F>> for EncodedEvent<I>```
+    // however, the compiler rejects that due to conflicting
+    // implementations of `From` due to the generic
+    // ```impl<T> From<T> for T```
+    pub fn from<F>(that: EncodedEvent<F>) -> Self
+    where
+        I: From<F>,
+    {
+        Self {
+            item: I::from(that.item),
+            metadata: that.metadata,
+        }
+    }
+}
+
 /**
  * Enum representing different ways to encode events as they are sent into a Sink.
  */
@@ -59,7 +91,10 @@ pub enum Encoding {
  * the given encoding. If there are any errors encoding the event, logs a warning
  * and returns None.
  **/
-pub fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Option<Bytes> {
+pub fn encode_event(
+    mut event: Event,
+    encoding: &EncodingConfig<Encoding>,
+) -> Option<EncodedEvent<Bytes>> {
     encoding.apply_rules(&mut event);
 
     let log = event.into_log();
@@ -76,7 +111,7 @@ pub fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Op
     b.map(|mut b| {
         b.push(b'\n');
-        Bytes::from(b)
+        EncodedEvent::new(Bytes::from(b))
     })
     .map_err(|error| error!(message = "Unable to encode.", %error))
     .ok()
diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs
index fb99627bb72ea..8b4e72b482154 100644
--- a/src/sinks/util/sink.rs
+++ b/src/sinks/util/sink.rs
@@ -32,11 +32,15 @@
 //! it to notify the consumer that the request has succeeded.
 
 use super::{
-    batch::{Batch, PushResult, StatefulBatch},
+    batch::{Batch, MetadataBatch, PushResult, StatefulBatch},
     buffer::{Partition, PartitionBuffer, PartitionInnerBuffer},
     service::{Map, ServiceBuilderExt},
+    EncodedEvent,
+};
+use crate::{
+    buffers::Acker,
+    event::{Event, EventMetadata, EventStatus},
 };
-use crate::{buffers::Acker, event::Event};
 use async_trait::async_trait;
 use futures::{
     future::BoxFuture,
@@ -125,7 +129,7 @@ where
     }
 }
 
-impl<S, B> Sink<B::Input> for BatchSink<S, B>
+impl<S, B> Sink<EncodedEvent<B::Input>> for BatchSink<S, B>
 where
     S: Service<B::Output>,
     S::Future: Send + 'static,
@@ -139,9 +143,12 @@
         self.project().inner.poll_ready(cx)
     }
 
-    fn start_send(self: Pin<&mut Self>, item: B::Input) -> Result<(), Self::Error> {
-        let item = PartitionInnerBuffer::new(item, ());
-        self.project().inner.start_send(item)
+    fn start_send(self: Pin<&mut Self>, item: EncodedEvent<B::Input>) -> Result<(), Self::Error> {
+        let EncodedEvent { item, metadata } = item;
+        self.project().inner.start_send(EncodedEvent {
+            item: PartitionInnerBuffer::new(item, ()),
+            metadata,
+        })
     }
 
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -179,9 +186,9 @@ where
     B: Batch,
 {
     service: ServiceSink<S, B::Output>,
-    buffer: Option<(K, B::Input)>,
-    batch: StatefulBatch<B>,
-    partitions: HashMap<K, StatefulBatch<B>>,
+    buffer: Option<(K, EncodedEvent<B::Input>)>,
+    batch: StatefulBatch<MetadataBatch<B>>,
+    partitions: HashMap<K, StatefulBatch<MetadataBatch<B>>>,
     timeout: Duration,
     lingers: HashMap<K, Pin<Box<Sleep>>>,
     closing: bool,
@@ -203,7 +210,7 @@
         Self {
             service,
             buffer: None,
-            batch: batch.into(),
+            batch: StatefulBatch::from(MetadataBatch::from(batch)),
             partitions: HashMap::new(),
             timeout,
             lingers: HashMap::new(),
@@ -212,7 +219,7 @@
     }
 }
 
-impl<S, B, K> Sink<B::Input> for PartitionBatchSink<S, B, K>
+impl<S, B, K> Sink<EncodedEvent<B::Input>> for PartitionBatchSink<S, B, K>
 where
     B: Batch,
     B::Input: Partition<K>,
@@ -240,8 +247,11 @@
         Poll::Ready(Ok(()))
     }
 
-    fn start_send(mut self: Pin<&mut Self>, item: B::Input) -> Result<(), Self::Error> {
-        let partition = item.partition();
+    fn start_send(
+        mut self: Pin<&mut Self>,
+        item: EncodedEvent<B::Input>,
+    ) -> Result<(), Self::Error> {
+        let partition = item.item.partition();
 
         let batch = loop {
             if let Some(batch) = self.partitions.get_mut(&partition) {
@@ -301,8 +311,8 @@ where
                     self.lingers.remove(&partition);
 
                     let batch_size =
batch.num_items(); - let request = batch.finish(); - tokio::spawn(self.service.call(request, batch_size)); + let (batch, metadata) = batch.finish(); + tokio::spawn(self.service.call(batch, batch_size, metadata)); batch_consumed = true; } else { @@ -392,7 +402,12 @@ where self.service.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, req: Request, batch_size: usize) -> BoxFuture<'static, ()> { + fn call( + &mut self, + req: Request, + batch_size: usize, + metadata: Vec, + ) -> BoxFuture<'static, ()> { let seqno = self.seq_head; self.seq_head += 1; @@ -411,16 +426,22 @@ where .call(req) .err_into() .map(move |result| { - match result { + let status = match result { Ok(response) if response.is_successful() => { trace!(message = "Response successful.", ?response); + EventStatus::Delivered } Ok(response) => { error!(message = "Response wasn't successful.", ?response); + EventStatus::Failed } Err(error) => { error!(message = "Request failed.", %error); + EventStatus::Failed } + }; + for metadata in metadata { + metadata.update_status(status); } // If the rx end is dropped we still completed @@ -521,7 +542,7 @@ mod tests { let _ = buffered .sink_map_err(drop) - .send_all(&mut stream::iter(0..22).map(Ok)) + .send_all(&mut stream::iter(0..22).map(|item| Ok(EncodedEvent::new(item)))) .await .unwrap(); @@ -563,17 +584,26 @@ mod tests { sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(0), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(0)), + Ok(()) + )); assert!(matches!( sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(1), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(1)), + Ok(()) + )); assert!(matches!( sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(2), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(2)), + Ok(()) + )); // Clear internal buffer assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending)); @@ -602,17 +632,26 @@ mod tests { sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(3), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(3)), + Ok(()) + )); assert!(matches!( sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(4), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(4)), + Ok(()) + )); assert!(matches!( sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(5), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(5)), + Ok(()) + )); // Clear internal buffer assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending)); @@ -665,7 +704,7 @@ mod tests { let _ = buffered .sink_map_err(drop) - .send_all(&mut stream::iter(0..22).map(Ok)) + .send_all(&mut stream::iter(0..22).map(|item| Ok(EncodedEvent::new(item)))) .await .unwrap(); @@ -699,12 +738,18 @@ mod tests { buffered.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(buffered.start_send_unpin(0), Ok(()))); + assert!(matches!( + buffered.start_send_unpin(EncodedEvent::new(0)), + Ok(()) + )); assert!(matches!( buffered.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(buffered.start_send_unpin(1), Ok(()))); + assert!(matches!( + buffered.start_send_unpin(EncodedEvent::new(1)), + Ok(()) + )); buffered.close().await.unwrap(); @@ -731,12 +776,18 @@ mod tests { 
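The hunk above is where the metadata collected by `MetadataBatch` pays off: `ServiceSink::call` collapses the service's response into a single `EventStatus` and fans it out to every event's metadata in the finished batch. A self-contained sketch of that finalization step, with `EventStatus` and a toy `Meta` type simplified from the `vector_core::event` types used in the diff:

```rust
#[derive(Clone, Copy, Debug)]
enum EventStatus {
    Delivered,
    Failed,
}

struct Meta(&'static str);

impl Meta {
    // Stand-in for `EventMetadata::update_status`.
    fn update_status(&self, status: EventStatus) {
        println!("event {} finalized as {:?}", self.0, status);
    }
}

fn finalize(result: Result<u16, &str>, metadata: Vec<Meta>) {
    // Collapse the request outcome into one status, mirroring the new
    // `match result { ... }` in `ServiceSink::call`.
    let status = match result {
        Ok(code) if (200..300).contains(&code) => EventStatus::Delivered,
        Ok(_) | Err(_) => EventStatus::Failed,
    };
    // Then notify every event that rode along in the batch.
    for meta in metadata {
        meta.update_status(status);
    }
}

fn main() {
    finalize(Ok(200), vec![Meta("a"), Meta("b")]); // both marked Delivered
    finalize(Err("timeout"), vec![Meta("c")]); // marked Failed
}
```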
buffered.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(buffered.start_send_unpin(0), Ok(()))); + assert!(matches!( + buffered.start_send_unpin(EncodedEvent::new(0)), + Ok(()) + )); assert!(matches!( buffered.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(buffered.start_send_unpin(1), Ok(()))); + assert!(matches!( + buffered.start_send_unpin(EncodedEvent::new(1)), + Ok(()) + )); // Move clock forward by linger timeout + 1 sec advance_time(TIMEOUT + Duration::from_secs(1)).await; @@ -766,7 +817,7 @@ mod tests { let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch.size), TIMEOUT, acker); sink.sink_map_err(drop) - .send_all(&mut stream::iter(0..22).map(Ok)) + .send_all(&mut stream::iter(0..22).map(|item| Ok(EncodedEvent::new(item)))) .await .unwrap(); @@ -797,7 +848,7 @@ mod tests { let input = vec![Partitions::A, Partitions::B]; sink.sink_map_err(drop) - .send_all(&mut stream::iter(input).map(Ok)) + .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item)))) .await .unwrap(); @@ -822,7 +873,7 @@ mod tests { let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B]; sink.sink_map_err(drop) - .send_all(&mut stream::iter(input).map(Ok)) + .send_all(&mut stream::iter(input).map(|item| Ok(EncodedEvent::new(item)))) .await .unwrap(); @@ -856,7 +907,10 @@ mod tests { sink.poll_ready_unpin(&mut cx), Poll::Ready(Ok(())) )); - assert!(matches!(sink.start_send_unpin(1), Ok(()))); + assert!(matches!( + sink.start_send_unpin(EncodedEvent::new(1)), + Ok(()) + )); assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending)); advance_time(TIMEOUT + Duration::from_secs(1)).await; @@ -888,8 +942,8 @@ mod tests { let mut sink = ServiceSink::new(svc, acker); // send some initial requests - let mut fut1 = sink.call(1, 1); - let mut fut2 = sink.call(2, 2); + let mut fut1 = sink.call(1, 1, vec![]); + let mut fut2 = sink.call(2, 2, vec![]); assert_eq!(ack_counter.load(Relaxed), 0); @@ -901,8 +955,8 @@ mod tests { assert_eq!(ack_counter.load(Relaxed), 3); // send one request that will error and one normal - let mut fut3 = sink.call(3, 3); // i will error - let mut fut4 = sink.call(4, 4); + let mut fut3 = sink.call(3, 3, vec![]); // i will error + let mut fut4 = sink.call(4, 4, vec![]); // make sure they all "worked" assert!(matches!(fut3.poll_unpin(&mut cx), Poll::Ready(()))); diff --git a/src/sinks/util/tcp.rs b/src/sinks/util/tcp.rs index 78d32fa4da18a..ea7f68bda80c8 100644 --- a/src/sinks/util/tcp.rs +++ b/src/sinks/util/tcp.rs @@ -12,7 +12,7 @@ use crate::{ util::{ retries::ExponentialBackoff, socket_bytes_sink::{BytesSink, ShutdownCheck}, - SinkBuildError, StreamSink, + EncodedEvent, SinkBuildError, StreamSink, }, Healthcheck, VectorSink, }, @@ -86,7 +86,7 @@ impl TcpSinkConfig { pub fn build( &self, cx: SinkContext, - encode_event: impl Fn(Event) -> Option + Send + Sync + 'static, + encode_event: impl Fn(Event) -> Option> + Send + Sync + 'static, ) -> crate::Result<(VectorSink, Healthcheck)> { let uri = self.address.parse::()?; let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string(); @@ -196,14 +196,14 @@ impl TcpConnector { struct TcpSink { connector: TcpConnector, acker: Acker, - encode_event: Arc Option + Send + Sync>, + encode_event: Arc Option> + Send + Sync>, } impl TcpSink { fn new( connector: TcpConnector, acker: Acker, - encode_event: impl Fn(Event) -> Option + Send + Sync + 'static, + encode_event: impl Fn(Event) -> Option> + Send + Sync + 'static, ) -> Self { Self { connector, @@ -254,14 
+254,17 @@ impl StreamSink for TcpSink { // connection only when we have something to send. let encode_event = Arc::clone(&self.encode_event); let mut input = input - .map(|event| encode_event(event).unwrap_or_else(Bytes::new)) + .map(|event| encode_event(event).unwrap_or_else(|| EncodedEvent::new(Bytes::new()))) .peekable(); while Pin::new(&mut input).peek().await.is_some() { let mut sink = self.connect().await; let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count })); - let result = match sink.send_all_peekable(&mut input).await { + let result = match sink + .send_all_peekable(&mut (&mut input).map(|item| item.item).peekable()) + .await + { Ok(()) => sink.close().await, Err(error) => Err(error), }; diff --git a/src/sinks/util/test.rs b/src/sinks/util/test.rs index bd4e387b68f87..74a836aaa229f 100644 --- a/src/sinks/util/test.rs +++ b/src/sinks/util/test.rs @@ -27,21 +27,38 @@ pub fn build_test_server( mpsc::Receiver<(http::request::Parts, Bytes)>, Trigger, impl std::future::Future>, +) { + build_test_server_generic(addr, || Response::new(Body::empty())) +} + +pub fn build_test_server_generic( + addr: std::net::SocketAddr, + responder: impl Fn() -> Response + Clone + Send + Sync + 'static, +) -> ( + mpsc::Receiver<(http::request::Parts, Bytes)>, + Trigger, + impl std::future::Future>, ) { let (tx, rx) = mpsc::channel(100); let service = make_service_fn(move |_| { + let responder = responder.clone(); let tx = tx.clone(); - async { + async move { + let responder = responder.clone(); Ok::<_, Error>(service_fn(move |req: Request| { + let responder = responder.clone(); let mut tx = tx.clone(); - async { + async move { let (parts, body) = req.into_parts(); - tokio::spawn(async move { - let bytes = hyper::body::to_bytes(body).await.unwrap(); - tx.send((parts, bytes)).await.unwrap(); - }); + let response = responder(); + if response.status().is_success() { + tokio::spawn(async move { + let bytes = hyper::body::to_bytes(body).await.unwrap(); + tx.send((parts, bytes)).await.unwrap(); + }); + } - Ok::<_, Error>(Response::new(Body::empty())) + Ok::<_, Error>(response) } })) } diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs index 5bbf2c9f01941..c10ca123e6c8e 100644 --- a/src/sinks/util/udp.rs +++ b/src/sinks/util/udp.rs @@ -10,7 +10,7 @@ use crate::{ UdpSocketConnectionFailed, UdpSocketError, }, sinks::{ - util::{retries::ExponentialBackoff, StreamSink}, + util::{retries::ExponentialBackoff, EncodedEvent, StreamSink}, Healthcheck, VectorSink, }, }; @@ -76,7 +76,7 @@ impl UdpSinkConfig { pub fn build( &self, cx: SinkContext, - encode_event: impl Fn(Event) -> Option + Send + Sync + 'static, + encode_event: impl Fn(Event) -> Option> + Send + Sync + 'static, ) -> crate::Result<(VectorSink, Healthcheck)> { let connector = self.build_connector(cx.clone())?; let sink = UdpSink::new(connector.clone(), cx.acker(), encode_event); @@ -228,14 +228,14 @@ impl tower::Service for UdpService { struct UdpSink { connector: UdpConnector, acker: Acker, - encode_event: Box Option + Send + Sync>, + encode_event: Box Option> + Send + Sync>, } impl UdpSink { fn new( connector: UdpConnector, acker: Acker, - encode_event: impl Fn(Event) -> Option + Send + Sync + 'static, + encode_event: impl Fn(Event) -> Option> + Send + Sync + 'static, ) -> Self { Self { connector, @@ -255,16 +255,16 @@ impl StreamSink for UdpSink { while let Some(event) = input.next().await { self.acker.ack(1); - let bytes = match (self.encode_event)(event) { - Some(bytes) => bytes, + let input = match 
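The TCP and Unix stream sinks above keep their byte-oriented inner sinks: rather than teaching `BytesSink` about `EncodedEvent`, a mutable borrow of the input stream is mapped through `|item| item.item` just before `send_all_peekable`, so the wrapper never crosses the socket layer and the original stream survives for the reconnect loop. A runnable sketch of that borrow-and-project pattern, using simplified types and only the `futures` crate:

```rust
use futures::{executor::block_on, stream, StreamExt};

// Simplified stand-in; only the payload field matters here.
struct EncodedEvent<I> {
    item: I,
}

fn main() {
    block_on(async {
        let mut input = stream::iter(vec![
            EncodedEvent { item: "a" },
            EncodedEvent { item: "b" },
        ]);

        // Borrow the stream mutably and project out the payload, as
        // `TcpSink::run` now does; `input` itself stays owned by the
        // caller, so a later reconnect attempt can keep draining it.
        let items: Vec<_> = (&mut input).map(|event| event.item).collect().await;
        assert_eq!(items, vec!["a", "b"]);
    });
}
```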
(self.encode_event)(event) { + Some(input) => input, None => continue, }; - match udp_send(&mut socket, &bytes).await { + match udp_send(&mut socket, &input.item).await { Ok(()) => emit!(SocketEventsSent { mode: SocketMode::Udp, count: 1, - byte_size: bytes.len(), + byte_size: input.item.len(), }), Err(error) => { emit!(UdpSocketError { error }); diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index aae2bab7870d5..dbc8757b7a8a9 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -11,7 +11,7 @@ use crate::{ util::{ retries::ExponentialBackoff, socket_bytes_sink::{BytesSink, ShutdownCheck}, - StreamSink, + EncodedEvent, StreamSink, }, Healthcheck, VectorSink, }, @@ -44,7 +44,7 @@ impl UnixSinkConfig { pub fn build( &self, cx: SinkContext, - encode_event: impl Fn(Event) -> Option + Send + Sync + 'static, + encode_event: impl Fn(Event) -> Option> + Send + Sync + 'static, ) -> crate::Result<(VectorSink, Healthcheck)> { let connector = UnixConnector::new(self.path.clone()); let sink = UnixSink::new(connector.clone(), cx.acker(), encode_event); @@ -103,14 +103,14 @@ impl UnixConnector { struct UnixSink { connector: UnixConnector, acker: Acker, - encode_event: Arc Option + Send + Sync>, + encode_event: Arc Option> + Send + Sync>, } impl UnixSink { pub fn new( connector: UnixConnector, acker: Acker, - encode_event: impl Fn(Event) -> Option + Send + Sync + 'static, + encode_event: impl Fn(Event) -> Option> + Send + Sync + 'static, ) -> Self { Self { connector, @@ -136,14 +136,17 @@ impl StreamSink for UnixSink { async fn run(&mut self, input: BoxStream<'_, Event>) -> Result<(), ()> { let encode_event = Arc::clone(&self.encode_event); let mut input = input - .map(|event| encode_event(event).unwrap_or_else(Bytes::new)) + .map(|event| encode_event(event).unwrap_or_else(|| EncodedEvent::new(Bytes::new()))) .peekable(); while Pin::new(&mut input).peek().await.is_some() { let mut sink = self.connect().await; let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count })); - let result = match sink.send_all_peekable(&mut input).await { + let result = match sink + .send_all_peekable(&mut (&mut input).map(|item| item.item).peekable()) + .await + { Ok(()) => sink.close().await, Err(error) => Err(error), }; @@ -208,7 +211,7 @@ mod tests { .unwrap(); // Send the test data - let (input_lines, events) = random_lines_with_stream(100, num_lines); + let (input_lines, events) = random_lines_with_stream(100, num_lines, None); sink.run(events).await.unwrap(); // Wait for output to connect diff --git a/src/sinks/vector.rs b/src/sinks/vector.rs index 7fd12a7eb825c..33c34d55b29d8 100644 --- a/src/sinks/vector.rs +++ b/src/sinks/vector.rs @@ -1,7 +1,7 @@ use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, event::{proto, Event}, - sinks::util::tcp::TcpSinkConfig, + sinks::util::{tcp::TcpSinkConfig, EncodedEvent}, tcp::TcpKeepaliveConfig, tls::TlsConfig, }; @@ -91,7 +91,7 @@ enum HealthcheckError { ConnectError { source: std::io::Error }, } -fn encode_event(event: Event) -> Bytes { +fn encode_event(event: Event) -> EncodedEvent { let event = proto::EventWrapper::from(event); let event_len = event.encoded_len(); let full_len = event_len + 4; @@ -100,7 +100,7 @@ fn encode_event(event: Event) -> Bytes { out.put_u32(event_len as u32); event.encode(&mut out).unwrap(); - out.into() + EncodedEvent::new(out.into()) } #[cfg(test)] diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 47de5650be3cb..3297bdd9c0202 100644 --- 
a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -164,7 +164,7 @@ mod test { config::{log_schema, GlobalOptions, SinkContext, SourceConfig, SourceContext}, event::Event, shutdown::{ShutdownSignal, SourceShutdownCoordinator}, - sinks::util::tcp::TcpSinkConfig, + sinks::util::{tcp::TcpSinkConfig, EncodedEvent}, test_util::{ collect_n, next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp, }, @@ -429,7 +429,7 @@ mod test { let message_bytes = Bytes::from(message.clone() + "\n"); let cx = SinkContext::new_test(); - let encode_event = move |_event| Some(message_bytes.clone()); + let encode_event = move |_event| Some(EncodedEvent::new(message_bytes.clone())); let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port())); let (sink, _healthcheck) = sink_config.build(cx, encode_event).unwrap(); diff --git a/src/template.rs b/src/template.rs index 268faf7df7c0c..5f8953c2698ea 100644 --- a/src/template.rs +++ b/src/template.rs @@ -1,6 +1,6 @@ use crate::{ config::log_schema, - event::{Event, Metric, Value}, + event::{EventRef, Metric, Value}, }; use bytes::Bytes; use chrono::{ @@ -101,11 +101,18 @@ fn is_dynamic(item: &Item) -> bool { } impl Template { - pub fn render(&self, event: &Event) -> Result { - self.render_string(event).map(Into::into) - } - - pub fn render_string(&self, event: &Event) -> Result { + pub fn render<'a>( + &self, + event: impl Into>, + ) -> Result { + self.render_string(event.into()).map(Into::into) + } + + pub fn render_string<'a>( + &self, + event: impl Into>, + ) -> Result { + let event = event.into(); match (self.has_fields, self.has_ts) { (false, false) => Ok(self.src.clone()), (true, false) => render_fields(&self.src, event), @@ -141,7 +148,7 @@ impl Template { } } -fn render_fields(src: &str, event: &Event) -> Result { +fn render_fields<'a>(src: &str, event: EventRef<'a>) -> Result { let mut missing_keys = Vec::new(); let out = RE .replace_all(src, |caps: &Captures<'_>| { @@ -150,8 +157,8 @@ fn render_fields(src: &str, event: &Event) -> Result log.get(&key).map(|val| val.to_string_lossy()), - Event::Metric(metric) => render_metric_field(key, metric), + EventRef::Log(log) => log.get(&key).map(|val| val.to_string_lossy()), + EventRef::Metric(metric) => render_metric_field(key, metric), } .unwrap_or_else(|| { missing_keys.push(key.to_owned()); @@ -179,12 +186,12 @@ fn render_metric_field(key: &str, metric: &Metric) -> Option { } } -fn render_timestamp(src: &str, event: &Event) -> String { +fn render_timestamp(src: &str, event: EventRef<'_>) -> String { let timestamp = match event { - Event::Log(log) => log + EventRef::Log(log) => log .get(log_schema().timestamp_key()) .and_then(Value::as_timestamp), - Event::Metric(metric) => metric.data.timestamp.as_ref(), + EventRef::Metric(metric) => metric.data.timestamp.as_ref(), }; if let Some(ts) = timestamp { ts.format(src).to_string() @@ -233,7 +240,7 @@ impl Serialize for Template { #[cfg(test)] mod tests { use super::*; - use crate::event::{MetricKind, MetricValue}; + use crate::event::{Event, MetricKind, MetricValue}; use chrono::TimeZone; use shared::btreemap; @@ -435,7 +442,7 @@ mod tests { assert_eq!( Ok(Bytes::from("timestamp 2002-03-04 05:06:07")), - template.render(&sample_metric().into()) + template.render(&sample_metric()) ); } @@ -447,7 +454,7 @@ mod tests { )); assert_eq!( Ok(Bytes::from("name=a-counter component=template")), - template.render(&metric.into()) + template.render(&metric) ); } @@ -458,7 +465,7 @@ mod tests { Err(TemplateRenderingError::MissingKeys { 
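The `EventRef<'a>` migration above is why the template tests drop their `.into()` conversions: `Template::render` and `render_string` now accept anything that borrows into an `EventRef`, so a `&LogEvent` or `&Metric` can be rendered without first cloning it into a full `Event`. A toy model of the same dispatch, independent of the real `vector_core` types:

```rust
struct LogEvent(String);
struct Metric(String);

// Mirrors the `EventRef<'a>` enum added in `lib/vector-core/src/event/mod.rs`.
enum EventRef<'a> {
    Log(&'a LogEvent),
    Metric(&'a Metric),
}

impl<'a> From<&'a LogEvent> for EventRef<'a> {
    fn from(log: &'a LogEvent) -> Self {
        EventRef::Log(log)
    }
}

impl<'a> From<&'a Metric> for EventRef<'a> {
    fn from(metric: &'a Metric) -> Self {
        EventRef::Metric(metric)
    }
}

// Mirrors the new `Template::render_string` signature.
fn render_string<'a>(event: impl Into<EventRef<'a>>) -> String {
    match event.into() {
        EventRef::Log(log) => format!("log: {}", log.0),
        EventRef::Metric(metric) => format!("metric: {}", metric.0),
    }
}

fn main() {
    let log = LogEvent("hello".into());
    let metric = Metric("a-counter".into());
    // No clone into a full `Event` needed at the call sites anymore.
    assert_eq!(render_string(&log), "log: hello");
    assert_eq!(render_string(&metric), "metric: a-counter");
}
```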
missing_keys: vec!["tags.component".into()] }), - template.render(&sample_metric().into()) + template.render(&sample_metric()) ); } @@ -468,7 +475,7 @@ mod tests { let metric = sample_metric().with_namespace(Some("vector-test")); assert_eq!( Ok(Bytes::from("namespace=vector-test name=a-counter")), - template.render(&metric.into()) + template.render(&metric) ); } @@ -480,7 +487,7 @@ mod tests { Err(TemplateRenderingError::MissingKeys { missing_keys: vec!["namespace".into()] }), - template.render(&metric.into()) + template.render(&metric) ); } diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs index fba99f927599f..a486051f2c6a8 100644 --- a/src/test_util/mod.rs +++ b/src/test_util/mod.rs @@ -1,6 +1,5 @@ use crate::{ config::{Config, ConfigDiff, GenerateConfig}, - event::Event, topology::{self, RunningTopology}, trace, }; @@ -40,6 +39,7 @@ use tokio_stream::wrappers::TcpListenerStream; #[cfg(unix)] use tokio_stream::wrappers::UnixListenerStream; use tokio_util::codec::{Encoder, FramedRead, FramedWrite, LinesCodec}; +use vector_core::event::{BatchNotifier, Event, LogEvent}; const WAIT_FOR_SECS: u64 = 5; // The default time to wait in `wait_for` const WAIT_FOR_MIN_MILLIS: u64 = 5; // The minimum time to pause before retrying @@ -186,9 +186,17 @@ pub fn temp_dir() -> PathBuf { pub fn random_lines_with_stream( len: usize, count: usize, + batch: Option>, ) -> (Vec, impl Stream) { let lines = (0..count).map(|_| random_string(len)).collect::>(); - let stream = stream::iter(lines.clone()).map(Event::from); + let stream = stream::iter(lines.clone()).map(move |line| { + let log = LogEvent::from(line); + match &batch { + None => log, + Some(batch) => log.with_batch_notifier(Arc::clone(batch)), + } + .into() + }); (lines, stream) } diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 158c00a840628..a18ca56f1d8d2 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -144,7 +144,7 @@ enum TransformError { fn render_template(s: &str, event: &Event) -> Result { let template = Template::try_from(s).map_err(TransformError::TemplateParseError)?; template - .render_string(&event) + .render_string(event) .map_err(TransformError::TemplateRenderingError) }
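The `random_lines_with_stream` change above adds a third argument so acknowledgement tests can attach a shared `BatchNotifier` to every generated `LogEvent`; existing call sites simply pass `None`. The sketch below shows what a migrated call site might look like. It assumes the document's `random_lines_with_stream` helper is in scope and that `vector_core::event::BatchNotifier` exposes a constructor along the lines of `new_with_receiver` returning a notifier plus an awaitable status receiver; both are assumptions about this revision, not confirmed by the diff.

```rust
use std::sync::Arc;
use vector_core::event::BatchNotifier;

async fn send_with_acks() {
    // Existing tests thread `None` through, as in the hunks above.
    let (_lines, _events) = random_lines_with_stream(100, 10, None);

    // Acknowledgement-aware tests attach a shared notifier instead, so each
    // generated `LogEvent` reports back when a sink finalizes it.
    let (batch, receiver) = BatchNotifier::new_with_receiver(); // assumed API
    let (_lines, events) = random_lines_with_stream(100, 10, Some(Arc::clone(&batch)));
    drop(batch); // release our handle so the receiver can resolve

    // ... run a sink over `events`, then wait for the batch's final status:
    let _ = events;
    let _status = receiver.await;
}
```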