From d28a176bb3116ef58ad6cbe55f0f6239fb8adf9c Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Thu, 30 Apr 2020 13:28:53 +0200 Subject: [PATCH] enhancement(aws_cloudwatch_logs sink): Enforce age requirements (#2437) * Time filtering Signed-off-by: ktf * Add 24h split Signed-off-by: Kruno Tomola Fabro * Add test Signed-off-by: Kruno Tomola Fabro * Fix remainder range Signed-off-by: Kruno Tomola Fabro * Add optimization Signed-off-by: Kruno Tomola Fabro * Remove extra buffer time Signed-off-by: Kruno Tomola Fabro * Move filtering to outer layer Signed-off-by: Kruno Tomola Fabro * Fix Signed-off-by: Kruno Tomola Fabro * Add split_off Signed-off-by: Kruno Tomola Fabro * Add 24h split test Signed-off-by: Kruno Tomola Fabro * Bump Signed-off-by: Kruno Tomola Fabro --- src/sinks/aws_cloudwatch_logs/mod.rs | 188 +++++++++++++++++++++-- src/sinks/aws_cloudwatch_logs/request.rs | 41 ++--- 2 files changed, 195 insertions(+), 34 deletions(-) diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs b/src/sinks/aws_cloudwatch_logs/mod.rs index 577aeb49fa059..5449c3f0abc61 100644 --- a/src/sinks/aws_cloudwatch_logs/mod.rs +++ b/src/sinks/aws_cloudwatch_logs/mod.rs @@ -15,6 +15,7 @@ use crate::{ topology::config::{DataType, SinkConfig, SinkContext}, }; use bytes::Bytes; +use chrono::{Duration, Utc}; use futures01::{future, stream::iter_ok, sync::oneshot, Async, Future, Poll, Sink}; use lazy_static::lazy_static; use rusoto_core::{request::BufferedHttpResponse, Region, RusotoError}; @@ -266,7 +267,7 @@ impl CloudwatchLogsSvc { if let Some(Value::Timestamp(ts)) = log.remove(&event::log_schema().timestamp_key()) { ts.timestamp_millis() } else { - chrono::Utc::now().timestamp_millis() + Utc::now().timestamp_millis() }; match self.encoding.codec() { @@ -283,6 +284,66 @@ impl CloudwatchLogsSvc { } } } + + pub fn process_events(&self, events: Vec) -> Vec> { + let now = Utc::now(); + // Acceptable range of Event timestamps. 
+ let age_range = (now - Duration::days(14)).timestamp_millis() + ..(now + Duration::hours(2)).timestamp_millis(); + + let mut events = events + .into_iter() + .map(|mut e| { + self.encoding.apply_rules(&mut e); + e + }) + .map(|e| e.into_log()) + .map(|e| self.encode_log(e)) + .filter(|e| age_range.contains(&e.timestamp)) + .collect::>(); + + // Sort by timestamp + events.sort_by_key(|e| e.timestamp); + + info!(message = "Sending events.", events = %events.len()); + + let mut event_batches = Vec::new(); + if events.is_empty() { + // This should happen rarely. + event_batches.push(Vec::new()); + } else { + // We will split events into 24h batches. + // Relies on events being sorted by timestamp in ascending order. + while let Some(oldest) = events.first() { + let limit = oldest.timestamp + Duration::days(1).num_milliseconds(); + + if events.last().expect("Events can't be empty").timestamp <= limit { + // Fast path. + // In most cases the difference between oldest and newest event + // is less than 24h. + event_batches.push(events); + break; + } + + // At this point we know that an event newer than the limit exists. + // + // We will find none or one of the events with timestamp==limit. + // In the case of more events with limit, we can just split them + // at found event, and send those before at with this batch, and + // those after at with the next batch. 
+ let at = events + .binary_search_by_key(&limit, |e| e.timestamp) + .unwrap_or_else(|at| at); + + // Can't be empty + let remainder = events.split_off(at); + event_batches.push(events); + events = remainder; + } + } + + event_batches + } } impl Service> for CloudwatchLogsSvc { @@ -314,27 +375,18 @@ impl Service> for CloudwatchLogsSvc { fn call(&mut self, req: Vec) -> Self::Future { if self.token_rx.is_none() { - let events = req - .into_iter() - .map(|mut e| { - self.encoding.apply_rules(&mut e); - e - }) - .map(|e| e.into_log()) - .map(|e| self.encode_log(e)) - .collect::>(); + let event_batches = self.process_events(req); let (tx, rx) = oneshot::channel(); self.token_rx = Some(rx); - info!(message = "Sending events.", events = %events.len()); request::CloudwatchFuture::new( self.client.clone(), self.stream_name.clone(), self.group_name.clone(), self.create_missing_group, self.create_missing_stream, - events, + event_batches, self.token.take(), tx, ) @@ -735,6 +787,31 @@ mod tests { let encoded = svc(config).encode_log(event.clone()); assert_eq!(encoded.message, "hello world"); } + + #[test] + fn cloudwatch_24h_split() { + let now = Utc::now(); + let events = (0..100) + .into_iter() + .map(|i| now - Duration::hours(i)) + .map(|timestamp| { + let mut event = Event::new_empty_log(); + event + .as_mut_log() + .insert(&event::log_schema().timestamp_key(), timestamp); + event + }) + .collect(); + + let batches = svc(default_config(Encoding::Text)).process_events(events); + + let day = Duration::days(1).num_milliseconds(); + for batch in batches.iter() { + assert!((batch.last().unwrap().timestamp - batch.first().unwrap().timestamp) <= day); + } + + assert_eq!(batches.len(), 5); + } } #[cfg(feature = "aws-cloudwatch-logs-integration-tests")] @@ -744,10 +821,13 @@ mod integration_tests { use crate::{ region::RegionOrEndpoint, runtime::Runtime, - test_util::{random_lines_with_stream, random_string}, + test_util::{random_lines, random_lines_with_stream, random_string}, 
topology::config::{SinkConfig, SinkContext}, }; - use futures01::{stream::Stream, Sink}; + use futures01::{ + stream::{self, Stream}, + Sink, + }; use pretty_assertions::assert_eq; use rusoto_core::Region; use rusoto_logs::{CloudWatchLogs, CreateLogGroupRequest, GetLogEventsRequest}; @@ -882,6 +962,86 @@ mod integration_tests { assert_eq!(output_lines, input_lines); } + #[test] + fn cloudwatch_insert_out_of_range_timestamp() { + let mut rt = Runtime::single_threaded().unwrap(); + let resolver = Resolver::new(Vec::new(), rt.executor()).unwrap(); + + let stream_name = gen_name(); + + let region = Region::Custom { + name: "localstack".into(), + endpoint: "http://localhost:6000".into(), + }; + ensure_group(region.clone()); + + let config = CloudwatchLogsSinkConfig { + stream_name: stream_name.clone().into(), + group_name: GROUP_NAME.into(), + region: RegionOrEndpoint::with_endpoint("http://localhost:6000".into()), + encoding: Encoding::Text.into(), + create_missing_group: None, + create_missing_stream: None, + batch: Default::default(), + request: Default::default(), + assume_role: None, + }; + + let (sink, _) = config.build(SinkContext::new_test(rt.executor())).unwrap(); + + let now = chrono::Utc::now(); + + let mut input_lines = random_lines(100); + let mut events = Vec::new(); + let mut lines = Vec::new(); + + let mut add_event = |offset: chrono::Duration| { + let line = input_lines.next().unwrap(); + let mut event = Event::from(line.clone()); + event + .as_mut_log() + .insert(event::log_schema().timestamp_key(), now + offset); + events.push(event); + line + }; + + // ------------------ past + add_event(Duration::days(-15)); + // ------------------ 14 day limit + lines.push(add_event(Duration::days(-13))); + lines.push(add_event(Duration::days(-1))); + // ------------------ now + lines.push(add_event(Duration::zero())); + // ------------------ future + lines.push(add_event(Duration::hours(1))); + lines.push(add_event(Duration::minutes(110))); + // ------------------ 
2h limit + add_event(Duration::minutes(125)); + + let pump = sink.send_all(stream::iter_ok(events)); + let (sink, _) = rt.block_on(pump).unwrap(); + // drop the sink so it closes all its connections + drop(sink); + + let mut request = GetLogEventsRequest::default(); + request.log_stream_name = stream_name.clone().into(); + request.log_group_name = GROUP_NAME.into(); + request.start_time = Some((now - Duration::days(30)).timestamp_millis()); + + let client = create_client(region, None, resolver).unwrap(); + + let response = rt.block_on(client.get_log_events(request)).unwrap(); + + let events = response.events.unwrap(); + + let output_lines = events + .into_iter() + .map(|e| e.message.unwrap()) + .collect::>(); + + assert_eq!(output_lines, lines); + } + #[test] fn cloudwatch_dynamic_group_and_stream_creation() { let mut rt = Runtime::single_threaded().unwrap(); diff --git a/src/sinks/aws_cloudwatch_logs/request.rs b/src/sinks/aws_cloudwatch_logs/request.rs index 9f5fb18f709ef..da4c01c8a89a8 100644 --- a/src/sinks/aws_cloudwatch_logs/request.rs +++ b/src/sinks/aws_cloudwatch_logs/request.rs @@ -13,7 +13,7 @@ pub struct CloudwatchFuture { state: State, create_missing_group: bool, create_missing_stream: bool, - events: Option>, + events: Vec>, token_tx: Option>>, } @@ -31,13 +31,14 @@ enum State { } impl CloudwatchFuture { + /// Panics if events.is_empty() pub fn new( client: CloudWatchLogsClient, stream_name: String, group_name: String, create_missing_group: bool, create_missing_stream: bool, - events: Vec, + mut events: Vec>, token: Option, token_tx: oneshot::Sender>, ) -> Self { @@ -47,12 +48,10 @@ impl CloudwatchFuture { group_name, }; - let (state, events) = if let Some(token) = token { - let state = State::Put(client.put_logs(Some(token), events)); - (state, None) + let state = if let Some(token) = token { + State::Put(client.put_logs(Some(token), events.pop().expect("No Events to send"))) } else { - let state = State::DescribeStream(client.describe_stream()); - 
(state, Some(events)) + State::DescribeStream(client.describe_stream()) }; Self { @@ -106,8 +105,8 @@ impl Future for CloudwatchFuture { let events = self .events - .take() - .expect("Token got called twice, this is a bug!"); + .pop() + .expect("Token got called multiple times, this is a bug!"); let token = stream.upload_sequence_token; @@ -169,15 +168,20 @@ impl Future for CloudwatchFuture { let next_token = res.next_sequence_token; - info!(message = "putting logs was successful.", ?next_token); + if let Some(events) = self.events.pop() { + debug!(message = "putting logs.", ?next_token); + self.state = State::Put(self.client.put_logs(next_token, events)); + } else { + info!(message = "putting logs was successful.", ?next_token); - self.token_tx - .take() - .expect("Put was polled twice.") - .send(next_token) - .expect("CloudwatchLogsSvc was dropped unexpectedly"); + self.token_tx + .take() + .expect("Put was polled after finishing.") + .send(next_token) + .expect("CloudwatchLogsSvc was dropped unexpectedly"); - return Ok(().into()); + return Ok(().into()); + } } } } @@ -188,11 +192,8 @@ impl Client { pub fn put_logs( &self, sequence_token: Option, - mut log_events: Vec, + log_events: Vec, ) -> RusotoFuture { - // Sort by timestamp - log_events.sort_by_key(|e| e.timestamp); - let request = PutLogEventsRequest { log_events, sequence_token,