Skip to content

Commit

Permalink
enhancement(aws_cloudwatch_logs sink): Enforce age requirements (#2437)
Browse files Browse the repository at this point in the history
* Time filtering

Signed-off-by: ktf <krunotf@gmail.com>

* Add 24h split

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add test

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Fix remainder range

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add optimization

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Remove extra buffer time

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Move filtering to outer layer

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Fix

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add split_off

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Add 24h split test

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Bump

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>
  • Loading branch information
ktff authored Apr 30, 2020
1 parent d7c0004 commit d28a176
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 34 deletions.
188 changes: 174 additions & 14 deletions src/sinks/aws_cloudwatch_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext},
};
use bytes::Bytes;
use chrono::{Duration, Utc};
use futures01::{future, stream::iter_ok, sync::oneshot, Async, Future, Poll, Sink};
use lazy_static::lazy_static;
use rusoto_core::{request::BufferedHttpResponse, Region, RusotoError};
Expand Down Expand Up @@ -266,7 +267,7 @@ impl CloudwatchLogsSvc {
if let Some(Value::Timestamp(ts)) = log.remove(&event::log_schema().timestamp_key()) {
ts.timestamp_millis()
} else {
chrono::Utc::now().timestamp_millis()
Utc::now().timestamp_millis()
};

match self.encoding.codec() {
Expand All @@ -283,6 +284,66 @@ impl CloudwatchLogsSvc {
}
}
}

/// Encodes `events` and groups them into batches that are ready to be sent.
///
/// Every event has the sink's encoding rules applied and is converted into an
/// `InputLogEvent`. Events whose timestamp lies outside the window
/// `[now - 14 days, now + 2 hours)` are dropped. The remaining events are
/// sorted by timestamp (ascending) and cut into consecutive batches, oldest
/// batch first, such that the oldest and newest event of a batch are at most
/// 24 hours apart.
///
/// The returned vector always holds at least one batch; when no event
/// survives the filter, that single batch is empty.
pub fn process_events(&self, events: Vec<Event>) -> Vec<Vec<InputLogEvent>> {
    let now = Utc::now();
    // Accepted timestamp window: inclusive lower bound, exclusive upper bound.
    let earliest = (now - Duration::days(14)).timestamp_millis();
    let latest = (now + Duration::hours(2)).timestamp_millis();

    let mut pending: Vec<InputLogEvent> = events
        .into_iter()
        .filter_map(|mut event| {
            self.encoding.apply_rules(&mut event);
            let encoded = self.encode_log(event.into_log());
            if earliest <= encoded.timestamp && encoded.timestamp < latest {
                Some(encoded)
            } else {
                None
            }
        })
        .collect();

    // Stable sort: events sharing a timestamp keep their relative order.
    pending.sort_by_key(|event| event.timestamp);

    info!(message = "Sending events.", events = %pending.len());

    if pending.is_empty() {
        // This should happen rarely. A single empty batch is still returned.
        return vec![Vec::new()];
    }

    let day_ms = Duration::days(1).num_milliseconds();
    let mut batches = Vec::new();

    // Peel 24h-wide batches off the front of `pending`.
    // Relies on `pending` being sorted by timestamp in ascending order.
    while let Some(oldest_ts) = pending.first().map(|event| event.timestamp) {
        let limit = oldest_ts + day_ms;
        let newest_ts = pending.last().expect("Events can't be empty").timestamp;

        if newest_ts <= limit {
            // Fast path: in most cases everything that is left fits into
            // a single 24h window.
            batches.push(pending);
            break;
        }

        // At this point at least one event is newer than `limit`.
        //
        // The search yields either the position of *some* event with
        // `timestamp == limit`, or the position where such an event would be
        // inserted. When several events share `limit`, the ones before the
        // found position stay in this batch (still within 24h of the oldest)
        // and the rest open the next batch.
        let split_at = match pending.binary_search_by_key(&limit, |event| event.timestamp) {
            Ok(index) | Err(index) => index,
        };

        // Neither side can be empty: the first event is older than `limit`
        // and the last one is newer.
        let newer = pending.split_off(split_at);
        batches.push(std::mem::replace(&mut pending, newer));
    }

    batches
}
}

