Skip to content

Commit

Permalink
fix(amqp sink): remove unnecessary unwrap & emit event dropped errors (
Browse files Browse the repository at this point in the history
…#18923)

* fix(amqp sink): remove unnecessary unwrap & emit event dropped errors

* return error too

* fix checks

* feedback
  • Loading branch information
dsmith3197 authored Oct 24, 2023
1 parent 249330a commit 26f430c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
36 changes: 30 additions & 6 deletions src/internal_events/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ pub mod sink {

impl InternalEvent for AmqpDeliveryError<'_> {
fn emit(self) {
let deliver_reason = "Unable to deliver.";
const DELIVER_REASON: &str = "Unable to deliver.";

error!(message = deliver_reason,
error!(message = DELIVER_REASON,
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
Expand All @@ -120,7 +120,7 @@ pub mod sink {
);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: deliver_reason
reason: DELIVER_REASON
});
}
}
Expand All @@ -132,9 +132,9 @@ pub mod sink {

impl InternalEvent for AmqpAcknowledgementError<'_> {
fn emit(self) {
let ack_reason = "Acknowledgement failed.";
const ACK_REASON: &str = "Acknowledgement failed.";

error!(message = ack_reason,
error!(message = ACK_REASON,
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
Expand All @@ -147,7 +147,31 @@ pub mod sink {
);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: ack_reason
reason: ACK_REASON
});
}
}

#[derive(Debug)]
pub struct AmqpNackError;

impl InternalEvent for AmqpNackError {
fn emit(self) {
const DELIVER_REASON: &str = "Received Negative Acknowledgement from AMQP broker.";
error!(
message = DELIVER_REASON,
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::ACKNOWLEDGMENT_FAILED,
"stage" => error_stage::SENDING,
);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: DELIVER_REASON
});
}
}
Expand Down
25 changes: 10 additions & 15 deletions src/sinks/amqp/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The main tower service that takes the request created by the request builder
//! and sends it to `AMQP`.
use crate::{
internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError},
internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError, AmqpNackError},
sinks::prelude::*,
};
use bytes::Bytes;
Expand Down Expand Up @@ -88,10 +88,13 @@ pub(super) struct AmqpService {
#[derive(Debug, Snafu)]
pub(super) enum AmqpError {
#[snafu(display("Failed retrieving Acknowledgement: {}", error))]
AmqpAcknowledgementFailed { error: lapin::Error },
AcknowledgementFailed { error: lapin::Error },

#[snafu(display("Failed AMQP request: {}", error))]
AmqpDeliveryFailed { error: lapin::Error },
DeliveryFailed { error: lapin::Error },

#[snafu(display("Received Negative Acknowledgement from AMQP broker."))]
Nack,
}

impl Service<AmqpRequest> for AmqpService {
Expand All @@ -109,11 +112,6 @@ impl Service<AmqpRequest> for AmqpService {
let channel = Arc::clone(&self.channel);

Box::pin(async move {
channel
.confirm_select(lapin::options::ConfirmSelectOptions::default())
.await
.unwrap();

let byte_size = req.body.len();
let fut = channel
.basic_publish(
Expand All @@ -128,16 +126,13 @@ impl Service<AmqpRequest> for AmqpService {
match fut {
Ok(result) => match result.await {
Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => {
warn!("Received Negative Acknowledgement from AMQP server.");
Ok(AmqpResponse {
json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
byte_size,
})
emit!(AmqpNackError);
Err(AmqpError::Nack)
}
Err(error) => {
// TODO: In due course the caller could emit these on error.
emit!(AmqpAcknowledgementError { error: &error });
Err(AmqpError::AmqpAcknowledgementFailed { error })
Err(AmqpError::AcknowledgementFailed { error })
}
Ok(_) => Ok(AmqpResponse {
json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
Expand All @@ -147,7 +142,7 @@ impl Service<AmqpRequest> for AmqpService {
Err(error) => {
// TODO: In due course the caller could emit these on error.
emit!(AmqpDeliveryError { error: &error });
Err(AmqpError::AmqpDeliveryFailed { error })
Err(AmqpError::DeliveryFailed { error })
}
}
})
Expand Down
1 change: 1 addition & 0 deletions src/sinks/amqp/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl AmqpSink {
.await
.map_err(|e| BuildError::AmqpCreateFailed { source: e })?;

// Enable confirmations on the channel.
channel
.confirm_select(ConfirmSelectOptions::default())
.await
Expand Down

0 comments on commit 26f430c

Please sign in to comment.