Skip to content

Commit

Permalink
feat(kafka): Report producer errors (#677)
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyo authored Jul 27, 2020
1 parent b8781d6 commit d12cbe1
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 8 deletions.
7 changes: 3 additions & 4 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ mod processing {

use failure::{Fail, ResultExt};
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseRecord, DefaultProducerContext};
use rdkafka::producer::BaseRecord;
use rdkafka::ClientConfig;
use serde_json::Error as SerdeSerializationError;

Expand All @@ -347,8 +347,7 @@ mod processing {

use crate::metrics::RelayCounters;
use crate::service::ServerErrorKind;

type ThreadedProducer = rdkafka::producer::ThreadedProducer<DefaultProducerContext>;
use crate::utils::{CaptureErrorContext, ThreadedProducer};

#[derive(Fail, Debug)]
pub enum OutcomeError {
Expand Down Expand Up @@ -388,7 +387,7 @@ mod processing {
client_config.set(config_p.name.as_str(), config_p.value.as_str());
}
let future_producer = client_config
.create()
.create_with_context(CaptureErrorContext)
.context(ServerErrorKind::KafkaError)?;
(Some(future_producer), None)
} else {
Expand Down
7 changes: 3 additions & 4 deletions relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use actix::prelude::*;
use bytes::Bytes;
use failure::{Fail, ResultExt};
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseRecord, DefaultProducerContext};
use rdkafka::producer::BaseRecord;
use rdkafka::ClientConfig;
use rmp_serde::encode::Error as RmpError;
use serde::{ser::Error, Serialize};
Expand All @@ -22,8 +22,7 @@ use relay_quotas::Scoping;
use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use crate::metrics::RelayCounters;
use crate::service::{ServerError, ServerErrorKind};

type ThreadedProducer = rdkafka::producer::ThreadedProducer<DefaultProducerContext>;
use crate::utils::{CaptureErrorContext, ThreadedProducer};

lazy_static::lazy_static! {
static ref NAMESPACE_DID: Uuid =
Expand Down Expand Up @@ -61,7 +60,7 @@ impl StoreForwarder {
}

let producer = client_config
.create()
.create_with_context(CaptureErrorContext)
.context(ServerErrorKind::KafkaError)?;

Ok(Self {
Expand Down
8 changes: 8 additions & 0 deletions relay-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ pub enum RelayCounters {
/// either `event` or `attachment` representing the type of message produced on the Kafka queue.
#[cfg(feature = "processing")]
ProcessingMessageProduced,
/// Counts the number of producer errors that occurred after an event was already enqueued
/// for sending to Kafka. These errors might include, for example, MessageTooLarge errors
/// when the broker does not accept requests over a certain size, which is usually due to
/// invalid or inconsistent broker/producer configurations.
#[cfg(feature = "processing")]
ProcessingProduceError,
/// Counts the number of events that hit any of the Store like endpoints (Store, Security,
/// MiniDump, Unreal). The events are counted before they are rate limited, filtered, or
/// processed in any way. The counter has a `version` tag that tracks the message event
Expand Down Expand Up @@ -265,6 +271,8 @@ impl CounterMetric for RelayCounters {
RelayCounters::ServerStarting => "server.starting",
#[cfg(feature = "processing")]
RelayCounters::ProcessingMessageProduced => "processing.event.produced",
#[cfg(feature = "processing")]
RelayCounters::ProcessingProduceError => "processing.produce.error",
RelayCounters::EventProtocol => "event.protocol",
RelayCounters::Requests => "requests",
RelayCounters::ResponsesStatusCodes => "responses.status_codes",
Expand Down
30 changes: 30 additions & 0 deletions relay-server/src/utils/kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use rdkafka::producer::{DeliveryResult, ProducerContext};
use rdkafka::ClientContext;

use relay_common::{metric, LogError};

use crate::metrics::RelayCounters;

/// Kafka producer context that logs and counts producer delivery errors.
///
/// Installed as the producer context (see the `ThreadedProducer` alias below)
/// so that per-message delivery failures reported asynchronously by the Kafka
/// client are surfaced via `log::error!` and the `ProcessingProduceError`
/// counter instead of being silently dropped.
pub struct CaptureErrorContext;

impl ClientContext for CaptureErrorContext {}

impl ProducerContext for CaptureErrorContext {
    type DeliveryOpaque = ();

    /// Invoked once per message after the produce attempt completes.
    ///
    /// Delivery happens asynchronously, so this callback is the only place
    /// where per-message failures surface. Successful deliveries are ignored;
    /// failures are logged and folded into the `ProcessingProduceError`
    /// counter so operators can alert on them.
    fn delivery(&self, result: &DeliveryResult, _delivery_opaque: Self::DeliveryOpaque) {
        let (error, _message) = match result {
            Ok(_) => return,
            Err(failure) => failure,
        };

        log::error!(
            "failed to produce message to Kafka (delivery callback): {}",
            LogError(error)
        );

        metric!(counter(RelayCounters::ProcessingProduceError) += 1);
    }
}

pub type ThreadedProducer = rdkafka::producer::ThreadedProducer<CaptureErrorContext>;
6 changes: 6 additions & 0 deletions relay-server/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ mod request;
mod shutdown;
mod timer;

#[cfg(feature = "processing")]
mod kafka;

#[cfg(feature = "processing")]
mod unreal;

Expand All @@ -21,5 +24,8 @@ pub use self::request::*;
pub use self::shutdown::*;
pub use self::timer::*;

#[cfg(feature = "processing")]
pub use self::kafka::*;

#[cfg(feature = "processing")]
pub use self::unreal::*;

0 comments on commit d12cbe1

Please sign in to comment.