Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add BrokerPool #4

Merged
merged 3 commits into from
Jan 4, 2022
Merged

feat: add BrokerPool #4

merged 3 commits into from
Jan 4, 2022

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Jan 4, 2022

This adds a BrokerPool that tries to connect to one of a list of bootstrap brokers, and then caches this connection.

A future PR will add functionality to fetch the full list of brokers and also cache this.

This is following the docs here

src/client.rs Outdated
for upper_bound in (ApiVersionsRequest::API_VERSION_RANGE.0 .0 .0
..=ApiVersionsRequest::API_VERSION_RANGE.1 .0 .0)
.rev()
I: IntoIterator<Item = A>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really necessary to take an iterator here? Nobody is going to pass in thousand of connections here, so I think a simple vector would also do the job and would result in a somewhat more ergonomic interface.

tests/client.rs Outdated
@@ -49,5 +49,5 @@ macro_rules! maybe_skip_kafka_integration {
#[tokio::test]
async fn test_connect() {
let connection = maybe_skip_kafka_integration!();
Client::new(connection).await;
Client::new(std::iter::once(connection)).await;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even when we keep the IntoIter type, I think vec![connection] is easier to read.

src/messenger.rs Outdated
/// A connection to a single broker
///
/// Note: Requests to the same [`Messenger`] will be pipelined by Kafka
/// -
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// -

Or was there more?

);
Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff))
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
}
// TODO: tests (either statistical or by using a fixed RNG)

Comment on lines +60 to +64
let brokers = if self.discovered_brokers.is_empty() {
&self.bootstrap_brokers
} else {
&self.discovered_brokers
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about chaining this these two lists (discovered first, then falling back to bootstrap) in case the cluster got into some weird state (e.g. you have two brokers, the bootstrap broker doesn't report itself and after a while comes back and the discovered broker dies).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The discovered brokers should include the bootstrap brokers - will add docs to clarify

@tustvold tustvold added automerge Instruct kodiak to merge the PR kodiak: merge.method = 'squash' Instruct kodiak to perform a squash merge labels Jan 4, 2022
@kodiakhq kodiakhq bot merged commit f2a6237 into main Jan 4, 2022
@carols10cents carols10cents deleted the broker-pool branch January 21, 2022 18:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
automerge Instruct kodiak to merge the PR kodiak: merge.method = 'squash' Instruct kodiak to perform a squash merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants