Skip to content

Commit

Permalink
enhancement(http sink): Add support for end-to-end acknowledgements (#…
Browse files Browse the repository at this point in the history
…7265)

* Create `EventRef` wrapper for references to event inner types

* Rework MetricsBuffer to use the Metric type internally

* Merge some common bits in http sink tests

* Add tests for retries and failures to http sink

* Add Batch wrapper to store metadata

* Use metadata batch for batch sinks

* Update event metadata in `ServiceSink`

* Add metadata output to `trait HttpSink::encode_event`

* Handle metadata in `BatchSink` wrapper

* Modify http sink to pass along metadata

* Add batch status asserts to all HTTP tests

Signed-off-by: Bruce Guenter <bruce.guenter@datadoghq.com>
  • Loading branch information
Bruce Guenter authored May 8, 2021
1 parent 24f1f8f commit 6db09e1
Show file tree
Hide file tree
Showing 52 changed files with 757 additions and 450 deletions.
10 changes: 7 additions & 3 deletions benches/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vector::{
buffers::Acker,
sinks::util::{
batch::{Batch, BatchConfig, BatchError, BatchSettings, BatchSize, PushResult},
BatchSink, Buffer, Compression, Partition, PartitionBatchSink,
BatchSink, Buffer, Compression, EncodedEvent, Partition, PartitionBatchSink,
},
test_util::{random_lines, runtime},
};
Expand Down Expand Up @@ -55,7 +55,7 @@ fn benchmark_batch(c: &mut Criterion) {
inner: b,
key: Bytes::from("key"),
}))
.map(Ok),
.map(|item| Ok(EncodedEvent::new(item))),
batch_sink,
)
},
Expand Down Expand Up @@ -83,7 +83,11 @@ fn benchmark_batch(c: &mut Criterion) {
)
.sink_map_err(|error| panic!("{}", error));

(rt, stream::iter(input.clone()).map(Ok), batch_sink)
(
rt,
stream::iter(input.clone()).map(|item| Ok(EncodedEvent::new(item))),
batch_sink,
)
},
|(rt, input, batch_sink)| rt.block_on(input.forward(batch_sink)).unwrap(),
criterion::BatchSize::LargeInput,
Expand Down
1 change: 0 additions & 1 deletion benches/isolated_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use async_trait::async_trait;
use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
use futures::{
compat::{Future01CompatExt, Sink01CompatExt},
stream::{self, BoxStream},
Sink, SinkExt, Stream, StreamExt,
};
Expand Down
29 changes: 29 additions & 0 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,35 @@ impl From<Metric> for Event {
}
}

/// A wrapper for references to inner event types, where reconstituting
/// a full `Event` from a `LogEvent` or `Metric` might be inconvenient.
#[derive(Clone, Copy, Debug)]
pub enum EventRef<'a> {
Log(&'a LogEvent),
Metric(&'a Metric),
}

impl<'a> From<&'a Event> for EventRef<'a> {
fn from(event: &'a Event) -> Self {
match event {
Event::Log(log) => log.into(),
Event::Metric(metric) => metric.into(),
}
}
}

impl<'a> From<&'a LogEvent> for EventRef<'a> {
fn from(log: &'a LogEvent) -> Self {
Self::Log(log)
}
}

