-
Notifications
You must be signed in to change notification settings - Fork 89
Closed
Labels
Description
Hi there, thanks for this library.
I've been playing with the experimental RedisStream and couldn't get StreamingOffset.Latest to work: it doesn't seem to yield any of the new elements published to the stream.
I've put together a test describing the issue; can you spot any obvious mistake?
I'm running this test against a vanilla redis:5-alpine docker image.
The "custom offset" test consistently passes, while the "latest offset" test never does.
package hellostreams
import cats.effect.{ContextShift, IO, Timer}
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout._
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage}
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import java.time.Instant
import scala.concurrent.ExecutionContext
/** Integration spec that reads from and appends to a Redis stream through `RedisStream`.
  *
  * Each test opens a client against `redis://:@localhost:6379`, so a Redis server must be
  * reachable there. A subscriber runs while a publisher appends three messages after a
  * one-second delay; whatever the subscriber emitted within two seconds is compared with
  * the published payloads.
  */
class RedisStreamSpec extends AnyFreeSpec with Matchers {

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]               = IO.timer(ExecutionContext.global)

  /** Key of the Redis stream shared by the publisher and both subscribers. */
  private val streamKey = "hello"

  /** Field of the stream entry body that carries the message payload. */
  private val bodyField = "value"

  /** Payloads appended by the publisher, in the order the subscriber is expected to see them. */
  private val expectedMessages = List("world1", "world2", "world3")

  /** Opens a UTF-8 `String`/`String` streaming connection on top of the given client. */
  private def connect(redis: RedisClient) =
    RedisStream.mkStreamingConnection[IO, String, String](redis, RedisCodec.Utf8)

  /** Subscribes to the stream with `StreamingOffset.Latest` and emits each entry's payload field.
    *
    * @param redis client used to open the streaming connection
    */
  def subLatest(redis: RedisClient): fs2.Stream[IO, String] =
    connect(redis).flatMap { streaming =>
      streaming
        .read(Set(streamKey), StreamingOffset.Latest[String])
        .map(_.body(bodyField))
    }

  /** Subscribes to the stream with `StreamingOffset.Custom` and emits each entry's payload field.
    *
    * @param redis  client used to open the streaming connection
    * @param offset offset value handed to `StreamingOffset.Custom` for the stream key
    */
  def subCustom(redis: RedisClient, offset: String): fs2.Stream[IO, String] =
    connect(redis).flatMap { streaming =>
      streaming
        .read(Set(streamKey), key => StreamingOffset.Custom[String](key, offset))
        .map(_.body(bodyField))
    }

  /** Appends every element of `messages` to the stream, emitting `()` per appended entry.
    *
    * @param redis    client used to open the streaming connection
    * @param messages payloads to publish; each is stored under the payload field
    */
  def pub(redis: RedisClient, messages: fs2.Stream[IO, String]): fs2.Stream[IO, Unit] = {
    val entries = messages.map(msg => XAddMessage(streamKey, Map(bodyField -> msg)))
    connect(redis).flatMap { streaming =>
      streaming.append(entries).map(_ => ())
    }
  }

  /** Runs `subscribe` alongside a delayed publisher and returns what the subscriber emitted.
    *
    * The publisher starts appending after one second; the subscriber is interrupted after
    * two seconds and its output collected into a list.
    */
  private def collectWhilePublishing(
      subscribe: RedisClient => fs2.Stream[IO, String]
  ): List[String] =
    RedisClient[IO]
      .from("redis://:@localhost:6379")
      .use { redis =>
        val subscriber = subscribe(redis)
        val publisher  = pub(redis, fs2.Stream(expectedMessages: _*).delayBy[IO](1.second))
        subscriber
          .concurrently(publisher)
          .interruptAfter(2.seconds)
          .compile
          .toList
      }
      .unsafeRunSync()

  "Publisher gets new messages (custom offset)" in {
    // The offset is the client's current epoch-millis, taken once the client is available.
    val received =
      collectWhilePublishing(redis => subCustom(redis, Instant.now.toEpochMilli.toString))
    received shouldBe expectedMessages
  }

  "Publisher gets new messages (latest)" in {
    val received = collectWhilePublishing(redis => subLatest(redis))
    received shouldBe expectedMessages
  }
}