Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partitionsFor method to producer #1147

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
metrics <- Producer.metrics
} yield assertTrue(metrics.nonEmpty)
},
test("partitionsFor") {
for {
topic <- randomTopic
info <- Producer.partitionsFor(topic).debug
} yield assertTrue(info.headOption.map(_.topic()) == Some(topic))
},
suite("transactions")(
test("a simple transaction") {
import Subscription._
Expand Down
16 changes: 15 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.kafka.producer

import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName }
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
Expand Down Expand Up @@ -169,6 +169,11 @@ trait Producer {
records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]]

/**
* Get the partition metadata for the given topic
*/
def partitionsFor(topic: String): Task[Chunk[PartitionInfo]]

/**
* Flushes the producer's internal buffer. This will guarantee that all records currently buffered will be transmitted
* to the broker.
Expand Down Expand Up @@ -337,6 +342,12 @@ object Producer {
): RIO[R & Producer, Chunk[RecordMetadata]] =
ZIO.serviceWithZIO[Producer](_.produceChunk(records, keySerializer, valueSerializer))

/**
* Accessor method
*/
def partitionsFor(topic: String): RIO[Producer, Chunk[PartitionInfo]] =
ZIO.serviceWithZIO(_.partitionsFor(topic))

/**
* Accessor method
*/
Expand Down Expand Up @@ -440,6 +451,9 @@ private[producer] final class ProducerLive(
} yield done.await
}

override def partitionsFor(topic: String): Task[Chunk[PartitionInfo]] =
ZIO.attemptBlocking(Chunk.fromJavaIterable(p.partitionsFor(topic)))

override def flush: Task[Unit] = ZIO.attemptBlocking(p.flush())

override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap)
Expand Down