
Commit 6c9c28e

Merge pull request #664 from fd4s/stream-aliases

Alias `stream` as `records`

bplommer authored Aug 23, 2021
2 parents 19ca674 + 5761287
Showing 7 changed files with 62 additions and 38 deletions.
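The change is purely additive: `records` and `partitionedRecords` are new aliases, and existing `.stream` and `.partitionedStream` call sites keep compiling. A minimal sketch of the new alias in use, modeled on the doc examples changed below; the broker address, group id, and topic name are illustrative:

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.kafka._

object RecordsAliasExample extends IOApp {
  val consumerSettings =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // illustrative broker address
      .withGroupId("example-group")           // illustrative group id

  def run(args: List[String]): IO[ExitCode] =
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records // alias introduced by this PR; `.stream` still works
      .map(committable => committable.record.key -> committable.record.value)
      .compile
      .drain
      .as(ExitCode.Success)
}
```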
8 changes: 4 additions & 4 deletions docs/src/main/mdoc/consumers.md
@@ -201,7 +201,7 @@ object ConsumerStreamExample extends IOApp {
     val stream =
       KafkaConsumer.stream(consumerSettings)
         .subscribeTo("topic")
-        .stream
+        .records
 
     stream.compile.drain.as(ExitCode.Success)
   }
@@ -221,7 +221,7 @@ object ConsumerPartitionedStreamExample extends IOApp {
     val stream =
       KafkaConsumer.stream(consumerSettings)
         .subscribeTo("topic")
-        .partitionedStream
+        .partitionedRecords
         .map { partitionStream =>
           partitionStream
             .evalMap { committable =>
@@ -262,7 +262,7 @@ object ConsumerMapAsyncExample extends IOApp {
     val stream =
       KafkaConsumer.stream(consumerSettings)
         .subscribeTo("topic")
-        .stream
+        .records
         .mapAsync(25) { committable =>
           processRecord(committable.record)
         }
@@ -289,7 +289,7 @@ object ConsumerCommitBatchExample extends IOApp {
     val stream =
       KafkaConsumer.stream(consumerSettings)
         .subscribeTo("topic")
-        .stream
+        .records
         .mapAsync(25) { committable =>
           processRecord(committable.record)
             .as(committable.offset)
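A complete, runnable variant of the partitioned example above might look like the following sketch; the settings and the per-record action are illustrative stand-ins:

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.kafka._

object PartitionedRecordsExample extends IOApp {
  val consumerSettings =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // illustrative
      .withGroupId("example-group")

  // Stand-in for real per-record processing.
  def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
    IO(println(s"${record.key} -> ${record.value}"))

  def run(args: List[String]): IO[ExitCode] =
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .partitionedRecords // one inner stream per assigned partition
      .map { partitionStream =>
        partitionStream.evalMap(committable => processRecord(committable.record))
      }
      .parJoinUnbounded // process all partitions in parallel
      .compile
      .drain
      .as(ExitCode.Success)
}
```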
8 changes: 4 additions & 4 deletions docs/src/main/mdoc/producers.md
@@ -171,7 +171,7 @@ object ProduceExample extends IOApp {
     val stream =
       KafkaConsumer.stream(consumerSettings)
         .subscribeTo("topic")
-        .stream
+        .records
         .map { committable =>
           val key = committable.record.key
           val value = committable.record.value
@@ -199,7 +199,7 @@ object PartitionedProduceExample extends IOApp {
         .flatMap { producer =>
           KafkaConsumer.stream(consumerSettings)
             .subscribeTo("topic")
-            .partitionedStream
+            .partitionedRecords
             .map { partition =>
               partition
                 .map { committable =>
@@ -228,7 +228,7 @@ object KafkaProducerProduceExample extends IOApp {
         .flatMap { producer =>
           KafkaConsumer.stream(consumerSettings)
             .subscribeTo("topic")
-            .stream
+            .records
             .map { committable =>
               val key = committable.record.key
               val value = committable.record.value
@@ -258,7 +258,7 @@ object KafkaProducerProduceFlattenExample extends IOApp {
         .flatMap { producer =>
           KafkaConsumer.stream(consumerSettings)
             .subscribeTo("topic")
-            .stream
+            .records
             .map { committable =>
               val key = committable.record.key
               val value = committable.record.value
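Put together, these examples follow a consume-transform-produce shape. A hedged end-to-end sketch of that shape, assuming `KafkaProducer.stream` as in the examples above; the output topic and settings are illustrative, and each send is awaited individually for simplicity rather than batched:

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.kafka._

object ConsumeTransformProduceExample extends IOApp {
  val consumerSettings =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // illustrative
      .withGroupId("example-group")

  val producerSettings =
    ProducerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // illustrative

  def run(args: List[String]): IO[ExitCode] =
    KafkaProducer.stream(producerSettings)
      .flatMap { producer =>
        KafkaConsumer.stream(consumerSettings)
          .subscribeTo("topic")
          .records
          .map { committable =>
            val record =
              ProducerRecord("output-topic", committable.record.key, committable.record.value)
            ProducerRecords.one(record, committable.offset)
          }
          .evalMap(producer.produce) // submit the send
          .evalMap(identity)         // await the broker acknowledgement
      }
      .compile
      .drain
      .as(ExitCode.Success)
}
```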
4 changes: 2 additions & 2 deletions docs/src/main/mdoc/quick-example.md
@@ -5,7 +5,7 @@ title: Quick Example
 
 Following is an example showing how to:
 
-- use `KafkaConsumer.stream` in order to stream records from Kafka,
+- use `KafkaConsumer#records` in order to stream records from Kafka,
 - use `produce` to produce newly created records to Kafka,
 - use `commitBatchWithin` to commit consumed offsets in batches.
 
@@ -32,7 +32,7 @@ object Main extends IOApp {
     val stream =
       KafkaConsumer.stream(consumerSettings)
        .subscribeTo("topic")
-        .stream
+        .records
        .mapAsync(25) { committable =>
          processRecord(committable.record)
            .map { case (key, value) =>
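The commit half of the quick example mirrors `ConsumerCommitBatchExample` from consumers.md above. A self-contained sketch of that shape, with illustrative batch size and interval, and a stand-in processing function:

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.kafka._
import scala.concurrent.duration._

object CommitBatchSketch extends IOApp {
  val consumerSettings =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // illustrative
      .withGroupId("example-group")

  // Stand-in for real processing.
  def processRecord(record: ConsumerRecord[String, String]): IO[Unit] = IO.unit

  def run(args: List[String]): IO[ExitCode] =
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records
      .mapAsync(25) { committable =>
        processRecord(committable.record).as(committable.offset)
      }
      .through(commitBatchWithin(500, 15.seconds)) // commit up to 500 offsets or every 15s
      .compile
      .drain
      .as(ExitCode.Success)
}
```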
16 changes: 14 additions & 2 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -734,16 +734,28 @@ object KafkaConsumer {
     ): Stream[F, KafkaConsumer[F, K, V]] =
       self.evalTap(_.subscribeTo(firstTopic, remainingTopics: _*))
 
+    /**
+      * A [[Stream]] of records from the allocated [[KafkaConsumer]]. Alias for [[stream]].
+      * See [[KafkaConsume#stream]]
+      */
+    def records: Stream[F, CommittableConsumerRecord[F, K, V]] = stream
+
     /**
       * A [[Stream]] of records from the allocated [[KafkaConsumer]].
       * See [[KafkaConsume#stream]]
       */
-    def stream: Stream[F, CommittableConsumerRecord[F, K, V]] = self.flatMap(_.stream)
+    def stream: Stream[F, CommittableConsumerRecord[F, K, V]] = self.flatMap(_.records)
+
+    /**
+      * Alias for [[partitionedStream]]. See [[KafkaConsume#partitionedStream]]
+      */
+    def partitionedRecords: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
+      partitionedStream
 
     /**
       * See [[KafkaConsume#partitionedStream]]
       */
     def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
-      self.flatMap(_.partitionedStream)
+      self.flatMap(_.partitionedRecords)
   }
 }
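The pattern in this hunk is plain delegation: the new name forwards to the existing method (or, for the stream wrapper, to the alias on the underlying consumer), so the two names cannot drift apart. A stripped-down sketch of the idea with simplified stand-in types, not the actual fs2-kafka source:

```scala
import fs2.Stream

trait MiniConsume[F[_], A] {
  /** Primary operation. */
  def stream: Stream[F, A]

  /** Alias: `final`, so it always forwards to [[stream]]. */
  final def records: Stream[F, A] = stream
}
```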
22 changes: 17 additions & 5 deletions modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala
@@ -12,16 +12,28 @@ import org.apache.kafka.common.TopicPartition
 
 trait KafkaConsume[F[_], K, V] {
 
+  /**
+    * Consume from all assigned partitions, producing a stream
+    * of [[CommittableConsumerRecord]]s. Alias for [[stream]].
+    */
+  final def records: Stream[F, CommittableConsumerRecord[F, K, V]] = stream
+
   /**
     * Alias for `partitionedStream.parJoinUnbounded`.
-    * See [[partitionedStream]] for more information.
+    * See [[partitionedRecords]] for more information.
     *
     * @note you have to first use `subscribe` to subscribe the consumer
     * before using this `Stream`. If you forgot to subscribe, there
     * will be a [[NotSubscribedException]] raised in the `Stream`.
     */
   def stream: Stream[F, CommittableConsumerRecord[F, K, V]]
 
+  /**
+    * Alias for [[partitionedStream]]
+    */
+  final def partitionedRecords: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
+    partitionedStream
+
   /**
     * `Stream` where the elements themselves are `Stream`s which continually
     * request records for a single partition. These `Stream`s will have to be
@@ -32,7 +44,7 @@ trait KafkaConsume[F[_], K, V] {
     * limit will be the number of assigned partitions.<br>
     * <br>
     * If you do not want to process all partitions in parallel, then you
-    * can use [[stream]] instead, where records for all partitions are in
+    * can use [[records]] instead, where records for all partitions are in
     * a single `Stream`.
     *
     * @note you have to first use `subscribe` to subscribe the consumer
@@ -61,8 +73,8 @@ trait KafkaConsume[F[_], K, V] {
     * @note you have to first use `subscribe` to subscribe the consumer
     * before using this `Stream`. If you forgot to subscribe, there
     * will be a [[NotSubscribedException]] raised in the `Stream`.
-    * @see [[stream]]
-    * @see [[partitionedStream]]
+    * @see [[records]]
+    * @see [[partitionedRecords]]
     */
   def partitionsMapStream
     : Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]]
@@ -76,7 +88,7 @@ trait KafkaConsume[F[_], K, V] {
     * 2. All currently running streams will continue to run until all in-flight messages will be processed.
     *    It means that streams will be completed when all fetched messages will be processed.<br>
     * <br>
-    * If some of the [[stream]] methods will be called after [[stopConsuming]] call,
+    * If some of the [[records]] methods will be called after [[stopConsuming]] call,
     * these methods will return empty streams.<br>
     * <br>
     * More than one call of [[stopConsuming]] will have no effect.
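The `stopConsuming` contract quoted above (in-flight records finish; subsequently started `records` streams are empty) suggests a graceful-shutdown shape like the one exercised in the test suite below. A sketch assuming a resource-managed consumer and an illustrative delay before stopping:

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import fs2.kafka._
import scala.concurrent.duration._

object GracefulShutdownExample extends IOApp {
  val consumerSettings =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // illustrative
      .withGroupId("example-group")

  def run(args: List[String]): IO[ExitCode] =
    KafkaConsumer.resource(consumerSettings).use { consumer =>
      for {
        _ <- consumer.subscribeTo("topic")
        runStream = consumer.records.compile.drain
        // After stopConsuming, the records stream drains in-flight messages and completes.
        stop = IO.sleep(30.seconds) >> consumer.stopConsuming
        _ <- (runStream, stop).parTupled
      } yield ExitCode.Success
    }
}
```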
2 changes: 1 addition & 1 deletion modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala
@@ -319,7 +319,7 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
         KafkaConsumer
           .stream(consumerSettings[IO])
           .evalTap(_.subscribe(topic.r))
-          .stream
+          .records
           .take(produced.size.toLong)
           .map(_.offset)
           .chunks
40 changes: 20 additions & 20 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -49,7 +49,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .subscribeTo(topic)
           .evalTap(consumer => IO(consumer.toString should startWith("KafkaConsumer$")).void)
           .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
-          .stream
+          .records
           .map(committable => committable.record.key -> committable.record.value)
           .interruptAfter(10.seconds) // wait some time to catch potentially duplicated records
           .compile
@@ -71,7 +71,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .stream(consumerSettings[IO].withGroupId("test"))
           .subscribeTo(topic)
           .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
-          .stream
+          .records
           .map(committable => committable.record.key -> committable.record.value)
           .interruptAfter(10.seconds) // wait some time to catch potentially duplicated records
 
@@ -104,7 +104,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .evalTap(_.assign(topic, partitions))
           .evalTap(consumer => IO(consumer.toString should startWith("KafkaConsumer$")).void)
           .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
-          .stream
+          .records
           .map(committable => committable.record.key -> committable.record.value)
           .interruptAfter(10.seconds)
 
@@ -127,7 +127,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .stream(consumerSettings[IO].withGroupId("test"))
           .evalTap(_.assign(topic))
           .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
-          .stream
+          .records
           .map(committable => committable.record.key -> committable.record.value)
           .interruptAfter(10.seconds)
 
@@ -150,7 +150,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .stream(consumerSettings[IO].withGroupId("test2"))
           .evalTap(_.assign(topic))
           .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
-          .stream
+          .records
           .map(committable => committable.record.key -> committable.record.value)
           .interruptAfter(10.seconds)
 
@@ -207,7 +207,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .stream(consumerSettings[IO])
           .subscribeTo(topic)
           .evalTap(_.terminate)
-          .flatTap(_.stream)
+          .flatTap(_.records)
           .evalTap(_.awaitTermination)
           .compile
           .toVector
@@ -224,7 +224,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
       val consumed =
         KafkaConsumer
           .stream(consumerSettings[IO])
-          .stream
+          .records
           .compile
           .lastOrError
           .attempt
@@ -334,7 +334,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
               .withAutoOffsetReset(AutoOffsetReset.None)
           }
           .subscribeTo(topic)
-          .stream
+          .records
          .compile
          .lastOrError
          .attempt
@@ -362,7 +362,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .stream(consumerSettings[IO])
           .subscribeTo(topic)
           .flatTap { consumer =>
-            consumer.stream
+            consumer.records
               .take(produced.size.toLong)
               .map(_.offset)
               .chunks
@@ -421,7 +421,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .stream(consumerSettings[IO])
           .flatMap { consumer =>
             val validSeekParams =
-              consumer.stream
+              consumer.records
                 .take(Math.max(readOffset, 1))
                 .map(_.offset)
                 .compile
@@ -441,7 +441,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
             val setOffset =
               seekParams.flatMap { case (tp, o) => consumer.seek(tp, o) }
 
-            val consume = consumer.stream.take(numRecords - readOffset)
+            val consume = consumer.records.take(numRecords - readOffset)
 
             Stream.eval(consumer.subscribeTo(topic)).drain ++
               (Stream.eval_(setOffset) ++ consume)
@@ -576,7 +576,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         .parJoinUnbounded
         .concurrently(
           // run second stream to start a rebalance after initial rebalance, default timeout is 3 secs
-          Stream.sleep(5.seconds) >> stream.stream
+          Stream.sleep(5.seconds) >> stream.records
         )
         .interruptWhen(stopSignal)
         .compile
@@ -662,7 +662,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .subscribeTo(topic)
           .evalMap { consumer =>
             consumer.assignmentStream
-              .concurrently(consumer.stream)
+              .concurrently(consumer.records)
               .evalMap(as => queue.enqueue1(Some(as)))
               .compile
               .drain
@@ -708,7 +708,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         consumer <- KafkaConsumer
           .stream(consumerSettings[IO])
           .subscribeTo(topic)
-        _ <- Stream.eval(IO.sleep(5.seconds)).concurrently(consumer.stream)
+        _ <- Stream.eval(IO.sleep(5.seconds)).concurrently(consumer.records)
         queue <- Stream.eval(Queue.noneTerminated[IO, SortedSet[TopicPartition]])
         _ <- Stream.eval(
           consumer.assignmentStream.evalTap(as => queue.enqueue1(Some(as))).compile.drain.start
@@ -744,8 +744,8 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           consumer2 <- cons
 
           _ <- Stream(
-            consumer1.stream.evalTap(_ => cntRef.update(_ + 1)),
-            consumer2.stream.concurrently(
+            consumer1.records.evalTap(_ => cntRef.update(_ + 1)),
+            consumer2.records.concurrently(
              consumer2.assignmentStream.evalTap(
                assignedTopicPartitions => partitions.set(assignedTopicPartitions)
              )
@@ -789,7 +789,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         _ <- KafkaConsumer.resource(settings).use { consumer =>
           for {
             _ <- consumer.subscribeTo(topic)
-            _ <- consumer.stream
+            _ <- consumer.records
               .evalMap { msg =>
                 consumedRef.getAndUpdate(_ :+ (msg.record.key -> msg.record.value)).flatMap {
                   prevConsumed =>
@@ -821,7 +821,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         val run = KafkaConsumer.resource(settings).use { consumer =>
           for {
             _ <- consumer.subscribeTo(topic)
-            runStream = consumer.stream.compile.drain
+            runStream = consumer.records.compile.drain
             stopStream = consumer.stopConsuming
             _ <- (runStream, IO.sleep(1.second) >> stopStream).parTupled
           } yield succeed
@@ -842,7 +842,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
           .subscribeTo(topic)
           .evalTap(_.stopConsuming)
           .evalTap(_ => IO(publishToKafka(topic, produced)))
-          .stream
+          .records
           .evalTap { _ =>
             IO.raiseError(new RuntimeException("Stream should be empty"))
           }
@@ -957,7 +957,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
 
         val committed = (for {
           consumer <- createConsumer
-          consumed <- consumer.stream
+          consumed <- consumer.records
             .take(produced.size.toLong)
            .map(_.offset)
            .fold(CommittableOffsetBatch.empty[IO])(_ updated _)
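The fold in the final hunk is the manual offset-batching route. A hedged helper in the same shape, committing the offsets of the first `n` records as a single batch (the count is illustrative):

```scala
import cats.effect.IO
import fs2.kafka._

// Sketch: gather offsets of the first `n` records into one batch, then commit once.
def commitFirstN(consumer: KafkaConsumer[IO, String, String], n: Long): IO[Unit] =
  consumer.records
    .take(n)
    .map(_.offset)
    .fold(CommittableOffsetBatch.empty[IO])(_ updated _)
    .evalMap(_.commit)
    .compile
    .drain
```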
