
ref(kafka): Move code into separate crate [INGEST-1627] #1563

Merged
merged 16 commits into from
Nov 15, 2022

Conversation

olksdr
Contributor

@olksdr olksdr commented Nov 3, 2022

Create relay-kafka crate with its own configuration and producer functionality.

Having a dedicated crate will help keep all the related code together, and it also simplifies the dependency tree.

@olksdr olksdr self-assigned this Nov 3, 2022
@olksdr olksdr requested a review from a team November 3, 2022 06:51
attachments: Producer::create(
&config
.kafka_config(KafkaTopic::Attachments)
.map_err(|_| ServerErrorKind::KafkaError)?,
Member

I'm not sure how make_producers create_with_context worked, but are we now erasing information from the original error that used to be preserved? Or was it always erased and mapped to a ServerErrorKind::KafkaError?

Contributor Author

The create_with_context call did not go anywhere; it was moved into the kafka crate and is still used, but inside the Producer::create function, which captures and reports the errors.

But we do erase the error here and replace it with ServerErrorKind::KafkaError, which means that something is wrong with the Kafka configuration and the producer could not be initialized.

Do you think we want to keep the error around here as well? I can add the required changes.

Member

This is a nit by the way, but yes, I would give KafkaError a #[cause] ProducerError to propagate the information. Would that work?
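The suggestion can be sketched with the standard library's error source chain; the failure crate's #[cause] attribute works analogously. All names here are hypothetical stand-ins for the real relay types:

```rust
use std::error::Error;
use std::fmt;

// Hypothetical inner error standing in for relay-kafka's producer error type.
#[derive(Debug)]
struct ProducerError(String);

impl fmt::Display for ProducerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "producer error: {}", self.0)
    }
}

impl Error for ProducerError {}

// KafkaError carries its cause instead of erasing it, so callers can still
// log or inspect the original failure.
#[derive(Debug)]
enum ServerError {
    KafkaError(ProducerError),
}

impl fmt::Display for ServerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not initialize kafka producer")
    }
}

impl Error for ServerError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ServerError::KafkaError(cause) => Some(cause),
        }
    }
}
```

With this shape, an error reporter walking source() still sees the producer failure underneath the server-level message.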

.map_err(|_| ServerErrorKind::KafkaError)?,
&mut reused_producers,
)
.map_err(|_| ServerErrorKind::KafkaError)?,
Member

Another nit: These expressions all follow the same pattern, should we replace them with a helper function that just takes a KafkaTopic as argument?
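A minimal sketch of such a helper, using hypothetical stub types in place of the real relay-server ones:

```rust
// Stub types standing in for the real relay-server types (hypothetical).
#[derive(Debug, Clone, Copy)]
enum ServerErrorKind {
    KafkaError,
}

#[derive(Clone, Copy)]
enum KafkaTopic {
    Events,
    Attachments,
}

struct Config;

impl Config {
    fn kafka_config(&self, _topic: KafkaTopic) -> Result<String, ()> {
        Ok("bootstrap.servers=localhost:9092".into())
    }
}

struct Producer;

impl Producer {
    fn create(_config: &str) -> Result<Producer, ()> {
        Ok(Producer)
    }
}

// The helper the reviewer suggests: one place for the repeated
// `kafka_config(...)` lookup plus `Producer::create(...)` pattern.
fn make_producer(config: &Config, topic: KafkaTopic) -> Result<Producer, ServerErrorKind> {
    let kafka_config = config
        .kafka_config(topic)
        .map_err(|_| ServerErrorKind::KafkaError)?;
    Producer::create(&kafka_config).map_err(|_| ServerErrorKind::KafkaError)
}
```

Each call site then shrinks to make_producer(&config, KafkaTopic::Attachments)?.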

Member

@jan-auer jan-auer left a comment

Please also list the relay-kafka crate in the doc comment of the main relay crate.


[features]
default = []
processing = [
Member

This feature effectively gives us producer implementations, should we name it as such?

Contributor Author

Yep, sounds good.


/// Kafka configuration errors.
#[derive(Fail, Debug)]
pub enum ConfigError {
Member

Is there a reason this is not in config? This causes some strange imports now where most types get imported from Config except for the related error.

Contributor Author

Yeah, I will reorganize the modules and make it simpler and more uniform.

@@ -0,0 +1,54 @@
pub mod config;
Member

It would be great if we could use this as a chance to improve documentation. Particularly, the root doc comment of the crate is valuable because it can explain the purpose of the crate and point to the various entry-points.

Contributor Author

👍 I will create some initial docs here.

relay-kafka/src/lib.rs (resolved)
@@ -0,0 +1,170 @@
use std::{collections::BTreeMap, sync::Arc};
Member

Please un-merge those imports. Same with the crate imports below.

/// the producer could not be created.
pub fn create<'a>(
config: &'a KafkaConfig,
reused_producers: &mut ReusedProducersMap<'a>,
Member

This was fine while this was still in a private module, but this mixes several concerns for a public signature:

  • It creates a new producer instance
  • It memoizes them and determines the data structure for memoization

I realize that this increases the effort for this refactor, but it would be good to clean this up and separate those concerns. Ideally the memo-map is either generic, or becomes a higher-level utility that uses create internally.

In the end, I think the abstraction here is entirely wrong. The real producer is the Arc'ed threaded producer inside. The Producer type here then gets one or more of them and wraps them together with a topic name.
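One way to separate the two concerns is to make the memoization a generic utility that calls a creation closure on a cache miss; the type and method names here are hypothetical:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Generic memo-map: knows nothing about Kafka, only about caching
// Arc-wrapped values by key. Creation stays a separate concern that the
// caller passes in as a closure.
struct MemoMap<K, V> {
    map: BTreeMap<K, Arc<V>>,
}

impl<K: Ord, V> MemoMap<K, V> {
    fn new() -> Self {
        MemoMap { map: BTreeMap::new() }
    }

    // Returns the cached value for `key`, creating it with `create`
    // only on the first request.
    fn get_or_create<F>(&mut self, key: K, create: F) -> Arc<V>
    where
        F: FnOnce() -> V,
    {
        self.map
            .entry(key)
            .or_insert_with(|| Arc::new(create()))
            .clone()
    }
}
```

Producer creation would then call map.get_or_create(config_name, || make_threaded_producer(...)), keeping Producer::create free of caching logic.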

Contributor Author

I will revisit this and make some changes to this refactor.

type ReusedProducersMap<'a> = BTreeMap<Option<&'a str>, Arc<ThreadedProducer>>;

/// This object contains the Kafka producer variants for single and sharded configurations.
pub enum Producer {
Member

nit: The internals of the enum are now part of the public signature. Do we want this explicitly?

  • If yes, then let's also introduce a type for the Single variant. This allows us to move specific functionality to a method on that type rather than having large functions covering all variants here.
  • If not, let's make this a struct with a hidden inner field that is then a private enum. I'd still propose the refactor from the previous point, though.
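The second option might look roughly like this; all names are hypothetical stand-ins:

```rust
/// Stand-in for the real rdkafka-backed single producer.
pub struct SingleProducer {
    topic_name: String,
}

impl SingleProducer {
    fn topic(&self) -> &str {
        &self.topic_name
    }
}

/// Public type: the enum is no longer part of the public signature.
pub struct Producer {
    inner: ProducerInner,
}

/// Private: variants can change without breaking downstream crates.
enum ProducerInner {
    Single(SingleProducer),
    Sharded(Vec<SingleProducer>),
}

impl Producer {
    pub fn single(topic_name: String) -> Self {
        Producer {
            inner: ProducerInner::Single(SingleProducer { topic_name }),
        }
    }

    /// Variant-specific logic delegates to the inner types instead of
    /// living in large match-everything functions.
    pub fn topics(&self) -> Vec<&str> {
        match &self.inner {
            ProducerInner::Single(p) => vec![p.topic()],
            ProducerInner::Sharded(ps) => ps.iter().map(|p| p.topic()).collect(),
        }
    }
}
```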

#[cfg(feature = "processing")]
mod producer;
#[cfg(feature = "processing")]
mod utils;
Member

Since this module just contains producer utilities, why don't we merge them into the producer module? That would also be quite nice if we rename the feature to producer.

/// [`CaptureErrorContext`] context.
pub type ThreadedProducer = rdkafka::producer::ThreadedProducer<CaptureErrorContext>;

enum KafkaCounters {
Member

In order to document these metrics, we need to list them in the documentation CI job here:

args: -p document-metrics -- -o relay_metrics.json relay-server/src/statsd.rs relay-metrics/src/statsd.rs

nit: I would propose to move these into a statsd module so the path stays stable.

Contributor Author

Sounds good 👍

@@ -0,0 +1,54 @@
pub mod config;
Member

Please add this to the preamble:

#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png",
    html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png"
)]

Particularly, note that this will require Debug to be implemented on most types. Let's ensure this is the case.

@olksdr olksdr requested review from jjbayer and jan-auer November 14, 2022 08:44
@@ -131,7 +131,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: run
args: -p document-metrics -- -o relay_metrics.json relay-server/src/statsd.rs relay-metrics/src/statsd.rs
args: -p document-metrics -- -o relay_metrics.json relay-server/src/statsd.rs relay-metrics/src/statsd.rs relay-kafka/src/statsd.rs
Member

This is easy to miss when adding a crate, would a glob pattern work here? That is, relay-*/src/statsd.rs

relay-kafka/src/producer/mod.rs (resolved)
.map_err(|_| ServerErrorKind::KafkaError)?;
client_builder = client_builder
.add_kafka_topic_config(kafka_config)
.map_err(|_| ServerErrorKind::KafkaError)?;
Member

This nit still stands, but it's not a blocker:

I would give KafkaError a #[cause] ProducerError to propagate the information.

https://github.com/getsentry/relay/pull/1563/files/3050fca26247f6d2e61d77c2b74842a93a721465#r1013800430

Contributor Author

@jjbayer so, the reason the error is not propagated here (nor in any other case where ServerErrorKind is used) is that the enum contains only "empty" variants and also implements Copy, Clone, and Hash, which most of the errors do not, see:

/// Indicates the type of failure of the server.
#[derive(Debug, Fail, Copy, Clone, PartialEq, Eq, Hash)]
pub enum ServerErrorKind {
    /// Binding failed.
    #[fail(display = "bind to interface failed")]
    BindFailed,
    /// Listening on the HTTP socket failed.
    #[fail(display = "listening failed")]
    ListenFailed,
    /// A TLS error occurred.
    #[fail(display = "could not initialize the TLS server")]
    TlsInitFailed,
    /// TLS support was not compiled in.
    #[fail(display = "compile with the `ssl` feature to enable SSL support")]
    TlsNotSupported,
    /// GeoIp construction failed.
    #[fail(display = "could not load the Geoip Db")]
    GeoIpError,
    /// Configuration failed.
    #[fail(display = "configuration error")]
    ConfigError,
    /// Initializing the Kafka producer failed.
    #[fail(display = "could not initialize kafka producer")]
    KafkaError,
    /// Initializing the Redis cluster client failed.
    #[fail(display = "could not initialize redis cluster client")]
    RedisError,
}

In case we want to propagate the errors, we would need either to implement the Hash trait on the Kafka ClientError and remove Clone and Copy, or, if we only add an additional formatted message inside the variant, we would at least have to remove Copy and Clone and return a reference in

*self.inner.get_context()

What do you think would be better?

Member

I see. Ignore my comment then, given that this error only happens on initialization I think we can live without the extra information being bubbled up.

Member

@jan-auer jan-auer left a comment

Excellent documentation, thank you for taking the time to write this!

This is good to merge from my end, see minor and optional comments below.

@@ -103,7 +103,7 @@ jobs:
args: --workspace --all-targets --no-deps -- -D warnings

test:
timeout-minutes: 15
timeout-minutes: 20
Member

Is that timeout change still required?

Contributor Author

I think it can be removed now. It was added before the Rust cache fix.

description = "Kafka related functionality for Relay"
homepage = "https://getsentry.github.io/relay/"
repository = "https://github.com/getsentry/relay"
version = "22.9.0"
Member

The version is outdated.

Suggested change
version = "22.9.0"
version = "22.10.0"

Please also note that there's a self-hosted release today, so ideally we merge before that.

impl KafkaTopic {
/// Returns iterator over the variants of [`KafkaTopic`].
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, KafkaTopic> {
Member

Suggested change
pub fn iter() -> std::slice::Iter<'static, KafkaTopic> {
pub fn iter() -> std::slice::Iter<'static, Self> {

nit: This iterates references of KafkaTopic. It would be slightly more ergonomic to iterate owned values, but it's likely not worth the added complexity. There would be std::array::IntoIter, but that will internally own the array.
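A sketch of the suggested Self-returning signature; the variant names here are hypothetical stand-ins for the real topic list:

```rust
// Illustrative topic enum (variant names are hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KafkaTopic {
    Events,
    Attachments,
    Transactions,
}

impl KafkaTopic {
    /// Returns an iterator over the variants of [`KafkaTopic`].
    /// It must be adjusted whenever new variants are added.
    pub fn iter() -> std::slice::Iter<'static, Self> {
        static TOPICS: [KafkaTopic; 3] = [
            KafkaTopic::Events,
            KafkaTopic::Attachments,
            KafkaTopic::Transactions,
        ];
        TOPICS.iter()
    }
}
```

Since the variants are Copy, callers that want owned values can simply write KafkaTopic::iter().copied().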

/// Configuration for topics.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct TopicAssignments {
Member

To separate concerns, it would be beneficial if the relay-kafka crate doesn't know about the topic assignments. For instance, we could leave the TopicAssignment type here, but move TopicAssignments and KafkaTopic back into relay-config or wherever it's best defined. That way, we don't have to rebuild it every time we add a new topic.

I'm happy to merge the PR as-is though, and follow up with this later.
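A rough sketch of the proposed split; the type names come from the PR, but the field layout is hypothetical:

```rust
use std::collections::BTreeMap;

// relay-kafka would keep only the per-topic assignment type...
#[derive(Debug)]
pub enum TopicAssignment {
    /// A single topic name on the default Kafka cluster.
    Primary(String),
    /// Shard index mapped to a topic name.
    Sharded(BTreeMap<u64, String>),
}

// ...while the full set of known topics lives in relay-config (or similar),
// so adding a topic does not require touching relay-kafka.
#[derive(Debug)]
pub struct TopicAssignments {
    pub events: TopicAssignment,
    pub attachments: TopicAssignment,
}
```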

Contributor Author

Agreed! I will follow up on this.

},
}

/// Sharded Kafka config
Member

Suggested change
/// Sharded Kafka config
/// Sharded Kafka config.

//!
//! # Usage
//!
//! ```compile_fail
Member

It's unfortunate that we cannot even compile this because of missing features. Maybe we move the documentation of the producer part into the producer module so it can compile? (that won't work since the module is not public, just its contents)

Regardless, we should at least insert the correct imports here.

@olksdr olksdr merged commit d4b7e54 into master Nov 15, 2022
@olksdr olksdr deleted the feat/relay-kafka-crate branch November 15, 2022 14:11