Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change to support effectful deserializers #120

Merged
merged 1 commit into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
version = "2.0.0-RC4"
style = default
maxColumn = 100
project.git = true
continuationIndent.defnSite = 2
version = "2.0.0-RC6"
style = default
maxColumn = 100
project.git = true
continuationIndent.defnSite = 2
2 changes: 1 addition & 1 deletion docs/src/main/mdoc/quick-example.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object Main extends IOApp {
IO.pure(record.key -> record.value)

val consumerSettings =
ConsumerSettings[String, String]
ConsumerSettings[IO, String, String]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost")
.withGroupId("group")
Expand Down
72 changes: 0 additions & 72 deletions src/main/scala/fs2/kafka/ConsumerFactory.scala

This file was deleted.

84 changes: 51 additions & 33 deletions src/main/scala/fs2/kafka/ConsumerRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package fs2.kafka

import cats.Show
import cats.syntax.show._
import cats.{Apply, Show}
import cats.implicits._
import fs2.kafka.internal.syntax._
import org.apache.kafka.clients.consumer.ConsumerRecord.{NULL_SIZE, NO_TIMESTAMP}
import org.apache.kafka.common.record.TimestampType.{CREATE_TIME, LOG_APPEND_TIME}
Expand Down Expand Up @@ -167,37 +167,55 @@ object ConsumerRecord {
leaderEpoch = None
)

private[kafka] def fromJava[K, V](
record: KafkaConsumerRecord[K, V]
): ConsumerRecord[K, V] =
ConsumerRecordImpl(
topic = record.topic,
partition = record.partition,
offset = record.offset,
key = record.key,
value = record.value,
headers = record.headers.asScala,
timestamp = record.timestampType match {
case CREATE_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.createTime(record.timestamp)
case LOG_APPEND_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.logAppendTime(record.timestamp)
case _ =>
Timestamp.none
},
serializedKeySize =
if (record.serializedKeySize != NULL_SIZE)
Some(record.serializedKeySize)
else None,
serializedValueSize =
if (record.serializedValueSize != NULL_SIZE)
Some(record.serializedValueSize)
else None,
leaderEpoch =
if (record.leaderEpoch.isPresent)
Some(record.leaderEpoch.get)
else None
)
private[this] def deserializeFromBytes[F[_], K, V](
record: KafkaConsumerRecord,
headers: Headers,
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
)(implicit F: Apply[F]): F[(K, V)] = {
val key = keyDeserializer.deserialize(record.topic, headers, record.key)
val value = valueDeserializer.deserialize(record.topic, headers, record.value)
key.product(value)
}

private[kafka] def fromJava[F[_], K, V](
record: KafkaConsumerRecord,
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
)(implicit F: Apply[F]): F[ConsumerRecord[K, V]] = {
val headers = record.headers.asScala
deserializeFromBytes(record, headers, keyDeserializer, valueDeserializer).map {
case (key, value) =>
ConsumerRecordImpl(
topic = record.topic,
partition = record.partition,
offset = record.offset,
key = key,
value = value,
headers = headers,
timestamp = record.timestampType match {
case CREATE_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.createTime(record.timestamp)
case LOG_APPEND_TIME if record.timestamp != NO_TIMESTAMP =>
Timestamp.logAppendTime(record.timestamp)
case _ =>
Timestamp.none
},
serializedKeySize =
if (record.serializedKeySize != NULL_SIZE)
Some(record.serializedKeySize)
else None,
serializedValueSize =
if (record.serializedValueSize != NULL_SIZE)
Some(record.serializedValueSize)
else None,
leaderEpoch =
if (record.leaderEpoch.isPresent)
Some(record.leaderEpoch.get)
else None
)
}
}

implicit def consumerRecordShow[K, V](
implicit
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/fs2/kafka/ConsumerResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final class ConsumerResource[F[_]] private[kafka] (
* This is equivalent to using `consumerResource` directly,
* except we're able to infer the key and value type.
*/
def using[K, V](settings: ConsumerSettings[K, V])(
def using[K, V](settings: ConsumerSettings[F, K, V])(
implicit context: ContextShift[F],
timer: Timer[F]
): Resource[F, KafkaConsumer[F, K, V]] =
Expand Down
Loading