-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add 'describeCluster' and 'createTopic' operations to KafkaAdminClient. #88
Conversation
* NOTE: Kafka's base [[AdminClient]] supports creating multiple topics in a | ||
* single request, but the result isn't transactional. We limit to a single | ||
* topic-per-request to avoid complications in error reporting for cases when | ||
* some topic creations fail and others succeed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you all feel about this design choice? I couldn't think of an obvious way to convert the Map[String, Future[Void]]
of topic-creation results provided by Kafka into a single F[_]
capturing all failures and all successes, and figured it's easy enough for users to build that behavior however they'd like using Cats and fs2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do something similar to what we do for e.g. describeTopics
:
def createTopics[G[_]](topics: G[NewTopic])(
implicit G: Foldable[G]
): F[Unit]
and createTopicsWith
then simply becomes the following.
private[this] def createTopicsWith[F[_], G[_]](
client: Client[F],
topics: G[NewTopic]
)(implicit G: Foldable[G]): F[Unit] =
client(_.createTopics(topics.asJava).all.void)
You might need to add void
to KafkaFutureSyntax
in fs2.kafka.internal.syntax
.
def void: KafkaFuture[Unit] =
map(_ => ())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 to using Foldable
to handle the input collection, but my worry is about the .all
call. The underlying KafkaFuture.allOf
arbitrarily chooses a single error to bubble up if multiple Futures have failed. For our use-case, we'd need to be able to collect & report every failure.
If there's interest on your end, I can spend some time setting up more complex error-handling (i.e. using a custom Exception
type that can collect all the errors). Otherwise I'll make the change as you suggest, and we'll just only call the method with one topic at a time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's difficult to cater for all use cases here, but it's pretty easy to write yourself using only createTopics
(or createTopic
for a single topic), parTraverse
, and attempt
. You can also easily include the topic name, or other details you care about.
topics.parTraverse(topic => client.createTopic(topic).attempt)
// e.g. IO[List[Either[Throwable,Unit]]]
I've pushed some changes, including createTopic
. Would you mind having a look?
sealed abstract class CreateTopic[F[_]] { | ||
|
||
/** Name of the topic being created. */ | ||
def topicName: String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this to make error reporting easier if you've got a collection of CreateTopic[F]
s
Codecov Report
@@ Coverage Diff @@
## master #88 +/- ##
==========================================
+ Coverage 98.71% 98.72% +0.01%
==========================================
Files 38 38
Lines 932 940 +8
Branches 52 53 +1
==========================================
+ Hits 920 928 +8
Misses 12 12
Continue to review full report at Codecov.
|
client: Client[F], | ||
topic: NewTopic | ||
): F[Unit] = | ||
client(_.createTopics(java.util.Collections.singleton(topic)).all.void) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense for this to instead call createTopicsWith(client, List(topic))
, to avoid the duplicate logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could, but we avoid creating the extra intermediate List
by doing it this way.
Thanks for the help! LGTM, should I push a few more tests to make codecov happy? |
Looks like it was me who missed the additional tests -- pushed now. Thanks a lot for this contribution! 👍 I'll merge this and get a new release out today. |
I'm hoping to use
fs2-kafka
in a new project at work, but I need these two operations. I tried to follow the existing patterns in theKafkaAdminClient
, but please let me know if I should change anything!