Change to support effectful deserializers #120

Merged 1 commit on Apr 18, 2019

@vlovgr vlovgr commented Apr 17, 2019

Similarly to #118, this pull request adds support for effectful deserializers.

Previously, Deserializer[A] extended the Java Kafka Deserializer, providing the following function.

def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): A

While A can be anything, including e.g. IO[B] for some type B, we have to return ConsumerRecord[K, V], which with IO would be ConsumerRecord[IO[K], IO[V]]. Users would then have to run the deserialization somewhere else, which is far from ideal. It would be better if ConsumerRecord[K, V] could be returned even though deserializers are effectful.

As with serializers, only the most basic deserializers do not make use of effects. For example, Confluent provides a popular Avro deserializer for use with their schema registry. The deserializer can automatically fetch schemas and perform certain compatibility checks as part of deserialize, but these side effects are generally not captured properly.

To support effectful deserializers, Deserializer[A] becomes Deserializer[F[_], A] with deserialize:

def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): F[A]

essentially only changing the return type to F[A]. F[_] is not restricted to effect types; deserializers can also work with e.g. Id. However, when used with a consumer, we require F[_] to be an effect type (requiring Sync[F]).
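The shape of the change can be sketched without any dependencies. This is a simplified model, not the library's source: the Headers alias is a hypothetical stand-in for Kafka's Headers, Id is defined locally, and scala.util.Try stands in for an effect type such as IO (unlike IO, Try evaluates eagerly).

```scala
import scala.language.higherKinds // only required on Scala 2.12 and earlier
import scala.util.Try

object DeserializerShapeSketch {
  // Hypothetical stand-in for Kafka's Headers, so the sketch has no dependencies.
  type Headers = Map[String, Array[Byte]]

  // Identity "effect": Id[A] is just A.
  type Id[A] = A

  // Simplified model of the new shape: the result is returned in F.
  trait Deserializer[F[_], A] {
    def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): F[A]
  }

  // A pure deserializer can use Id and return the value directly.
  val utf8: Deserializer[Id, String] =
    new Deserializer[Id, String] {
      def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): Id[String] =
        new String(bytes, "UTF-8")
    }

  // A deserializer that can fail returns its result inside F (here: Try).
  val utf8Int: Deserializer[Try, Int] =
    new Deserializer[Try, Int] {
      def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): Try[Int] =
        Try(new String(bytes, "UTF-8").toInt)
    }

  def main(args: Array[String]): Unit = {
    val noHeaders: Headers = Map.empty
    println(utf8.deserialize("topic", noHeaders, "hello".getBytes("UTF-8")))
    println(utf8Int.deserialize("topic", noHeaders, "42".getBytes("UTF-8")))
    println(utf8Int.deserialize("topic", noHeaders, "oops".getBytes("UTF-8")).isFailure)
  }
}
```

With this shape, a consumer can run the F[A] itself and hand back plain keys and values, instead of exposing the effect inside the record.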

The implication of this change is that ConsumerSettings is now also parameterized on F[_], and consumers now deserialize to F[A], only retrieving the Array[Byte] from the Java Kafka consumer, whereas before the Java Kafka consumer would perform the deserialization to A.

Since deserialization used to happen on the Java Kafka consumer thread, and since many deserializers are in fact blocking (including the Confluent Avro deserializer), we default to keeping deserialization on the dedicated ExecutionContext and provide withShiftDeserialization on ConsumerSettings for changing this behaviour.
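To illustrate what such a flag decides, here is a rough, dependency-free sketch. It is an assumption-laden model: it uses scala.concurrent.Future rather than the cats-effect types the library builds on, and deserializeOn, dedicated and threadName are hypothetical names chosen for this example.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object ShiftDeserializationSketch {
  // Hypothetical name for the single thread backing the dedicated context.
  val threadName = "dedicated-consumer-thread"

  private val factory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, threadName)
      t.setDaemon(true) // do not keep the JVM alive for this sketch
      t
    }
  }

  // Stand-in for the dedicated ExecutionContext mentioned in the description.
  val dedicated: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(factory))

  // Runs `deserialize` on the dedicated context when `shift` is true,
  // otherwise on the calling thread. Returns the thread name with the result.
  def deserializeOn[A](bytes: Array[Byte], shift: Boolean)(
    deserialize: Array[Byte] => A
  ): Future[(String, A)] = {
    def work(): (String, A) =
      (Thread.currentThread().getName, deserialize(bytes))

    if (shift) Future(work())(dedicated)
    else Future.fromTry(Try(work()))
  }

  def main(args: Array[String]): Unit = {
    val bytes = "hello".getBytes("UTF-8")
    val decode = (b: Array[Byte]) => new String(b, "UTF-8")
    println(Await.result(deserializeOn(bytes, shift = true)(decode), 5.seconds))
    println(Await.result(deserializeOn(bytes, shift = false)(decode), 5.seconds))
  }
}
```

Keeping the shifted variant as the default means a blocking deserializer occupies the dedicated thread rather than whichever thread happens to process the records.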

Deserializer no longer extends the Java Kafka Deserializer, and ConsumerSettings can no longer be created using Kafka Deserializers directly. Instead, use Deserializer.delegate to create a Deserializer which delegates to a Java instance, wrapping the result in pure, and Deserializer#suspend to capture its side effects in suspend.
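The difference between the two can be sketched as follows. This is a minimal model under stated assumptions, not the library's implementation: F is fixed to a tiny hand-written Lazy type instead of a Sync[F] effect, and JavaDeserializer, CountingDeserializer and Headers are hypothetical stand-ins for the Kafka types.

```scala
object DelegateSuspendSketch {
  // Hypothetical stand-in for Kafka's Headers.
  type Headers = Map[String, Array[Byte]]

  // Hypothetical stand-in for the Java Kafka Deserializer interface.
  trait JavaDeserializer[A] {
    def deserialize(topic: String, bytes: Array[Byte]): A
  }

  // Tiny lazy effect standing in for IO: nothing runs until run() is called.
  final class Lazy[A](thunk: () => A) {
    def run(): A = thunk()
  }

  object Lazy {
    def pure[A](a: A): Lazy[A] = new Lazy(() => a)
    def suspend[A](fa: => Lazy[A]): Lazy[A] = new Lazy(() => fa.run())
  }

  trait Deserializer[A] { self =>
    def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): Lazy[A]

    // Defers the call itself, so side effects inside it only happen on run().
    def suspend: Deserializer[A] =
      new Deserializer[A] {
        def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): Lazy[A] =
          Lazy.suspend(self.deserialize(topic, headers, bytes))
      }
  }

  object Deserializer {
    // Calls the Java instance immediately and wraps the result with pure.
    def delegate[A](underlying: JavaDeserializer[A]): Deserializer[A] =
      new Deserializer[A] {
        def deserialize(topic: String, headers: Headers, bytes: Array[Byte]): Lazy[A] =
          Lazy.pure(underlying.deserialize(topic, bytes))
      }
  }

  // An impure Java-style deserializer: it counts how often it is invoked.
  final class CountingDeserializer extends JavaDeserializer[String] {
    var calls: Int = 0
    def deserialize(topic: String, bytes: Array[Byte]): String = {
      calls += 1
      new String(bytes, "UTF-8")
    }
  }

  def main(args: Array[String]): Unit = {
    val java = new CountingDeserializer
    val deserializer = Deserializer.delegate(java).suspend
    val effect = deserializer.deserialize("topic", Map.empty, "hi".getBytes("UTF-8"))
    println(java.calls)   // the Java deserializer has not been invoked yet
    println(effect.run()) // running the effect performs the call
    println(java.calls)
  }
}
```

Without suspend, the delegate in this model invokes the Java deserializer as soon as deserialize is called, which is only appropriate when that deserializer is pure.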


Detailed changes as follows.

  • Add Deserializer#suspend to capture side effects from an impure Deserializer.
  • Change Deserializer to be parameterized on F[_].
    • This means that Deserializers can now include effects (e.g. IO).
    • Deserialization functions have also been parameterized on F[_].
  • Change ConsumerSettings to be parameterized on F[_].
    • The context is limited to effect types (requiring Sync[F]).
    • ConsumerSettings#apply with implicit deserializers now requires F[_] to be specified explicitly, unless it can be inferred from the context.
  • Remove ConsumerFactory, and ConsumerSettings#consumerFactory and withConsumerFactory.
    • Add ConsumerSettings#createConsumer and withCreateConsumer as replacements.
  • Add ConsumerSettings#shiftDeserialization and withShiftDeserialization.
    • This controls whether deserialization should run on the dedicated ExecutionContext or not.
    • Defaults to true to retain current behaviour and to keep supporting blocking deserializers.
  • Remove Deserializer#delay and add Deserializer#defer as a generic replacement.
  • Remove Deserializer.Attempt and Deserializer.attempt.
    • Usages have been replaced with F[_] requiring ApplicativeError[F, Throwable].
  • Remove ConsumerSettings#apply accepting Java Kafka Deserializers.
    • For interoperability, wrap the Java Kafka Deserializer with Deserializer.delegate and use Deserializer#suspend to capture any non-pure behaviours.
  • Remove apply functions accepting ExecutionContext when creating ConsumerSettings and ProducerSettings.
    • If you want to provide a custom ExecutionContext, use withExecutionContext.

codecov bot commented Apr 17, 2019

Codecov Report

Merging #120 into 0.20.x will decrease coverage by 0.19%.
The diff coverage is 97.22%.

@@            Coverage Diff            @@
##           0.20.x     #120     +/-   ##
=========================================
- Coverage   93.75%   93.55%   -0.2%     
=========================================
  Files          41       40      -1     
  Lines        1217     1211      -6     
  Branches       77       83      +6     
=========================================
- Hits         1141     1133      -8     
- Misses         76       78      +2
Impacted Files Coverage Δ
src/main/scala/fs2/kafka/ConsumerStream.scala 100% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/package.scala 100% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/internal/instances.scala 100% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/KafkaConsumer.scala 100% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/ConsumerResource.scala 100% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/KafkaProducer.scala 100% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/internal/LogEntry.scala 26.47% <ø> (ø) ⬆️
src/main/scala/fs2/kafka/Deserializer.scala 100% <100%> (ø) ⬆️
src/main/scala/fs2/kafka/internal/syntax.scala 100% <100%> (ø) ⬆️
src/main/scala/fs2/kafka/ProducerSettings.scala 100% <100%> (ø) ⬆️
... and 3 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 24bc461...aed9904.

@vlovgr vlovgr force-pushed the deserializer-effect branch from c29050c to aed9904 Compare April 17, 2019 12:42
@vlovgr vlovgr merged commit 8be507e into 0.20.x Apr 18, 2019
@vlovgr vlovgr deleted the deserializer-effect branch April 18, 2019 10:35