From a392928c6cd8a1d58c828b4f5a10f60c02535251 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Fri, 12 Jan 2024 08:49:33 +1100 Subject: [PATCH] Add `partitionsFor` method to producer --- .../src/test/scala/zio/kafka/ProducerSpec.scala | 6 ++++++ .../main/scala/zio/kafka/producer/Producer.scala | 16 +++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala index a894e7fe4..202dc44b3 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala @@ -266,6 +266,12 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { metrics <- Producer.metrics } yield assertTrue(metrics.nonEmpty) }, + test("partitionsFor") { + for { + topic <- randomTopic + info <- Producer.partitionsFor(topic).debug + } yield assertTrue(info.headOption.map(_.topic()) == Some(topic)) + }, suite("transactions")( test("a simple transaction") { import Subscription._ diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 633545382..f5548dd06 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -2,7 +2,7 @@ package zio.kafka.producer import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata } import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.{ Metric, MetricName } +import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo } import zio._ import zio.kafka.serde.Serializer import zio.kafka.utils.SslHelper @@ -169,6 +169,11 @@ trait Producer { records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]] ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] + /** + * Get the partition metadata for the given topic + */ + def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] + /** * Flushes the producer's internal buffer. This will guarantee that all records currently buffered will be transmitted * to the broker. @@ -337,6 +342,12 @@ object Producer { ): RIO[R & Producer, Chunk[RecordMetadata]] = ZIO.serviceWithZIO[Producer](_.produceChunk(records, keySerializer, valueSerializer)) + /** + * Accessor method + */ + def partitionsFor(topic: String): RIO[Producer, Chunk[PartitionInfo]] = + ZIO.serviceWithZIO(_.partitionsFor(topic)) + /** * Accessor method */ @@ -440,6 +451,9 @@ private[producer] final class ProducerLive( } yield done.await } + override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] = + ZIO.attemptBlocking(Chunk.fromJavaIterable(p.partitionsFor(topic))) + override def flush: Task[Unit] = ZIO.attemptBlocking(p.flush()) override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)