Add mu kafka consumer producer #793
Conversation
.eval(
  Logger[F].info(
    result.records.head
      .fold("Error: ProducerResult contained empty records.")(a => s"Published $a")
Codecov Report
@@            Coverage Diff             @@
##           master     #793      +/-   ##
==========================================
+ Coverage   88.71%   88.87%   +0.16%
==========================================
  Files          58       61       +3
  Lines         833      863      +30
  Branches        2        1       -1
==========================================
+ Hits          739      767      +28
- Misses         94       96       +2
Continue to review full report at Codecov.
.unsafeRunAsyncAndForget()

kafka
  .consumer(topic, consumerGroup, putConsumeMessageIntoFuture)
@raulraja putConsumeMessageIntoFuture is the function for processing the message. Devs experienced in fs2 can use higherkindness.mu.kafka.ConsumerStream directly.
  }
  (consumed, processor)
}
Below are the two functions for creating a Kafka producer and consumer wrapped in IO monads.
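For readers skimming the thread, here is a minimal, self-contained sketch of that shape: one IO action that consumes and one that produces. The consumer/producer bodies below are stand-ins that do not touch Kafka, and the names and signatures are illustrative rather than the actual mu-kafka API.

import cats.effect.{ExitCode, IO, IOApp}
import fs2.{Pipe, Stream}

object KafkaWiringSketch extends IOApp {

  // Stand-in for the consumer: run every message through a processing pipe, returning IO[Unit].
  def consumer[A](topic: String, groupId: String, pipe: Pipe[IO, A, A])(
      messages: Stream[IO, A]
  ): IO[Unit] =
    messages.through(pipe).compile.drain

  // Stand-in for the producer: publish a stream of messages, returning IO[Unit].
  def producer[A](topic: String, messages: Stream[IO, A]): IO[Unit] =
    messages.compile.drain

  // Example processing pipe: log each consumed message.
  val logEach: Pipe[IO, String, String] =
    _.evalTap(m => IO(println(s"consumed: $m")))

  def run(args: List[String]): IO[ExitCode] =
    for {
      _ <- producer("user-added", Stream("UserAdded(1)"))
      _ <- consumer("user-added", "group-1", logEach)(Stream("UserAdded(1)"))
    } yield ExitCode.Success
}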
😊 looks great
): F[Unit] =
  ConsumerStream(topic, consumerSettings.atLeastOnceFromEarliest(groupId, brokers))
    .through(messageProcessingPipe)
    .compile
Shouldn't we keep the result of consumer as Stream[F, A]? That way, the user could consume a message, process it, and then produce the new message on a new topic 🤔
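For illustration, a sketch of the consume, process, produce flow this would enable if consumer exposed the stream instead of draining it. The source stream, the publishing pipe, and the topic names are placeholders, not mu-kafka API.

import cats.effect.IO
import fs2.{Pipe, Stream}

// Placeholders standing in for the consumed stream and the producer pipe.
def consumedFrom(topic: String): Stream[IO, String] =
  Stream("UserAdded(1)", "UserAdded(2)")
def publishTo(topic: String): Pipe[IO, String, Unit] =
  _.evalMap(m => IO(println(s"published to $topic: $m")))

// If the consumer returned Stream[F, A], the caller could chain it straight
// into a producer for a different topic.
val relay: IO[Unit] =
  consumedFrom("user-added")
    .map(userAdded => s"enriched($userAdded)") // process the consumed message
    .through(publishTo("user-added-info"))     // produce to another topic
    .compile
    .drain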
def consumer[F[_], A](
    topic: String,
    groupId: String,
    messageProcessingPipe: Pipe[F, A, A]
Does the signature Pipe[F, A, A] make sense? Let's say I consume a message (UserAdded), use its id to look something else up in the DB (UserInfo), and want to return a mix of those two data models (UserAddedInfo). Would that be possible?
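A sketch of that scenario, assuming the pipe were generalised to Pipe[F, A, B]; UserAdded, UserInfo, UserAddedInfo and lookupUserInfo are hypothetical names, not part of this PR.

import cats.effect.IO
import fs2.Pipe

final case class UserAdded(id: Long)
final case class UserInfo(id: Long, name: String)
final case class UserAddedInfo(id: Long, name: String)

// Hypothetical DB lookup for the consumed id.
def lookupUserInfo(id: Long): IO[UserInfo] = IO.pure(UserInfo(id, "someone"))

// Only expressible if the consumer accepts Pipe[F, A, B], where input and output types differ.
val enrich: Pipe[IO, UserAdded, UserAddedInfo] =
  _.evalMap { added =>
    lookupUserInfo(added.id).map(info => UserAddedInfo(added.id, info.name))
  }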
@tzimisce012 yes I was considering allowing the return type to be different.
Since we're calling .drain, the return type of the pipe doesn't matter, right? We may as well make it Pipe[F, A, _].
val a = decoder.decode(message.record.value)
for {
  _ <- fs2.Stream.eval(logger.info(a.toString))
} yield a
where do you commit the read messages? 🤔
@tzimisce012 With the h.m.k.consumerSettings.atLeastOnceFromEarliest settings, I am leaving it to auto-commit.
Oh, I just saw this .withEnableAutoCommit(true)
I think we'll need to iterate on this later. Right now I only see atLeastOnceFromEarliest, which turns on auto-commit. We'll probably want to provide more flexibility in commit strategies, possibly by exposing fs2-kafka's CommittableConsumerRecord or something equivalent to it.
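To make the idea concrete, a rough sketch (not part of this PR) of what exposing fs2-kafka's committable records could look like. The subscribeTo/stream shape mirrors the consumer code quoted above, while consumeCommittable, process, and the batching parameters are illustrative.

import scala.concurrent.duration._

import cats.effect.{ConcurrentEffect, Timer}
import fs2.Stream
import fs2.kafka.{commitBatchWithin, CommittableConsumerRecord, KafkaConsumer}

// Hand the committable record to the caller and commit offsets explicitly, in batches.
def consumeCommittable[F[_]: ConcurrentEffect: Timer](
    topic: String,
    kafkaConsumerStream: Stream[F, KafkaConsumer[F, String, Array[Byte]]]
)(process: CommittableConsumerRecord[F, String, Array[Byte]] => F[Unit]): F[Unit] =
  kafkaConsumerStream
    .evalTap(_.subscribeTo(topic))
    .flatMap(_.stream)
    .evalTap(process)                            // caller decides what "processed" means
    .map(_.offset)                               // keep only the committable offset
    .through(commitBatchWithin(500, 5.seconds))  // commit after 500 records or 5 seconds
    .compile
    .drain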
Yes, at the moment it is quite rigid. I wanted to avoid making this PR longer than it already is 😄, so I added only a simple case. I want to develop the example project, mu-kafka-sandbox, further by adding more complicated scenarios to feed into the mu-kafka development.
    logger: Logger[F]
): Stream[F, A] =
  kafkaConsumerStream
    .evalTap(_.subscribeTo(topic))
Are you considering the option of partitionStream? That way you get a stream of streams, but within each inner stream the messages stay ordered. Each stream can also be processed in parallel (if you want that).
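A sketch of how that partitioned variant could look; fs2-kafka's method is partitionedStream, and consumePartitioned, handle, and the rest are illustrative, not part of this PR.

import cats.effect.ConcurrentEffect
import fs2.Stream
import fs2.kafka.{CommittableConsumerRecord, KafkaConsumer}

// One inner stream per partition: ordering is preserved inside each partition,
// while the partitions themselves are processed concurrently.
def consumePartitioned[F[_]: ConcurrentEffect](
    topic: String,
    kafkaConsumerStream: Stream[F, KafkaConsumer[F, String, Array[Byte]]]
)(handle: CommittableConsumerRecord[F, String, Array[Byte]] => F[Unit]): F[Unit] =
  kafkaConsumerStream
    .evalTap(_.subscribeTo(topic))
    .flatMap(_.partitionedStream)  // Stream of per-partition streams
    .map(_.evalMap(handle))        // process each partition's records in order
    .parJoinUnbounded              // run the partition streams in parallel
    .compile
    .drain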
@tzimisce012 Yes, I want to tackle more complicated scenarios later. With this PR, I wanted to make sure everyone is happy with the general direction :)
object AvroWithSchema {
  implicit def encoder[T: SchemaFor: ToRecord]: Encoder[T] = new Encoder[T] {
    override def encode(t: T): Array[Byte] = {
      val bOut = new ByteArrayOutputStream()
This looks pretty similar to this:
mu-scala/modules/internal/src/main/scala/higherkindness/mu/rpc/internal/encoders/avro.scala
Lines 58 to 63 in 136f81e
val baos: ByteArrayOutputStream = new ByteArrayOutputStream()
val output: AvroBinaryOutputStream[A] = AvroOutputStream.binary[A](baos)
output.write(value)
output.close()
new ByteArrayInputStream(baos.toByteArray)
Would it be possible to put the code in one place and reuse it?
Yes, I thought about this as well. I will add a ticket to refactor and share the common code.
override def decode(bytes: Array[Byte]): T = {
  val in = AvroInputStream.data[T](bytes)
  in.close()
  in.iterator.toSet.head
Same as above. In this case, it seems we're not closing the stream in the internal module:
mu-scala/modules/internal/src/main/scala/higherkindness/mu/rpc/internal/encoders/avro.scala
Lines 53 to 54 in 136f81e
val input: AvroBinaryInputStream[A] = AvroInputStream.binary[A](stream)
input.iterator.toList.head
In this file, I don't think the in.close() is doing anything, because the AvroInputStream is wrapping a SeekableByteArrayInput, whose close() method is a no-op.
In the file linked by @fedefernandez, I think we're doing the right thing? The gRPC Javadoc doesn't say anything about whether we should close the stream, but the standard Java idiom is that you don't close a resource you were passed as an argument. Closing the stream is the responsibility of its creator.
object ConsumerStream {

  def apply[F[_]: Sync, A](topic: String, settings: ConsumerSettings[F, String, Array[Byte]])(
I think we don't need Sync since we have the ConcurrentEffect instance.
    concurrentEffect: ConcurrentEffect[F],
    timer: Timer[F],
    decoder: Decoder[A],
    sync: Sync[F]
Not needed at this point
The : Sync context bound is also not needed.
val a = decoder.decode(message.record.value)
for {
  _ <- fs2.Stream.eval(logger.info(a.toString))
} yield a
We can use evalTap for the logger print:
kafkaConsumerStream
  .evalTap(_.subscribeTo(topic))
  .flatMap(_.stream.map(msg => decoder.decode(msg.record.value)))
  .evalTap(a => logger.info(a.toString))
On the other hand, I don't know whether this (printing the message) is something we want to do at the library level or at the library-client level.
Yes, I know what you mean. In general it would be better for the library to do less, but logging here would be very useful for debugging, to confirm that everything is working at the library level. We could log it at DEBUG or even TRACE level.
Setting the log level to DEBUG looks good to me. Thanks!
I would not log the message contents at the library level, even as DEBUG. The message contents might be huge, and may contain sensitive data that shouldn't be logged. Best to let the user decide what they want to log.
    implicit contextShift: ContextShift[F],
    concurrentEffect: ConcurrentEffect[F],
    timer: Timer[F],
    sync: Sync[F],
Suggested change:
- sync: Sync[F],
    implicit contextShift: ContextShift[F],
    concurrentEffect: ConcurrentEffect[F],
    timer: Timer[F],
    sync: Sync[F],
Suggested change:
- sync: Sync[F],
    implicit contextShift: ContextShift[F],
    concurrentEffect: ConcurrentEffect[F],
    timer: Timer[F],
    sync: Sync[F],
Suggested change:
- sync: Sync[F],
.fold("Error: ProducerResult contained empty records.")(a => s"Published $a") | ||
) | ||
) | ||
.flatMap(_ => Stream.eval(sync.delay(result))) |
Suggested change:
- .flatMap(_ => Stream.eval(sync.delay(result)))
+ .evalMap(_ => sync.delay(result))
  )
  .covary[F]
  .through(publishToKafka)
  .flatMap(result =>
evalMap?
project/ProjectPlugin.scala (outdated)
@@ -3,7 +3,7 @@ import microsites.MicrositesPlugin.autoImport._
 import microsites._
 import sbt.Keys._
 import sbt.ScriptedPlugin.autoImport._
-import sbt._
+import sbt.{compilerPlugin, _}
Suggested change:
- import sbt.{compilerPlugin, _}
+ import sbt._
@naree the overall implementation looks good to me
 * limitations under the License.
 */

package higherkindness.mu.format
Maybe Encoder and Decoder should be moved somewhere under the higherkindness.mu.kafka package? Even though they are already namespaced by being in the kafka module, just looking at the FQN higherkindness.mu.format.Decoder doesn't give you any hint that it's Kafka-related.
Well, I don't think they need to be related. I put the h.m.format package inside the kafka project for now, but this package only contains serialisation-related code. In fact, as pointed out by @fedefernandez, the serialisation code is duplicated elsewhere in mu-scala. We could keep one copy and put it somewhere more appropriate: mu-format? mu-marshaller?
I think h.m.format.Decoder and h.m.format.Encoder should be treated as general interfaces rather than as specific to mu-kafka.
Can we use the internal module? If not, I'm OK with mu-format, but I'd avoid extra modules as much as possible (we have a lot of them already 😅).
@fedefernandez true. I am happy to move it to internal and remove the duplicated code as pointed out. We can always break it out into its own module as and when necessary.
package higherkindness.mu.format

trait Decoder[A] {
  def decode(a: Array[Byte]): A
I would expect this to return something like Either[DecodeError, A]
Yes, I agree. This PR is missing error-handling code. We will need to add appropriate actions for when decoding fails. My suggestion is that we log the failure and throw away the message, i.e. commit the offset. All the other approaches I have seen, such as a dead-letter topic, bring their own complications. Or we can leave it to the end user.
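A sketch of that log-and-drop approach with an Either-returning decoder; DecodeError, EitherDecoder, and decodeOrDrop are illustrative suggestions, not code from this PR.

import cats.effect.Sync
import fs2.Pipe
import io.chrisdavenport.log4cats.Logger

final case class DecodeError(message: String)

// What the Decoder trait could become.
trait EitherDecoder[A] {
  def decode(bytes: Array[Byte]): Either[DecodeError, A]
}

// Log decoding failures and drop the message; successful values flow on.
// Offset handling stays as it is today, i.e. auto-commit.
def decodeOrDrop[F[_]: Sync, A](
    decoder: EitherDecoder[A],
    logger: Logger[F]
): Pipe[F, Array[Byte], A] =
  _.map(decoder.decode)
    .evalTap {
      case Left(err) => logger.error(s"Dropping undecodable message: ${err.message}")
      case Right(_)  => Sync[F].unit
    }
    .collect { case Right(a) => a }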
    sync: Sync[F]
): Stream[F, A] =
  for {
    implicit0(logger: Logger[F]) <- fs2.Stream.eval(Slf4jLogger.create[F])
nit: there are a few places where you can replace fs2.Stream with just Stream, as you have it imported.
Free nit-picking is always welcome 😄. That's what the review is for!
The general design looks good to me :D
Can this be merged? Thanks!
Yes, I will rebase first and merge it.
I'll merge this now. We'll need some follow-up PRs to iterate on the design, resolve some of the TODOs in the code and the things pointed out in the comments above, and add documentation for the feature. I think the next thing to try would be making use of fs2-kafka's
Thanks @naree!
What does this do?
https://github.com/47deg/marlow/issues/515
https://github.com/47deg/marlow/issues/542
This PR creates APIs for building a Kafka consumer and producer with minimal effort, using fs2-kafka and models defined and generated with sbt-mu-srcgen.
The Kafka PoC has been updated to use the APIs:
https://github.com/naree/mu-kafka-sandbox
There is an example in the IT test, which is a good entry point to the API usage:
https://github.com/higherkindness/mu-scala/blob/add-mu-kafka-consumer-producer/modules/kafka/src/test/scala/higherkindness/mu/kafka/it/example/MuKafkaServiceSpec.scala#L88-L95
Two types of developers were considered as the target users.
The idea is not to shield developers from accessing the fs2 stream, but to provide simple APIs to start from.
Additionally, I made the assumption that the developers would be familiar with concurrent programming and understand how to select the correct ContextShift.
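For reference, a minimal cats-effect 2 sketch of where ContextShift and Timer typically come from; the choice of execution context is up to the application.

import scala.concurrent.ExecutionContext

import cats.effect.{ContextShift, ExitCode, IO, IOApp, Timer}

// Inside IOApp, contextShift and timer are already provided.
object Main extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    IO(println("wire the mu-kafka consumer/producer here"))
      .map(_ => ExitCode.Success)
}

// Outside IOApp, you pick the ExecutionContext yourself.
object ManualInstances {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)
}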
This PR is one of the features planned for mu-kafka.
More is still to come in later PRs.
Checklist