Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Monix task utilities #543

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ object ObservableExtensions extends ObservableExtensions {
*/
def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt)

/**
* Returns a [[monix.eval.Task Task]] which emits the first <b>non-null</b> item for which the predicate holds.
*/
def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(e => e != null && p(e)).map(_.toOpt)

/** Suppress the duplicate elements emitted by the source Observable.
*
* WARNING: this requires unbounded buffering.
Expand Down Expand Up @@ -79,5 +84,15 @@ object ObservableExtensions extends ObservableExtensions {
obs
.foldLeftL(factory.newBuilder)(_ += _)
.map(_.result())

/** Returns a [[monix.eval.Task Task]] that upon evaluation
* will collect all items from the source into a [[Map]] instance
* using provided functions to compute keys and values.
*
* WARNING: for infinite streams the process will eventually blow up
* with an out of memory error.
*/
def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] =
obs.map(v => (keyFun(v), valueFun(v))).toL(Map)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.avsystem.commons
package concurrent

import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps}
import com.avsystem.commons.misc.Timestamp
import monix.eval.Task

import java.util.concurrent.TimeUnit
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

trait TaskExtensions {
ddworak marked this conversation as resolved.
Show resolved Hide resolved
implicit def taskOps[T](task: Task[T]): TaskOps[T] = new TaskOps(task)

implicit def taskCompanionOps(task: Task.type): TaskCompanionOps.type = TaskCompanionOps
}

object TaskExtensions extends TaskExtensions {
final class TaskOps[T](private val task: Task[T]) extends AnyVal {
/**
* Similar to [[Task.timeoutWith]] but exception instance is created lazily (for performance)
*/
def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] =
ddworak marked this conversation as resolved.
Show resolved Hide resolved
task.timeoutTo(after, Task.defer(Task.raiseError(new TimeoutException(msg))))

/**
* Similar to [[Task.tapEval]], accepts simple consumer function as an argument
*/
def tapL(f: T => Unit): Task[T] =
task.map(_.setup(f))

/**
* Similar to [[Task.tapError]], accepts [[PartialFunction]] as an argument
*/
def tapErrorL[B](f: PartialFunction[Throwable, B]): Task[T] =
task.tapError(t => Task(f.applyOpt(t)))
}

object TaskCompanionOps {
ddworak marked this conversation as resolved.
Show resolved Hide resolved
/** A [[Task]] of [[Opt.Empty]] */
def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty)

def traverseOpt[A, B](opt: Opt[A])(f: A => Task[B]): Task[Opt[B]] =
opt.fold(Task.optEmpty[B])(a => f(a).map(_.opt))

def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match {
case Opt(task) => task.map(_.opt)
case Opt.Empty => Task.optEmpty
}

def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] =
Task.traverse(map.toSeq)({ case (key, value) => f(key, value) }).map(_.toMap)
ddworak marked this conversation as resolved.
Show resolved Hide resolved

def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] =
traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) })

def currentTimestamp: Task[Timestamp] =
Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_))

def usingNow[T](useNow: Timestamp => Task[T]): Task[T] =
ddworak marked this conversation as resolved.
Show resolved Hide resolved
currentTimestamp.flatMap(useNow)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,29 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
Observable.fromIterable(ints).headOptL.runToFuture.futureValue shouldBe ints.headOpt
}
}

test("headOptL - null handling") {
Observable.fromIterable(Seq(null, "abc", "xyz")) .headOptL.runToFuture.futureValue shouldBe Opt.Empty
}

test("findOptL") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1)
ddworak marked this conversation as resolved.
Show resolved Hide resolved
}
}

test("findOptL - null handling") {
Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.some("abc")
Observable.fromIterable(Seq(null, null)).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.Empty
Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_.startsWith("x")).runToFuture.futureValue shouldBe Opt.some("xyz")
}

test("distinct") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct
}
}

test("distinctBy") {
forAll { ints: List[Int] =>
val f: Int => Int = _ % 256
Expand All @@ -33,17 +51,20 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
ints.foldLeft(MLinkedHashMap.empty[Int, Int])((map, v) => f(v) |> (key => map.applyIf(!_.contains(key))(_ += key -> v))).valuesIterator.toList
}
}

test("sortedL") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).sortedL.runToFuture.futureValue shouldBe ints.sorted
}
}

test("sortedByL") {
forAll { ints: List[Int] =>
val f: Int => Int = _ % 256
Observable.fromIterable(ints).sortedByL(f).runToFuture.futureValue shouldBe ints.sortBy(f)
}
}

test("toL") {
forAll { ints: List[(Int, Int)] =>
def testFactory[T](factory: Factory[(Int, Int), T])(implicit position: Position) =
Expand Down Expand Up @@ -78,4 +99,9 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
}
}

test("mkMapL") {
forAll { ints: List[Int] =>
Observable.fromIterable(ints).mkMapL(_ % 3, _ + 2).runToFuture.futureValue shouldBe ints.mkMap(_ % 3, _ + 2)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.avsystem.commons
package concurrent

import monix.eval.Task
import monix.execution.Scheduler
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

import scala.concurrent.TimeoutException
import scala.concurrent.duration._

class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures {
import com.avsystem.commons.concurrent.TaskExtensions._

private implicit val scheduler: Scheduler = Scheduler.global

test("lazyTimeout") {
ddworak marked this conversation as resolved.
Show resolved Hide resolved
val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue
result shouldBe a[TimeoutException]
result.getMessage shouldBe "Lazy timeout"
}

test("traverseOpt") {
Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty
Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123)
}

test("fromOpt") {
Task.fromOpt(Opt.empty[Task[Int]]).runToFuture.futureValue shouldBe Opt.Empty
Task.fromOpt(Opt.some(Task.now(123))).runToFuture.futureValue shouldBe Opt.some(123)
}

test("traverseMap") {
forAll { data: List[(String, Int)] =>
val map = data.toMap
val expected = map.view.map({ case (key, value) => (key + key, value + 2) }).toMap
val result = Task.traverseMap(map)({ case (key, value) => Task((key + key, value + 2)) }).runToFuture.futureValue
result shouldBe expected
}
}

test("traverseMapValues") {
forAll { data: List[(String, Int)] =>
val map = data.toMap
val expected = map.view.mapValues(value => value + 2).toMap
val result = Task.traverseMapValues(map)({ case (key, value) => Task(value + 2) }).runToFuture.futureValue
result shouldBe expected
}
}
}
Loading