Skip to content

Commit

Permalink
Merge pull request #120 from ovotech/deserializer-effect
Browse files Browse the repository at this point in the history
Change to support effectful deserializers
  • Loading branch information
vlovgr authored Apr 18, 2019
2 parents 24bc461 + aed9904 commit 8be507e
Show file tree
Hide file tree
Showing 26 changed files with 640 additions and 612 deletions.
10 changes: 5 additions & 5 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
version = "2.0.0-RC4"
style = default
maxColumn = 100
project.git = true
continuationIndent.defnSite = 2
version = "2.0.0-RC6"
style = default
maxColumn = 100
project.git = true
continuationIndent.defnSite = 2
2 changes: 1 addition & 1 deletion docs/src/main/mdoc/quick-example.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object Main extends IOApp {
IO.pure(record.key -> record.value)

val consumerSettings =
ConsumerSettings[String, String]
ConsumerSettings[IO, String, String]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost")
.withGroupId("group")
Expand Down
72 changes: 0 additions & 72 deletions src/main/scala/fs2/kafka/ConsumerFactory.scala

This file was deleted.

84 changes: 51 additions & 33 deletions src/main/scala/fs2/kafka/ConsumerRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package fs2.kafka

import cats.Show
import cats.syntax.show._
import cats.{Apply, Show}
import cats.implicits._
import fs2.kafka.internal.syntax._
import org.apache.kafka.clients.consumer.ConsumerRecord.{NULL_SIZE, NO_TIMESTAMP}
import org.apache.kafka.common.record.TimestampType.{CREATE_TIME, LOG_APPEND_TIME}
Expand Down Expand Up @@ -167,37 +167,55 @@ object ConsumerRecord {
leaderEpoch = None
)

private[kafka] def fromJava[K, V](
record: KafkaConsumerRecord[K, V]
): ConsumerRecord[K, V] =
ConsumerRecordImpl(
topic = record.topic,
partition = record.partition,
offset = record.offset,
key = record.key,
value = record.value,
headers = record.headers.asScala,
timestamp = record.timestampType match {
case CREATE_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.createTime(record.timestamp)
case LOG_APPEND_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.logAppendTime(record.timestamp)
case _ =>
Timestamp.none
},
serializedKeySize =
if (record.serializedKeySize != NULL_SIZE)
Some(record.serializedKeySize)
else None,
serializedValueSize =
if (record.serializedValueSize != NULL_SIZE)
Some(record.serializedValueSize)
else None,
leaderEpoch =
if (record.leaderEpoch.isPresent)
Some(record.leaderEpoch.get)
else None
)
private[this] def deserializeFromBytes[F[_], K, V](
record: KafkaConsumerRecord,
headers: Headers,
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
)(implicit F: Apply[F]): F[(K, V)] = {
val key = keyDeserializer.deserialize(record.topic, headers, record.key)
val value = valueDeserializer.deserialize(record.topic, headers, record.value)
key.product(value)
}

private[kafka] def fromJava[F[_], K, V](
record: KafkaConsumerRecord,
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
)(implicit F: Apply[F]): F[ConsumerRecord[K, V]] = {
val headers = record.headers.asScala
deserializeFromBytes(record, headers, keyDeserializer, valueDeserializer).map {
case (key, value) =>
ConsumerRecordImpl(
topic = record.topic,
partition = record.partition,
offset = record.offset,
key = key,
value = value,
headers = headers,
timestamp = record.timestampType match {
case CREATE_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.createTime(record.timestamp)
case LOG_APPEND_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.logAppendTime(record.timestamp)
case _ =>
Timestamp.none
},
serializedKeySize =
if (record.serializedKeySize != NULL_SIZE)
Some(record.serializedKeySize)
else None,
serializedValueSize =
if (record.serializedValueSize != NULL_SIZE)
Some(record.serializedValueSize)
else None,
leaderEpoch =
if (record.leaderEpoch.isPresent)
Some(record.leaderEpoch.get)
else None
)
}
}

implicit def consumerRecordShow[K, V](
implicit
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/fs2/kafka/ConsumerResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final class ConsumerResource[F[_]] private[kafka] (
* This is equivalent to using `consumerResource` directly,
* except we're able to infer the key and value type.
*/
def using[K, V](settings: ConsumerSettings[K, V])(
def using[K, V](settings: ConsumerSettings[F, K, V])(
implicit context: ContextShift[F],
timer: Timer[F]
): Resource[F, KafkaConsumer[F, K, V]] =
Expand Down
Loading

0 comments on commit 8be507e

Please sign in to comment.