Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'describeCluster' and 'createTopic' operations to KafkaAdminClient. #88

Merged
merged 4 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ lazy val mimaSettings = Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.ConsumerSettings#ConsumerSettingsImpl.this"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.asJava"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.withKey"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.concat")
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.concat"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.createTopic"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.createTopics"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.describeCluster")
)
// format: on
}
Expand Down
91 changes: 88 additions & 3 deletions src/main/scala/fs2/kafka/KafkaAdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fs2.kafka.KafkaAdminClient._
import fs2.kafka.internal.syntax._
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.common.{KafkaFuture, Node, TopicPartition}

/**
* [[KafkaAdminClient]] represents an admin client for Kafka, which is able to
Expand All @@ -35,6 +35,39 @@ import org.apache.kafka.common.{KafkaFuture, TopicPartition}
*/
sealed abstract class KafkaAdminClient[F[_]] {

/**
  * Creates the specified topic.
  *
  * The topic name, partition count, and replication factor are
  * taken from the provided [[org.apache.kafka.clients.admin.NewTopic]].
  */
def createTopic(topic: NewTopic): F[Unit]

/**
  * Creates the specified topics.
  *
  * Topics may be provided in any container `G` for which a
  * `Foldable` instance exists, e.g. `List`.
  */
def createTopics[G[_]](topics: G[NewTopic])(
implicit G: Foldable[G]
): F[Unit]

/**
  * Describes the cluster. Returns nodes using:
  *
  * {{{
  * describeCluster.nodes
  * }}}
  *
  * or the controller node using:
  *
  * {{{
  * describeCluster.controller
  * }}}
  *
  * or the cluster ID using:
  *
  * {{{
  * describeCluster.clusterId
  * }}}
  */
def describeCluster: DescribeCluster[F]

/**
* Describes the consumer groups with the specified group ids, returning a
* `Map` with group ids as keys, and `ConsumerGroupDescription`s as values.
Expand Down Expand Up @@ -113,6 +146,18 @@ sealed abstract class KafkaAdminClient[F[_]] {
}

object KafkaAdminClient {
/**
  * Creates a single topic via the underlying admin client.
  *
  * Wraps the topic in a singleton Java `Set` directly rather than
  * delegating to `createTopicsWith`, avoiding an extra intermediate
  * collection for the common single-topic case.
  */
private[this] def createTopicWith[F[_]](
  client: Client[F],
  topic: NewTopic
): F[Unit] = {
  val singleTopic = java.util.Collections.singleton(topic)
  client(_.createTopics(singleTopic).all.void)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense for this to instead call createTopicsWith(client, List(topic)), to avoid the duplicate logic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but we avoid creating the extra intermediate List by doing it this way.


/**
  * Creates all of the specified topics in a single admin-client call.
  *
  * The `Foldable` instance for `G` is used by the `asJava` syntax
  * to convert the container to a Java collection.
  */
private[this] def createTopicsWith[F[_], G[_]](
  client: Client[F],
  topics: G[NewTopic]
)(implicit G: Foldable[G]): F[Unit] =
  client { adminClient =>
    adminClient.createTopics(topics.asJava).all.void
  }

private[this] def describeConsumerGroupsWith[F[_], G[_]](
client: Client[F],
groupIds: G[String]
Expand All @@ -125,6 +170,35 @@ object KafkaAdminClient {
)(implicit G: Foldable[G]): F[Map[String, TopicDescription]] =
client(_.describeTopics(topics.asJava).all.map(_.toMap))

/**
  * Operations for describing the cluster, obtained via
  * [[KafkaAdminClient.describeCluster]]. Each member returns an
  * effect that performs the corresponding request when evaluated.
  */
sealed abstract class DescribeCluster[F[_]] {

/** Lists available nodes in the cluster. */
def nodes: F[Set[Node]]

/** The node in the cluster acting as the current controller. */
def controller: F[Node]

/** Current cluster ID. */
def clusterId: F[String]
}

/**
  * Returns a [[DescribeCluster]] whose members each issue a fresh
  * `describeCluster` request through the given client when evaluated.
  */
private[this] def describeClusterWith[F[_]](
  client: Client[F]
): DescribeCluster[F] =
  new DescribeCluster[F] {
    override def clusterId: F[String] =
      client(_.describeCluster.clusterId)

    override def controller: F[Node] =
      client(_.describeCluster.controller)

    override def nodes: F[Set[Node]] =
      client(_.describeCluster.nodes.map(_.toSet))

    // Same rendering as the original: literal prefix plus identity hash.
    override def toString: String =
      s"DescribeCluster$$${System.identityHashCode(this)}"
  }

sealed abstract class ListTopics[F[_]] {

/** Lists topic names. */
Expand Down Expand Up @@ -306,12 +380,23 @@ object KafkaAdminClient {
)(implicit F: Concurrent[F]): Resource[F, KafkaAdminClient[F]] =
createAdminClient(settings).map { client =>
new KafkaAdminClient[F] {
def describeConsumerGroups[G[_]](groupIds: G[String])(
override def createTopic(topic: NewTopic): F[Unit] =
createTopicWith(client, topic)

override def createTopics[G[_]](topics: G[NewTopic])(
implicit G: Foldable[G]
): F[Unit] =
createTopicsWith(client, topics)

override def describeCluster: DescribeCluster[F] =
describeClusterWith(client)

override def describeConsumerGroups[G[_]](groupIds: G[String])(
implicit G: Foldable[G]
): F[Map[String, ConsumerGroupDescription]] =
describeConsumerGroupsWith(client, groupIds)

def describeTopics[G[_]](topics: G[String])(
override def describeTopics[G[_]](topics: G[String])(
implicit G: Foldable[G]
): F[Map[String, TopicDescription]] =
describeTopicsWith(client, topics)
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/fs2/kafka/internal/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ private[kafka] object syntax {
def map[B](f: A => B): KafkaFuture[B] =
future.thenApply(baseFunction(f))

/** Discards this future's result, completing with `Unit` instead. */
def void: KafkaFuture[Unit] = {
  val discard: A => Unit = _ => ()
  map(discard)
}

def cancelToken[F[_]](implicit F: Sync[F]): CancelToken[F] =
F.delay { future.cancel(true); () }

Expand Down
18 changes: 18 additions & 0 deletions src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fs2.kafka

import cats.effect.IO
import cats.implicits._
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.TopicPartition

final class KafkaAdminClientSpec extends BaseKafkaSpec {
Expand All @@ -25,6 +26,12 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {

adminClientResource[IO](adminClientSettings(config)).use { adminClient =>
for {
clusterNodes <- adminClient.describeCluster.nodes
_ <- IO(assert(clusterNodes.size == 1))
clusterController <- adminClient.describeCluster.controller
_ <- IO(assert(!clusterController.isEmpty))
clusterId <- adminClient.describeCluster.clusterId
_ <- IO(assert(clusterId.nonEmpty))
consumerGroupIds <- adminClient.listConsumerGroups.groupIds
_ <- IO(assert(consumerGroupIds.size == 1))
consumerGroupListings <- adminClient.listConsumerGroups.listings
Expand Down Expand Up @@ -60,6 +67,9 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
_ <- IO(assert(topicNamesToListingsInternal.size == 2))
describedTopics <- adminClient.describeTopics(topicNames.toList)
_ <- IO(assert(describedTopics.size == 1))
_ <- IO {
adminClient.describeCluster.toString should startWith("DescribeCluster$")
}
_ <- IO {
adminClient.toString should startWith("KafkaAdminClient$")
}
Expand All @@ -85,6 +95,14 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
.forPartitions(List(new TopicPartition("topic", 0)))
.toString shouldBe "ListConsumerGroupOffsetsForPartitions(groupId = group, partitions = List(topic-0))"
}
newTopic = new NewTopic("new-test-topic", 1, 1)
preCreateNames <- adminClient.listTopics.names
_ <- IO(assert(!preCreateNames.contains(newTopic.name)))
_ <- adminClient.createTopic(newTopic)
postCreateNames <- adminClient.listTopics.names
createAgain <- adminClient.createTopics(List(newTopic)).attempt
_ <- IO(assert(createAgain.isLeft))
_ <- IO(assert(postCreateNames.contains(newTopic.name)))
} yield ()
}.unsafeRunSync
}
Expand Down