diff --git a/core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala b/core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala new file mode 100644 index 000000000..f68f28a39 --- /dev/null +++ b/core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala @@ -0,0 +1,26 @@ +package com.avsystem.commons +package concurrent + +import monix.eval.Task +import monix.execution.Scheduler +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +class JvmTaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures { + + import com.avsystem.commons.concurrent.TaskExtensions._ + + private implicit val scheduler: Scheduler = Scheduler.global + + // This test does not work in SJS runtime (but the method itself does) + test("lazyTimeout") { + val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage shouldBe "Lazy timeout" + } +} diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala index 7756335cd..b6ca9b681 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala @@ -22,6 +22,11 @@ object ObservableExtensions extends ObservableExtensions { */ def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt) + /** + * Returns a [[monix.eval.Task Task]] which emits the first non-null item for which the predicate holds. + */ + def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(e => e != null && p(e)).map(_.toOpt) + /** Suppress the duplicate elements emitted by the source Observable. * * WARNING: this requires unbounded buffering. @@ -79,5 +84,15 @@ object ObservableExtensions extends ObservableExtensions { obs .foldLeftL(factory.newBuilder)(_ += _) .map(_.result()) + + /** Returns a [[monix.eval.Task Task]] that upon evaluation + * will collect all items from the source into a [[Map]] instance + * using provided functions to compute keys and values. + * + * WARNING: for infinite streams the process will eventually blow up + * with an out of memory error. + */ + def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] = + obs.map(v => (keyFun(v), valueFun(v))).toL(Map) } } diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala new file mode 100644 index 000000000..16dca4ecd --- /dev/null +++ b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala @@ -0,0 +1,66 @@ +package com.avsystem.commons +package concurrent + +import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps} +import com.avsystem.commons.misc.Timestamp +import monix.eval.Task +import monix.reactive.Observable + +import java.util.concurrent.TimeUnit +import scala.concurrent.TimeoutException +import scala.concurrent.duration.FiniteDuration + +trait TaskExtensions { + implicit def taskOps[T](task: Task[T]): TaskOps[T] = new TaskOps(task) + + implicit def taskCompanionOps(task: Task.type): TaskCompanionOps.type = TaskCompanionOps +} + +object TaskExtensions extends TaskExtensions { + final class TaskOps[T](private val task: Task[T]) extends AnyVal { + /** + * Similar to [[Task.timeoutWith]] but exception instance is created lazily (for performance) + */ + def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] = + task.timeoutTo(after, Task.defer(Task.raiseError(new TimeoutException(msg)))) + + /** + * Similar to [[Task.tapEval]], accepts simple consumer function as an argument + */ + def tapL(f: T => Unit): Task[T] = + task.map(_.setup(f)) + + /** + * Similar to [[Task.tapError]], accepts [[PartialFunction]] as an argument + */ + def tapErrorL[B](f: PartialFunction[Throwable, B]): Task[T] = + task.tapError(t => Task(f.applyOpt(t))) + } + + object TaskCompanionOps { + import com.avsystem.commons.concurrent.ObservableExtensions.observableOps + + /** A [[Task]] of [[Opt.Empty]] */ + def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty) + + def traverseOpt[A, B](opt: Opt[A])(f: A => Task[B]): Task[Opt[B]] = + opt.fold(Task.optEmpty[B])(a => f(a).map(_.opt)) + + def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match { + case Opt(task) => task.map(_.opt) + case Opt.Empty => Task.optEmpty + } + + def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] = + Observable.fromIterable(map).mapEval({ case (key, value) => f(key, value) }).toL(Map) + + def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] = + traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) }) + + def currentTimestamp: Task[Timestamp] = + Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_)) + + def usingNow[T](useNow: Timestamp => Task[T]): Task[T] = + currentTimestamp.flatMap(useNow) + } +} diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala index 35e33d3d4..008b62cf1 100644 --- a/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala +++ b/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala @@ -20,11 +20,29 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers Observable.fromIterable(ints).headOptL.runToFuture.futureValue shouldBe ints.headOpt } } + + test("headOptL - null handling") { + Observable.fromIterable(Seq(null, "abc", "xyz")) .headOptL.runToFuture.futureValue shouldBe Opt.Empty + } + + test("findOptL") { + forAll { ints: List[Int] => + Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1) + } + } + + test("findOptL - null handling") { + Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.some("abc") + Observable.fromIterable(Seq(null, null)).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.Empty + Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_.startsWith("x")).runToFuture.futureValue shouldBe Opt.some("xyz") + } + test("distinct") { forAll { ints: List[Int] => Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct } } + test("distinctBy") { forAll { ints: List[Int] => val f: Int => Int = _ % 256 @@ -33,17 +51,20 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers ints.foldLeft(MLinkedHashMap.empty[Int, Int])((map, v) => f(v) |> (key => map.applyIf(!_.contains(key))(_ += key -> v))).valuesIterator.toList } } + test("sortedL") { forAll { ints: List[Int] => Observable.fromIterable(ints).sortedL.runToFuture.futureValue shouldBe ints.sorted } } + test("sortedByL") { forAll { ints: List[Int] => val f: Int => Int = _ % 256 Observable.fromIterable(ints).sortedByL(f).runToFuture.futureValue shouldBe ints.sortBy(f) } } + test("toL") { forAll { ints: List[(Int, Int)] => def testFactory[T](factory: Factory[(Int, Int), T])(implicit position: Position) = @@ -78,4 +99,9 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers } } + test("mkMapL") { + forAll { ints: List[Int] => + Observable.fromIterable(ints).mkMapL(_ % 3, _ + 2).runToFuture.futureValue shouldBe ints.mkMap(_ % 3, _ + 2) + } + } } diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala new file mode 100644 index 000000000..ca188894d --- /dev/null +++ b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala @@ -0,0 +1,43 @@ +package com.avsystem.commons +package concurrent + +import monix.eval.Task +import monix.execution.Scheduler +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures { + import com.avsystem.commons.concurrent.TaskExtensions._ + + private implicit val scheduler: Scheduler = Scheduler.global + + test("traverseOpt") { + Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty + Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123) + } + + test("fromOpt") { + Task.fromOpt(Opt.empty[Task[Int]]).runToFuture.futureValue shouldBe Opt.Empty + Task.fromOpt(Opt.some(Task.now(123))).runToFuture.futureValue shouldBe Opt.some(123) + } + + test("traverseMap") { + forAll { data: List[(String, Int)] => + val map = data.toMap + val expected = map.view.map({ case (key, value) => (key + key, value + 2) }).toMap + val result = Task.traverseMap(map)({ case (key, value) => Task((key + key, value + 2)) }).runToFuture.futureValue + result shouldBe expected + } + } + + test("traverseMapValues") { + forAll { data: List[(String, Int)] => + val map = data.toMap + val expected = map.view.mapValues(value => value + 2).toMap + val result = Task.traverseMapValues(map)({ case (key, value) => Task(value + 2) }).runToFuture.futureValue + result shouldBe expected + } + } +}