impl Service<Vec<Event>> for CloudwatchLogsSvc {
Expand Down Expand Up @@ -314,27 +375,18 @@ impl Service<Vec<Event>> for CloudwatchLogsSvc {

fn call(&mut self, req: Vec<Event>) -> Self::Future {
if self.token_rx.is_none() {
let events = req
.into_iter()
.map(|mut e| {
self.encoding.apply_rules(&mut e);
e
})
.map(|e| e.into_log())
.map(|e| self.encode_log(e))
.collect::<Vec<_>>();
let event_batches = self.process_events(req);

let (tx, rx) = oneshot::channel();
self.token_rx = Some(rx);

info!(message = "Sending events.", events = %events.len());
request::CloudwatchFuture::new(
self.client.clone(),
self.stream_name.clone(),
self.group_name.clone(),
self.create_missing_group,
self.create_missing_stream,
events,
event_batches,
self.token.take(),
tx,
)
Expand Down Expand Up @@ -735,6 +787,31 @@ mod tests {
let encoded = svc(config).encode_log(event.clone());
assert_eq!(encoded.message, "hello world");
}

// Feeds `process_events` one event per hour over the last 100 hours and
// checks that no batch spans more than 24 hours and that exactly five
// batches are produced.
#[test]
fn cloudwatch_24h_split() {
    let now = Utc::now();

    // Timestamps run from `now` back to `now - 99h`, newest first.
    let events = (0..100)
        .map(|hours_ago| {
            let timestamp = now - Duration::hours(hours_ago);
            let mut event = Event::new_empty_log();
            event
                .as_mut_log()
                .insert(&event::log_schema().timestamp_key(), timestamp);
            event
        })
        .collect();

    let batches = svc(default_config(Encoding::Text)).process_events(events);

    let day = Duration::days(1).num_milliseconds();
    for batch in &batches {
        let newest = batch.last().unwrap().timestamp;
        let oldest = batch.first().unwrap().timestamp;
        assert!((newest - oldest) <= day);
    }

    assert_eq!(batches.len(), 5);
}
}

#[cfg(feature = "aws-cloudwatch-logs-integration-tests")]
Expand All @@ -744,10 +821,13 @@ mod integration_tests {
use crate::{
region::RegionOrEndpoint,
runtime::Runtime,
test_util::{random_lines_with_stream, random_string},
test_util::{random_lines, random_lines_with_stream, random_string},
topology::config::{SinkConfig, SinkContext},
};
use futures01::{stream::Stream, Sink};
use futures01::{
stream::{self, Stream},
Sink,
};
use pretty_assertions::assert_eq;
use rusoto_core::Region;
use rusoto_logs::{CloudWatchLogs, CreateLogGroupRequest, GetLogEventsRequest};
Expand Down Expand Up @@ -882,6 +962,86 @@ mod integration_tests {
assert_eq!(output_lines, input_lines);
}

// Integration test: sends seven events whose timestamps are spread around
// `now` and checks that only the lines recorded in `lines` (the five events
// between `now - 13 days` and `now + 110 minutes`) are read back from the
// log stream. The events at `now - 15 days` and `now + 125 minutes` are sent
// but are expected to be absent from the result.
//
// Talks to the endpoint at http://localhost:6000 configured below.
#[test]
fn cloudwatch_insert_out_of_range_timestamp() {
let mut rt = Runtime::single_threaded().unwrap();
let resolver = Resolver::new(Vec::new(), rt.executor()).unwrap();

// Fresh stream name for this run.
let stream_name = gen_name();

let region = Region::Custom {
name: "localstack".into(),
endpoint: "http://localhost:6000".into(),
};
ensure_group(region.clone());

let config = CloudwatchLogsSinkConfig {
stream_name: stream_name.clone().into(),
group_name: GROUP_NAME.into(),
region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()),
encoding: Encoding::Text.into(),
create_missing_group: None,
create_missing_stream: None,
batch: Default::default(),
request: Default::default(),
assume_role: None,
};

let (sink, _) = config.build(SinkContext::new_test(rt.executor())).unwrap();

// Reference instant; every event timestamp below is `now + offset`.
let now = chrono::Utc::now();

let mut input_lines = random_lines(100);
// All events handed to the sink, in the order they are created.
let mut events = Vec::new();
// Lines of the events that are expected to be read back.
let mut lines = Vec::new();

// Builds an event from the next random line, stamps it with `now + offset`,
// queues it in `events` and returns the line so the caller can decide
// whether it belongs in `lines`.
let mut add_event = |offset: chrono::Duration| {
let line = input_lines.next().unwrap();
let mut event = Event::from(line.clone());
event
.as_mut_log()
.insert(event::log_schema().timestamp_key(), now + offset);
events.push(event);
line
};

// Calls whose result is discarded create events that are sent but not
// expected in the output.
// ------------------ past
add_event(Duration::days(-15));
// ------------------ 14 day limit
lines.push(add_event(Duration::days(-13)));
lines.push(add_event(Duration::days(-1)));
// ------------------ now
lines.push(add_event(Duration::zero()));
// ------------------ future
lines.push(add_event(Duration::hours(1)));
lines.push(add_event(Duration::minutes(110)));
// ------------------ 2h limit
add_event(Duration::minutes(125));

let pump = sink.send_all(stream::iter_ok(events));
let (sink, _) = rt.block_on(pump).unwrap();
// drop the sink so it closes all its connections
drop(sink);

// Read the stream back, starting 30 days in the past so that the start
// time lies before every timestamp generated above.
let mut request = GetLogEventsRequest::default();
request.log_stream_name = stream_name.clone().into();
request.log_group_name = GROUP_NAME.into();
request.start_time = Some((now - Duration::days(30)).timestamp_millis());

let client = create_client(region, None, resolver).unwrap();

let response = rt.block_on(client.get_log_events(request)).unwrap();

let events = response.events.unwrap();

let output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

// `lines` was filled in ascending timestamp order, so this also checks the
// order in which the stored events are returned.
assert_eq!(output_lines, lines);
}

#[test]
fn cloudwatch_dynamic_group_and_stream_creation() {
let mut rt = Runtime::single_threaded().unwrap();
Expand Down
41 changes: 21 additions & 20 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct CloudwatchFuture {
state: State,
create_missing_group: bool,
create_missing_stream: bool,
events: Option<Vec<InputLogEvent>>,
events: Vec<Vec<InputLogEvent>>,
token_tx: Option<oneshot::Sender<Option<String>>>,
}

Expand All @@ -31,13 +31,14 @@ enum State {
}

impl CloudwatchFuture {
/// Panics if events.is_empty()
pub fn new(
client: CloudWatchLogsClient,
stream_name: String,
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
events: Vec<InputLogEvent>,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
Expand All @@ -47,12 +48,10 @@ impl CloudwatchFuture {
group_name,
};

let (state, events) = if let Some(token) = token {
let state = State::Put(client.put_logs(Some(token), events));
(state, None)
let state = if let Some(token) = token {
State::Put(client.put_logs(Some(token), events.pop().expect("No Events to send")))
} else {
let state = State::DescribeStream(client.describe_stream());
(state, Some(events))
State::DescribeStream(client.describe_stream())
};

Self {
Expand Down Expand Up @@ -106,8 +105,8 @@ impl Future for CloudwatchFuture {

let events = self
.events
.take()
.expect("Token got called twice, this is a bug!");
.pop()
.expect("Token got called multiple times, this is a bug!");

let token = stream.upload_sequence_token;

Expand Down Expand Up @@ -169,15 +168,20 @@ impl Future for CloudwatchFuture {

let next_token = res.next_sequence_token;

info!(message = "putting logs was successful.", ?next_token);
if let Some(events) = self.events.pop() {
debug!(message = "putting logs.", ?next_token);
self.state = State::Put(self.client.put_logs(next_token, events));
} else {
info!(message = "putting logs was successful.", ?next_token);

self.token_tx
.take()
.expect("Put was polled twice.")
.send(next_token)
.expect("CloudwatchLogsSvc was dropped unexpectedly");
self.token_tx
.take()
.expect("Put was polled after finishing.")
.send(next_token)
.expect("CloudwatchLogsSvc was dropped unexpectedly");

return Ok(().into());
return Ok(().into());
}
}
}
}
Expand All @@ -188,11 +192,8 @@ impl Client {
pub fn put_logs(
&self,
sequence_token: Option<String>,
mut log_events: Vec<InputLogEvent>,
log_events: Vec<InputLogEvent>,
) -> RusotoFuture<PutLogEventsResponse, PutLogEventsError> {
// Sort by timestamp
log_events.sort_by_key(|e| e.timestamp);

let request = PutLogEventsRequest {
log_events,
sequence_token,
Expand Down

0 comments on commit d28a176

Please sign in to comment.