impl<'a> From<&'a Metric> for EventRef<'a> {
fn from(metric: &'a Metric) -> Self {
Self::Metric(metric)
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
39 changes: 18 additions & 21 deletions src/sinks/aws_cloudwatch_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use crate::{
internal_events::TemplateRenderingFailed,
rusoto::{self, AwsAuthentication, RegionOrEndpoint},
sinks::util::{
batch::{BatchConfig, BatchSettings},
encoding::{EncodingConfig, EncodingConfiguration},
retries::{FixedRetryPolicy, RetryLogic},
BatchConfig, BatchSettings, Compression, EncodedLength, PartitionBatchSink,
PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer,
Compression, EncodedEvent, EncodedLength, PartitionBatchSink, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer,
},
template::Template,
};
Expand Down Expand Up @@ -440,7 +441,7 @@ fn partition_encode(
encoding: &EncodingConfig<Encoding>,
group: &Template,
stream: &Template,
) -> Option<PartitionInnerBuffer<InputLogEvent, CloudwatchKey>> {
) -> Option<EncodedEvent<PartitionInnerBuffer<InputLogEvent, CloudwatchKey>>> {
let group = match group.render_string(&event) {
Ok(b) => b,
Err(error) => {
Expand Down Expand Up @@ -474,7 +475,7 @@ fn partition_encode(
)
.ok()?;

Some(PartitionInnerBuffer::new(event, key))
Some(EncodedEvent::new(PartitionInnerBuffer::new(event, key)))
}

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -693,9 +694,8 @@ mod tests {
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();
let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
let (_event, key) = encoded.item.into_parts();

let expected = CloudwatchKey {
stream: "stream".into(),
Expand All @@ -715,9 +715,8 @@ mod tests {
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();
let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
let (_event, key) = encoded.item.into_parts();

let expected = CloudwatchKey {
stream: "stream".into(),
Expand All @@ -737,9 +736,8 @@ mod tests {
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();
let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
let (_event, key) = encoded.item.into_parts();

let expected = CloudwatchKey {
stream: "abcd-stream".into(),
Expand All @@ -759,9 +757,8 @@ mod tests {
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();
let encoded = partition_encode(event, &encoding, &group, &stream).unwrap();
let (_event, key) = encoded.item.into_parts();

let expected = CloudwatchKey {
stream: "stream-abcd".into(),
Expand Down Expand Up @@ -896,7 +893,7 @@ mod integration_tests {

let timestamp = chrono::Utc::now();

let (input_lines, events) = random_lines_with_stream(100, 11);
let (input_lines, events) = random_lines_with_stream(100, 11, None);
sink.run(events).await.unwrap();

let request = GetLogEventsRequest {
Expand Down Expand Up @@ -943,7 +940,7 @@ mod integration_tests {

let timestamp = chrono::Utc::now() - chrono::Duration::days(1);

let (mut input_lines, events) = random_lines_with_stream(100, 11);
let (mut input_lines, events) = random_lines_with_stream(100, 11, None);

// add a historical timestamp to all but the first event, to simulate
// out-of-order timestamps.
Expand Down Expand Up @@ -1081,7 +1078,7 @@ mod integration_tests {

let timestamp = chrono::Utc::now();

let (input_lines, events) = random_lines_with_stream(100, 11);
let (input_lines, events) = random_lines_with_stream(100, 11, None);
sink.run(events).await.unwrap();

let request = GetLogEventsRequest {
Expand Down Expand Up @@ -1133,7 +1130,7 @@ mod integration_tests {

let timestamp = chrono::Utc::now();

let (input_lines, events) = random_lines_with_stream(100, 11);
let (input_lines, events) = random_lines_with_stream(100, 11, None);
let mut events = events.map(Ok);
let _ = sink.into_sink().send_all(&mut events).await.unwrap();

Expand Down Expand Up @@ -1181,7 +1178,7 @@ mod integration_tests {

let timestamp = chrono::Utc::now();

let (input_lines, _events) = random_lines_with_stream(100, 10);
let (input_lines, _events) = random_lines_with_stream(100, 10, None);

let events = input_lines
.clone()
Expand Down
14 changes: 8 additions & 6 deletions src/sinks/aws_cloudwatch_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use crate::{
},
rusoto::{self, AwsAuthentication, RegionOrEndpoint},
sinks::util::{
batch::{BatchConfig, BatchSettings},
buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
retries::RetryLogic,
BatchConfig, BatchSettings, Compression, PartitionBatchSink, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig,
Compression, EncodedEvent, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer,
TowerRequestConfig,
},
};
use chrono::{DateTime, SecondsFormat, Utc};
Expand Down Expand Up @@ -149,15 +150,16 @@ impl CloudWatchMetricsSvc {
let sink = PartitionBatchSink::new(svc, buffer, batch.timeout, cx.acker())
.sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error))
.with_flat_map(move |event: Event| {
stream::iter(normalizer.apply(event).map(|mut event| {
let namespace = event
.as_mut_metric()
stream::iter(normalizer.apply(event).map(|mut metric| {
let namespace = metric
.series
.name
.namespace
.take()
.unwrap_or_else(|| default_namespace.clone());
Ok(PartitionInnerBuffer::new(event, namespace))
Ok(EncodedEvent::new(PartitionInnerBuffer::new(
metric, namespace,
)))
}))
});

Expand Down
11 changes: 6 additions & 5 deletions src/sinks/aws_kinesis_firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::{
encoding::{EncodingConfig, EncodingConfiguration},
retries::RetryLogic,
sink::Response,
BatchConfig, BatchSettings, Compression, EncodedLength, TowerRequestConfig, VecBuffer,
BatchConfig, BatchSettings, Compression, EncodedEvent, EncodedLength, TowerRequestConfig,
VecBuffer,
},
};
use bytes::Bytes;
Expand Down Expand Up @@ -240,7 +241,7 @@ enum HealthcheckError {
StreamNamesMismatch { name: String, stream_name: String },
}

fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Record {
fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> EncodedEvent<Record> {
encoding.apply_rules(&mut event);
let log = event.into_log();
let data = match encoding.codec() {
Expand All @@ -254,7 +255,7 @@ fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Record

let data = Bytes::from(data);

Record { data }
EncodedEvent::new(Record { data })
}

#[cfg(test)]
Expand All @@ -272,7 +273,7 @@ mod tests {
let message = "hello world".to_string();
let event = encode_event(message.clone().into(), &Encoding::Text.into());

assert_eq!(&event.data[..], message.as_bytes());
assert_eq!(&event.item.data[..], message.as_bytes());
}

#[test]
Expand All @@ -282,7 +283,7 @@ mod tests {
event.as_mut_log().insert("key", "value");
let event = encode_event(event, &Encoding::Json.into());

let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
let map: BTreeMap<String, String> = serde_json::from_slice(&event.item.data[..]).unwrap();

assert_eq!(
map[&crate::config::log_schema().message_key().to_string()],
Expand Down
27 changes: 14 additions & 13 deletions src/sinks/aws_kinesis_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
encoding::{EncodingConfig, EncodingConfiguration},
retries::RetryLogic,
sink::Response,
BatchConfig, BatchSettings, Compression, EncodedLength, TowerRequestConfig, VecBuffer,
BatchConfig, BatchSettings, Compression, EncodedEvent, EncodedLength, TowerRequestConfig,
VecBuffer,
},
};
use bytes::Bytes;
Expand Down Expand Up @@ -269,7 +270,7 @@ fn encode_event(
mut event: Event,
partition_key_field: &Option<String>,
encoding: &EncodingConfig<Encoding>,
) -> Option<PutRecordsRequestEntry> {
) -> Option<EncodedEvent<PutRecordsRequestEntry>> {
let partition_key = if let Some(partition_key_field) = partition_key_field {
if let Some(v) = event.as_log().get(&partition_key_field) {
v.to_string_lossy()
Expand Down Expand Up @@ -302,11 +303,11 @@ fn encode_event(
.unwrap_or_default(),
};

Some(PutRecordsRequestEntry {
Some(EncodedEvent::new(PutRecordsRequestEntry {
data: Bytes::from(data),
partition_key,
..Default::default()
})
}))
}

fn gen_partition_key() -> String {
Expand Down Expand Up @@ -334,7 +335,7 @@ mod tests {
let message = "hello world".to_string();
let event = encode_event(message.clone().into(), &None, &Encoding::Text.into()).unwrap();

assert_eq!(&event.data[..], message.as_bytes());
assert_eq!(&event.item.data[..], message.as_bytes());
}

#[test]
Expand All @@ -344,7 +345,7 @@ mod tests {
event.as_mut_log().insert("key", "value");
let event = encode_event(event, &None, &Encoding::Json.into()).unwrap();

let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
let map: BTreeMap<String, String> = serde_json::from_slice(&event.item.data[..]).unwrap();

assert_eq!(map[&log_schema().message_key().to_string()], message);
assert_eq!(map["key"], "value".to_string());
Expand All @@ -356,8 +357,8 @@ mod tests {
event.as_mut_log().insert("key", "some_key");
let event = encode_event(event, &Some("key".into()), &Encoding::Text.into()).unwrap();

assert_eq!(&event.data[..], b"hello world");
assert_eq!(&event.partition_key, &"some_key".to_string());
assert_eq!(&event.item.data[..], b"hello world");
assert_eq!(&event.item.partition_key, &"some_key".to_string());
}

#[test]
Expand All @@ -366,8 +367,8 @@ mod tests {
event.as_mut_log().insert("key", random_string(300));
let event = encode_event(event, &Some("key".into()), &Encoding::Text.into()).unwrap();

assert_eq!(&event.data[..], b"hello world");
assert_eq!(event.partition_key.len(), 256);
assert_eq!(&event.item.data[..], b"hello world");
assert_eq!(event.item.partition_key.len(), 256);
}

#[test]
Expand All @@ -379,9 +380,9 @@ mod tests {
encoding.except_fields = Some(vec!["key".into()]);

let event = encode_event(event, &Some("key".into()), &encoding).unwrap();
let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
let map: BTreeMap<String, String> = serde_json::from_slice(&event.item.data[..]).unwrap();

assert_eq!(&event.partition_key, &"some_key".to_string());
assert_eq!(&event.item.partition_key, &"some_key".to_string());
assert!(!map.contains_key("key"));
}
}
Expand Down Expand Up @@ -433,7 +434,7 @@ mod integration_tests {

let timestamp = chrono::Utc::now().timestamp_millis();

let (mut input_lines, events) = random_lines_with_stream(100, 11);
let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
let mut events = events.map(Ok);

let _ = sink.send_all(&mut events).await.unwrap();
Expand Down
Loading

0 comments on commit 6db09e1

Please sign in to comment.