diff --git a/build.sbt b/build.sbt index 8ed4b8cb9..41c67dcf3 100644 --- a/build.sbt +++ b/build.sbt @@ -147,7 +147,10 @@ lazy val mimaSettings = Seq( ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.ConsumerSettings#ConsumerSettingsImpl.this"), ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.asJava"), ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.withKey"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.concat") + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.Headers.concat"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.createTopic"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.createTopics"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.describeCluster") ) // format: on } diff --git a/src/main/scala/fs2/kafka/KafkaAdminClient.scala b/src/main/scala/fs2/kafka/KafkaAdminClient.scala index b101d8196..db8a4a7ab 100644 --- a/src/main/scala/fs2/kafka/KafkaAdminClient.scala +++ b/src/main/scala/fs2/kafka/KafkaAdminClient.scala @@ -24,7 +24,7 @@ import fs2.kafka.KafkaAdminClient._ import fs2.kafka.internal.syntax._ import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.{KafkaFuture, TopicPartition} +import org.apache.kafka.common.{KafkaFuture, Node, TopicPartition} /** * [[KafkaAdminClient]] represents an admin client for Kafka, which is able to @@ -35,6 +35,39 @@ import org.apache.kafka.common.{KafkaFuture, TopicPartition} */ sealed abstract class KafkaAdminClient[F[_]] { + /** + * Creates the specified topic. + */ + def createTopic(topic: NewTopic): F[Unit] + + /** + * Creates the specified topics. + */ + def createTopics[G[_]](topics: G[NewTopic])( + implicit G: Foldable[G] + ): F[Unit] + + /** + * Describes the cluster. Returns nodes using: + * + * {{{ + * describeCluster.nodes + * }}} + * + * or the controller node using: + * + * {{{ + * describeCluster.controller + * }}} + * + * or the cluster ID using the following. + * + * {{{ + * describeCluster.clusterId + * }}} + */ + def describeCluster: DescribeCluster[F] + /** * Describes the consumer groups with the specified group ids, returning a * `Map` with group ids as keys, and `ConsumerGroupDescription`s as values. @@ -113,6 +146,18 @@ sealed abstract class KafkaAdminClient[F[_]] { } object KafkaAdminClient { + private[this] def createTopicWith[F[_]]( + client: Client[F], + topic: NewTopic + ): F[Unit] = + client(_.createTopics(java.util.Collections.singleton(topic)).all.void) + + private[this] def createTopicsWith[F[_], G[_]]( + client: Client[F], + topics: G[NewTopic] + )(implicit G: Foldable[G]): F[Unit] = + client(_.createTopics(topics.asJava).all.void) + private[this] def describeConsumerGroupsWith[F[_], G[_]]( client: Client[F], groupIds: G[String] @@ -125,6 +170,35 @@ object KafkaAdminClient { )(implicit G: Foldable[G]): F[Map[String, TopicDescription]] = client(_.describeTopics(topics.asJava).all.map(_.toMap)) + sealed abstract class DescribeCluster[F[_]] { + + /** Lists available nodes in the cluster. */ + def nodes: F[Set[Node]] + + /** The node in the cluster acting as the current controller. */ + def controller: F[Node] + + /** Current cluster ID. */ + def clusterId: F[String] + } + + private[this] def describeClusterWith[F[_]]( + client: Client[F] + ): DescribeCluster[F] = + new DescribeCluster[F] { + override def nodes: F[Set[Node]] = + client(_.describeCluster.nodes.map(_.toSet)) + + override def controller: F[Node] = + client(_.describeCluster.controller) + + override def clusterId: F[String] = + client(_.describeCluster.clusterId) + + override def toString: String = + "DescribeCluster$" + System.identityHashCode(this) + } + sealed abstract class ListTopics[F[_]] { /** Lists topic names. */ @@ -306,12 +380,23 @@ object KafkaAdminClient { )(implicit F: Concurrent[F]): Resource[F, KafkaAdminClient[F]] = createAdminClient(settings).map { client => new KafkaAdminClient[F] { - def describeConsumerGroups[G[_]](groupIds: G[String])( + override def createTopic(topic: NewTopic): F[Unit] = + createTopicWith(client, topic) + + override def createTopics[G[_]](topics: G[NewTopic])( + implicit G: Foldable[G] + ): F[Unit] = + createTopicsWith(client, topics) + + override def describeCluster: DescribeCluster[F] = + describeClusterWith(client) + + override def describeConsumerGroups[G[_]](groupIds: G[String])( implicit G: Foldable[G] ): F[Map[String, ConsumerGroupDescription]] = describeConsumerGroupsWith(client, groupIds) - def describeTopics[G[_]](topics: G[String])( + override def describeTopics[G[_]](topics: G[String])( implicit G: Foldable[G] ): F[Map[String, TopicDescription]] = describeTopicsWith(client, topics) diff --git a/src/main/scala/fs2/kafka/internal/syntax.scala b/src/main/scala/fs2/kafka/internal/syntax.scala index 95411ee63..6c665a091 100644 --- a/src/main/scala/fs2/kafka/internal/syntax.scala +++ b/src/main/scala/fs2/kafka/internal/syntax.scala @@ -172,6 +172,9 @@ private[kafka] object syntax { def map[B](f: A => B): KafkaFuture[B] = future.thenApply(baseFunction(f)) + def void: KafkaFuture[Unit] = + map(_ => ()) + def cancelToken[F[_]](implicit F: Sync[F]): CancelToken[F] = F.delay { future.cancel(true); () } diff --git a/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala b/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala index fffd4c1ed..32c71bb65 100644 --- a/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala +++ b/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala @@ -2,6 +2,7 @@ package fs2.kafka import cats.effect.IO import cats.implicits._ +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.TopicPartition final class KafkaAdminClientSpec extends BaseKafkaSpec { @@ -25,6 +26,12 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec { adminClientResource[IO](adminClientSettings(config)).use { adminClient => for { + clusterNodes <- adminClient.describeCluster.nodes + _ <- IO(assert(clusterNodes.size == 1)) + clusterController <- adminClient.describeCluster.controller + _ <- IO(assert(!clusterController.isEmpty)) + clusterId <- adminClient.describeCluster.clusterId + _ <- IO(assert(clusterId.nonEmpty)) consumerGroupIds <- adminClient.listConsumerGroups.groupIds _ <- IO(assert(consumerGroupIds.size == 1)) consumerGroupListings <- adminClient.listConsumerGroups.listings @@ -60,6 +67,9 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec { _ <- IO(assert(topicNamesToListingsInternal.size == 2)) describedTopics <- adminClient.describeTopics(topicNames.toList) _ <- IO(assert(describedTopics.size == 1)) + _ <- IO { + adminClient.describeCluster.toString should startWith("DescribeCluster$") + } _ <- IO { adminClient.toString should startWith("KafkaAdminClient$") } @@ -85,6 +95,14 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec { .forPartitions(List(new TopicPartition("topic", 0))) .toString shouldBe "ListConsumerGroupOffsetsForPartitions(groupId = group, partitions = List(topic-0))" } + newTopic = new NewTopic("new-test-topic", 1, 1) + preCreateNames <- adminClient.listTopics.names + _ <- IO(assert(!preCreateNames.contains(newTopic.name))) + _ <- adminClient.createTopic(newTopic) + postCreateNames <- adminClient.listTopics.names + createAgain <- adminClient.createTopics(List(newTopic)).attempt + _ <- IO(assert(createAgain.isLeft)) + _ <- IO(assert(postCreateNames.contains(newTopic.name))) } yield () }.unsafeRunSync }