From e9cb98d2ff9a4411544b97429d278bf8945e4c57 Mon Sep 17 00:00:00 2001 From: Anton Ovchinnikov Date: Thu, 23 Jul 2020 17:48:52 +0200 Subject: [PATCH 1/4] feat(kafka): Report producer errors --- relay-server/src/actors/store.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index dd08a0fb2c..468f8cef3b 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -8,8 +8,8 @@ use actix::prelude::*; use bytes::Bytes; use failure::{Fail, ResultExt}; use rdkafka::error::KafkaError; -use rdkafka::producer::{BaseRecord, DefaultProducerContext}; -use rdkafka::ClientConfig; +use rdkafka::producer::{BaseRecord, DeliveryResult, ProducerContext}; +use rdkafka::{ClientConfig, ClientContext}; use rmp_serde::encode::Error as RmpError; use serde::{ser::Error, Serialize}; @@ -23,7 +23,22 @@ use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; use crate::metrics::RelayCounters; use crate::service::{ServerError, ServerErrorKind}; -type ThreadedProducer = rdkafka::producer::ThreadedProducer; +struct CaptureErrorContext; + +impl ClientContext for CaptureErrorContext {} +impl ProducerContext for CaptureErrorContext { + type DeliveryOpaque = (); + fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) { + if let Err((e, _message)) = result { + println!("BLABLA Error: {}", e); + + // TODO send a metric + // metric!(counter(RelayCounters::EventProtocol) += 1); + } + } +} + +type ThreadedProducer = rdkafka::producer::ThreadedProducer; lazy_static::lazy_static! 
{ static ref NAMESPACE_DID: Uuid = @@ -61,7 +76,7 @@ impl StoreForwarder { } let producer = client_config - .create() + .create_with_context(CaptureErrorContext) .context(ServerErrorKind::KafkaError)?; Ok(Self { From d53a42a9087a1a077544004f22ba18453a605753 Mon Sep 17 00:00:00 2001 From: Anton Ovchinnikov Date: Thu, 23 Jul 2020 18:49:45 +0200 Subject: [PATCH 2/4] New module, do the same for outcomes --- relay-server/src/actors/outcome.rs | 7 +++---- relay-server/src/actors/store.rs | 22 +++------------------- relay-server/src/utils/kafka.rs | 20 ++++++++++++++++++++ relay-server/src/utils/mod.rs | 6 ++++++ 4 files changed, 32 insertions(+), 23 deletions(-) create mode 100644 relay-server/src/utils/kafka.rs diff --git a/relay-server/src/actors/outcome.rs b/relay-server/src/actors/outcome.rs index b446e2b51d..51130e66ae 100644 --- a/relay-server/src/actors/outcome.rs +++ b/relay-server/src/actors/outcome.rs @@ -338,7 +338,7 @@ mod processing { use failure::{Fail, ResultExt}; use rdkafka::error::KafkaError; - use rdkafka::producer::{BaseRecord, DefaultProducerContext}; + use rdkafka::producer::BaseRecord; use rdkafka::ClientConfig; use serde_json::Error as SerdeSerializationError; @@ -347,8 +347,7 @@ mod processing { use crate::metrics::RelayCounters; use crate::service::ServerErrorKind; - - type ThreadedProducer = rdkafka::producer::ThreadedProducer; + use crate::utils::{CaptureErrorContext, ThreadedProducer}; #[derive(Fail, Debug)] pub enum OutcomeError { @@ -388,7 +387,7 @@ mod processing { client_config.set(config_p.name.as_str(), config_p.value.as_str()); } let future_producer = client_config - .create() + .create_with_context(CaptureErrorContext) .context(ServerErrorKind::KafkaError)?; (Some(future_producer), None) } else { diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 468f8cef3b..c8b57d0f5a 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -8,8 +8,8 @@ use actix::prelude::*; use 
bytes::Bytes; use failure::{Fail, ResultExt}; use rdkafka::error::KafkaError; -use rdkafka::producer::{BaseRecord, DeliveryResult, ProducerContext}; -use rdkafka::{ClientConfig, ClientContext}; +use rdkafka::producer::BaseRecord; +use rdkafka::ClientConfig; use rmp_serde::encode::Error as RmpError; use serde::{ser::Error, Serialize}; @@ -22,23 +22,7 @@ use relay_quotas::Scoping; use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; use crate::metrics::RelayCounters; use crate::service::{ServerError, ServerErrorKind}; - -struct CaptureErrorContext; - -impl ClientContext for CaptureErrorContext {} -impl ProducerContext for CaptureErrorContext { - type DeliveryOpaque = (); - fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) { - if let Err((e, _message)) = result { - println!("BLABLA Error: {}", e); - - // TODO send a metric - // metric!(counter(RelayCounters::EventProtocol) += 1); - } - } -} - -type ThreadedProducer = rdkafka::producer::ThreadedProducer; +use crate::utils::{CaptureErrorContext, ThreadedProducer}; lazy_static::lazy_static! 
{ static ref NAMESPACE_DID: Uuid = diff --git a/relay-server/src/utils/kafka.rs b/relay-server/src/utils/kafka.rs new file mode 100644 index 0000000000..fc3d725e4b --- /dev/null +++ b/relay-server/src/utils/kafka.rs @@ -0,0 +1,20 @@ +use rdkafka::producer::{DeliveryResult, ProducerContext}; +use rdkafka::ClientContext; + +pub struct CaptureErrorContext; + +impl ClientContext for CaptureErrorContext {} + +impl ProducerContext for CaptureErrorContext { + type DeliveryOpaque = (); + fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) { + if let Err((e, _message)) = result { + log::error!("producer error: {}", e); + + // TODO send a metric + // metric!(counter(RelayCounters::EventProtocol) += 1); + } + } +} + +pub type ThreadedProducer = rdkafka::producer::ThreadedProducer; diff --git a/relay-server/src/utils/mod.rs b/relay-server/src/utils/mod.rs index 3e38215a8a..108b2c0ef5 100644 --- a/relay-server/src/utils/mod.rs +++ b/relay-server/src/utils/mod.rs @@ -8,6 +8,9 @@ mod request; mod shutdown; mod timer; +#[cfg(feature = "processing")] +mod kafka; + #[cfg(feature = "processing")] mod unreal; @@ -21,5 +24,8 @@ pub use self::request::*; pub use self::shutdown::*; pub use self::timer::*; +#[cfg(feature = "processing")] +pub use self::kafka::*; + #[cfg(feature = "processing")] pub use self::unreal::*; From 3f9441a5231f0c7de5a9ccc4c4a57c07a58e5b36 Mon Sep 17 00:00:00 2001 From: Anton Ovchinnikov Date: Fri, 24 Jul 2020 10:25:35 +0200 Subject: [PATCH 3/4] LogError and metrics --- relay-server/src/metrics.rs | 8 ++++++++ relay-server/src/utils/kafka.rs | 11 +++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/relay-server/src/metrics.rs b/relay-server/src/metrics.rs index 7861ff911c..24aabae3ba 100644 --- a/relay-server/src/metrics.rs +++ b/relay-server/src/metrics.rs @@ -215,6 +215,12 @@ pub enum RelayCounters { /// either `event` or `attachment` representing the type of message produced on the Kafka queue. 
#[cfg(feature = "processing")] ProcessingMessageProduced, + /// Counts the number of producer errors that occurred after an event was already enqueued for + /// sending to Kafka. These errors might include e.g. MessageTooLarge errors when the broker + /// does not accept the requests over a certain size, which is usually due to invalid or + /// inconsistent broker/producer configurations. + #[cfg(feature = "processing")] + ProcessingProduceError, /// Counts the number of events that hit any of the Store like endpoints (Store, Security, /// MiniDump, Unreal). The events are counted before they are rate limited , filtered or /// processed in any way. The counter has a `version` tag that tracks the message event /// protocol version. @@ -265,6 +271,8 @@ impl CounterMetric for RelayCounters { RelayCounters::ServerStarting => "server.starting", #[cfg(feature = "processing")] RelayCounters::ProcessingMessageProduced => "processing.event.produced", + #[cfg(feature = "processing")] + RelayCounters::ProcessingProduceError => "processing.produce.error", RelayCounters::EventProtocol => "event.protocol", RelayCounters::Requests => "requests", RelayCounters::ResponsesStatusCodes => "responses.status_codes", diff --git a/relay-server/src/utils/kafka.rs b/relay-server/src/utils/kafka.rs index fc3d725e4b..a9bc3cc1c5 100644 --- a/relay-server/src/utils/kafka.rs +++ b/relay-server/src/utils/kafka.rs @@ -1,6 +1,10 @@ use rdkafka::producer::{DeliveryResult, ProducerContext}; use rdkafka::ClientContext; +use relay_common::{metric, LogError}; + +use crate::metrics::RelayCounters; + pub struct CaptureErrorContext; impl ClientContext for CaptureErrorContext {} @@ -8,11 +12,10 @@ impl ClientContext for CaptureErrorContext {} impl ProducerContext for CaptureErrorContext { type DeliveryOpaque = (); fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) { - if let Err((e, _message)) = result { - log::error!("producer error: {}", e); + if let Err((error, _message)) = result { + 
log::error!("callback producer error: {}", LogError(error)); - // TODO send a metric - // metric!(counter(RelayCounters::EventProtocol) += 1); + metric!(counter(RelayCounters::ProcessingProduceError) += 1); } } } From a666ded391f9b4c0ea804b8aabdfa16cd31d2c58 Mon Sep 17 00:00:00 2001 From: Anton Ovchinnikov Date: Fri, 24 Jul 2020 11:58:48 +0200 Subject: [PATCH 4/4] More comments --- relay-server/src/utils/kafka.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/relay-server/src/utils/kafka.rs b/relay-server/src/utils/kafka.rs index a9bc3cc1c5..60ce0388df 100644 --- a/relay-server/src/utils/kafka.rs +++ b/relay-server/src/utils/kafka.rs @@ -5,15 +5,22 @@ use relay_common::{metric, LogError}; use crate::metrics::RelayCounters; +/// Kafka producer context that logs producer errors pub struct CaptureErrorContext; impl ClientContext for CaptureErrorContext {} impl ProducerContext for CaptureErrorContext { type DeliveryOpaque = (); + + /// This method is called after attempting to send a message to Kafka. + /// It's called asynchronously for every message, so we want to handle errors explicitly here. fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) { if let Err((error, _message)) = result { - log::error!("callback producer error: {}", LogError(error)); + log::error!( + "failed to produce message to Kafka (delivery callback): {}", + LogError(error) + ); metric!(counter(RelayCounters::ProcessingProduceError) += 1); }