Skip to content

Commit

Permalink
Merge pull request #588 from fd4s/mk-type-param
Browse the repository at this point in the history
  • Loading branch information
bplommer authored Apr 10, 2021
2 parents f269875 + edd2692 commit ffb0338
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
* lexical scope.
*/
trait MkConsumer[F[_]] {
def apply(settings: ConsumerSettings[F, _, _]): F[KafkaByteConsumer]
def apply[G[_]](settings: ConsumerSettings[G, _, _]): F[KafkaByteConsumer]
}

object MkConsumer {
implicit def mkConsumerForSync[F[_]](implicit F: Sync[F]): MkConsumer[F] =
settings =>
F.delay {
new MkConsumer[F] {
def apply[G[_]](settings: ConsumerSettings[G, _, _]): F[KafkaByteConsumer] = F.delay {
val byteArrayDeserializer = new ByteArrayDeserializer
new org.apache.kafka.clients.consumer.KafkaConsumer(
(settings.properties: Map[String, AnyRef]).asJava,
byteArrayDeserializer,
byteArrayDeserializer
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ import fs2.kafka.internal.converters.collection._
* lexical scope.
*/
trait MkProducer[F[_]] {
def apply(settings: ProducerSettings[F, _, _]): F[KafkaByteProducer]
def apply[G[_]](settings: ProducerSettings[G, _, _]): F[KafkaByteProducer]
}

object MkProducer {
implicit def mkProducerForSync[F[_]](implicit F: Sync[F]): MkProducer[F] =
settings =>
F.delay {
new MkProducer[F] {
def apply[G[_]](settings: ProducerSettings[G, _, _]): F[KafkaByteProducer] = F.delay {
val byteArraySerializer = new ByteArraySerializer
new org.apache.kafka.clients.producer.KafkaProducer(
(settings.properties: Map[String, AnyRef]).asJava,
byteArraySerializer,
byteArraySerializer
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,26 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {

val error = new RuntimeException("BOOM")

implicit val mk: MkProducer[IO] = settings =>
IO.delay {
new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
(settings.properties: Map[String, AnyRef]).asJava,
new ByteArraySerializer,
new ByteArraySerializer
) {
override def sendOffsetsToTransaction(
offsets: util.Map[TopicPartition, OffsetAndMetadata],
consumerGroupId: String
): Unit =
if (offsets.containsKey(new TopicPartition(topic, 2))) {
throw error
} else {
super.sendOffsetsToTransaction(offsets, consumerGroupId)
}
implicit val mk: MkProducer[IO] = new MkProducer[IO] {
def apply[G[_]](settings: ProducerSettings[G, _, _]): IO[KafkaByteProducer] =
IO.delay {
new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
(settings.properties: Map[String, AnyRef]).asJava,
new ByteArraySerializer,
new ByteArraySerializer
) {
override def sendOffsetsToTransaction(
offsets: util.Map[TopicPartition, OffsetAndMetadata],
consumerGroupId: String
): Unit =
if (offsets.containsKey(new TopicPartition(topic, 2))) {
throw error
} else {
super.sendOffsetsToTransaction(offsets, consumerGroupId)
}
}
}
}
}

val produced =
(for {
Expand Down Expand Up @@ -232,8 +234,8 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
createCustomTopic(topic, partitions = 3)
val toProduce = (0 to 100).toList.map(n => s"key-$n" -> s"value-$n")

implicit val mkProducer: MkProducer[IO] = settings =>
IO.delay {
implicit val mkProducer: MkProducer[IO] = new MkProducer[IO] {
def apply[G[_]](settings: ProducerSettings[G, _, _]): IO[KafkaByteProducer] = IO.delay {
new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
(settings.properties: Map[String, AnyRef]).asJava,
new ByteArraySerializer,
Expand All @@ -245,6 +247,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}
}
}

val produced =
(for {
Expand Down

0 comments on commit ffb0338

Please sign in to comment.