Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesis sinks): implement full retry of partial failures in firehose/streams #16771

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ pub struct KinesisSinkBaseConfig {
#[serde(default)]
pub auth: AwsAuthentication,

/// Whether or not to retry successful requests containing partial failures.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want per-sink config. This is in the "base" only

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 especially if we're only supporting it for streams.

///
/// Note: this will cause duplicates in firehose.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub request_retry_partial: bool,

#[configurable(derived)]
#[serde(
default,
Expand All @@ -83,6 +90,7 @@ pub async fn build_sink<C, R, RR, E, RT>(
partition_key_field: Option<String>,
batch_settings: BatcherSettings,
client: C,
retry_logic: RT,
) -> crate::Result<VectorSink>
where
C: SendRecord + Clone + Send + Sync + 'static,
Expand All @@ -98,7 +106,7 @@ where

let region = config.region.region();
let service = ServiceBuilder::new()
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, RT::default())
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
.service(KinesisService::<C, R, E> {
client,
stream_name: config.stream_name.clone(),
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::FutureExt;
use snafu::Snafu;
use vector_config::configurable_component;

use crate::sinks::util::retries::RetryAction;
use crate::{
aws::{create_client, is_retriable_error, ClientBuilder},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig {
None,
batch_settings,
KinesisFirehoseClient { client },
KinesisRetryLogic {
retry_partial: self.base.request_retry_partial,
},
)
.await?;

Expand All @@ -167,7 +171,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig {
}

#[derive(Clone, Default)]
struct KinesisRetryLogic;
struct KinesisRetryLogic {
retry_partial: bool,
}

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError>;
Expand All @@ -181,4 +187,13 @@ impl RetryLogic for KinesisRetryLogic {
}
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if self.retry_partial && response.failure_count > 0 {
let msg = format!("partial error count {}", response.failure_count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to include the error type and reason if we can pull that out of the response reasonably.

return RetryAction::Retry(msg.into());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixme: doesn't need a return

} else {
RetryAction::Successful
}
}
}
16 changes: 14 additions & 2 deletions src/sinks/aws_kinesis/firehose/record.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::sinks::aws_kinesis::KinesisResponse;
use aws_sdk_firehose::output::PutRecordBatchOutput;
use aws_sdk_firehose::types::{Blob, SdkError};
use bytes::Bytes;
use tracing::Instrument;
Expand Down Expand Up @@ -46,14 +48,24 @@ impl SendRecord for KinesisFirehoseClient {
type T = KinesisRecord;
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len().clone();

self.client
.put_record_batch()
.set_records(Some(records))
.delivery_stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
.map(|output: PutRecordBatchOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_put_count().unwrap_or(0) as usize,
events_byte_size: 0,
Copy link
Contributor Author

@jasongoodwin jasongoodwin Mar 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the events size isn't available here. Wasn't sure the best way to modify this - will think about it. I may just return the failure count for now, and build the KinesisResponse in the Service

})
}
}
2 changes: 2 additions & 0 deletions src/sinks/aws_kinesis/firehose/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn check_batch_size() {
request: Default::default(),
tls: None,
auth: Default::default(),
request_retry_partial: false,
acknowledgements: Default::default(),
};

Expand Down Expand Up @@ -62,6 +63,7 @@ async fn check_batch_events() {
request: Default::default(),
tls: None,
auth: Default::default(),
request_retry_partial: false,
acknowledgements: Default::default(),
};

Expand Down
7 changes: 6 additions & 1 deletion src/sinks/aws_kinesis/record.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::sinks::aws_kinesis::KinesisResponse;
use async_trait::async_trait;
use aws_smithy_client::SdkError;
use bytes::Bytes;
Expand All @@ -24,5 +25,9 @@ pub trait SendRecord {
type E;

/// Sends the records.
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>>;
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>>;
}
20 changes: 7 additions & 13 deletions src/sinks/aws_kinesis/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ where
}

pub struct KinesisResponse {
count: usize,
events_byte_size: usize,
pub(crate) count: usize,
pub(crate) failure_count: usize,
pub(crate) events_byte_size: usize,
}

impl DriverResponse for KinesisResponse {
Expand Down Expand Up @@ -73,7 +74,6 @@ where
// Emission of internal events for errors and dropped events is handled upstream by the caller.
fn call(&mut self, requests: BatchKinesisRequest<R>) -> Self::Future {
let events_byte_size = requests.get_metadata().events_byte_size();
let count = requests.get_metadata().event_count();

let records = requests
.events
Expand All @@ -85,16 +85,10 @@ where
let stream_name = self.stream_name.clone();

Box::pin(async move {
// Returning a Result (a trait that implements Try) is not a stable feature,
// so instead we have to explicitly check for error and return.
// https://github.com/rust-lang/rust/issues/84277
if let Some(e) = client.send(records, stream_name).await {
return Err(e);
}

Ok(KinesisResponse {
count,
events_byte_size,
client.send(records, stream_name).await.map(|mut r| {
// augment the response
r.events_byte_size = events_byte_size;
r
})
})
}
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::FutureExt;
use snafu::Snafu;
use vector_config::{component::GenerateConfig, configurable_component};

use crate::sinks::util::retries::RetryAction;
use crate::{
aws::{create_client, is_retriable_error, ClientBuilder},
config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig {
self.partition_key_field.clone(),
batch_settings,
KinesisStreamClient { client },
KinesisRetryLogic {
retry_partial: self.base.request_retry_partial,
},
)
.await?;

Expand All @@ -174,7 +178,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig {
}
}
#[derive(Default, Clone)]
struct KinesisRetryLogic;
struct KinesisRetryLogic {
retry_partial: bool,
}

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError>;
Expand All @@ -188,6 +194,15 @@ impl RetryLogic for KinesisRetryLogic {
}
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
if self.retry_partial && response.failure_count > 0 {
let msg = format!("partial error count {}", response.failure_count);
return RetryAction::Retry(msg.into());
} else {
RetryAction::Successful
}
}
}

#[cfg(test)]
Expand Down
15 changes: 13 additions & 2 deletions src/sinks/aws_kinesis/streams/record.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::sinks::aws_kinesis::KinesisResponse;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be moved into the use super:: line below.

use aws_sdk_kinesis::output::PutRecordsOutput;
use aws_sdk_kinesis::types::{Blob, SdkError};
use bytes::Bytes;
use tracing::Instrument;
Expand Down Expand Up @@ -62,14 +64,23 @@ impl SendRecord for KinesisStreamClient {
type T = KinesisRecord;
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
async fn send(
&self,
records: Vec<Self::T>,
stream_name: String,
) -> Result<KinesisResponse, SdkError<Self::E>> {
let rec_count = records.len().clone();
self.client
.put_records()
.set_records(Some(records))
.stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
.map(|output: PutRecordsOutput| KinesisResponse {
count: rec_count,
failure_count: output.failed_record_count().unwrap_or(0) as usize,
events_byte_size: 0,
})
Comment on lines +80 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It definitely feels better to me to do this in the service.rs

}
}