From 839c89d82c28d49173714eb7083b8c158abcaccc Mon Sep 17 00:00:00 2001 From: Sebastian Haracz Date: Thu, 22 Feb 2024 18:23:52 +0100 Subject: [PATCH 1/6] Add Monix task utilities --- .../concurrent/ObservableExtensions.scala | 15 +++++ .../commons/concurrent/TaskExtensions.scala | 57 +++++++++++++++++++ .../concurrent/ObservableExtensionsTest.scala | 16 ++++++ 3 files changed, 88 insertions(+) create mode 100644 core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala index 7756335cd..f8b205352 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala @@ -22,6 +22,11 @@ object ObservableExtensions extends ObservableExtensions { */ def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt) + /** + * Returns a [[monix.eval.Task Task]] which emits the first item for which the predicate holds. + */ + def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(p).map(_.toOpt) + /** Suppress the duplicate elements emitted by the source Observable. * * WARNING: this requires unbounded buffering. @@ -79,5 +84,15 @@ object ObservableExtensions extends ObservableExtensions { obs .foldLeftL(factory.newBuilder)(_ += _) .map(_.result()) + + /** Returns a [[monix.eval.Task Task]] that upon evaluation + * will collect all items from the source into a [[Map]] instance + * using provided functions to compute keys and values. + * + * WARNING: for infinite streams the process will eventually blow up + * with an out of memory error. + */ + def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] = + obs.foldLeftL(Map.newBuilder[K, V])({ case (res, a) => res += ((keyFun(a), valueFun(a))) }).map(_.result()) } } diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala new file mode 100644 index 000000000..dab66b54e --- /dev/null +++ b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala @@ -0,0 +1,57 @@ +package com.avsystem.commons +package concurrent + +import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps} +import com.avsystem.commons.misc.Timestamp +import monix.eval.Task + +import java.util.concurrent.TimeUnit +import scala.concurrent.TimeoutException +import scala.concurrent.duration.FiniteDuration + +trait TaskExtensions { + implicit def taskOps[T](task: Task[T]): TaskOps[T] = new TaskOps(task) + + implicit def taskCompanionOps(task: Task.type): TaskCompanionOps.type = TaskCompanionOps +} + +object TaskExtensions extends TaskExtensions { + final class TaskOps[T](private val task: Task[T]) extends AnyVal { + /** + * Like regular `timeout` but [[TimeoutException]] is created lazily (for performance). + */ + def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] = + task.timeoutTo(after, Task.raiseError(new TimeoutException(msg))) + + /** + * Similar to [[Task.tapEval]], accepts simple consumer function as an argument + */ + def tapL(f: T => Unit): Task[T] = + task.map(_.setup(f)) + + /** + * Similar to [[Task.tapError]], accepts [[PartialFunction]] as an argument + */ + def tapErrorL[B](f: PartialFunction[Throwable, B]): Task[T] = + task.tapError(t => Task(f.applyOpt(t))) + } + + object TaskCompanionOps { + /** A [[Task]] of [[Opt.Empty]] */ + def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty) + + def traverseOpt[A, B](opt: Opt[A])(f: A => Task[B]): Task[Opt[B]] = + opt.fold(Task.optEmpty[B])(a => f(a).map(_.opt)) + + def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match { + case Opt(task) => task.map(_.opt) + case Opt.Empty => optEmpty + } + + def currentTimestamp: Task[Timestamp] = + Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_)) + + def usingNow[T](useNow: Timestamp => Task[T]): Task[T] = + currentTimestamp.flatMap(useNow) + } +} diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala index 35e33d3d4..294a175ea 100644 --- a/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala +++ b/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala @@ -20,11 +20,19 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers Observable.fromIterable(ints).headOptL.runToFuture.futureValue shouldBe ints.headOpt } } + + test("findOptL") { + forAll { ints: List[Int] => + Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1) + } + } + test("distinct") { forAll { ints: List[Int] => Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct } } + test("distinctBy") { forAll { ints: List[Int] => val f: Int => Int = _ % 256 @@ -33,17 +41,20 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers ints.foldLeft(MLinkedHashMap.empty[Int, Int])((map, v) => f(v) |> (key => map.applyIf(!_.contains(key))(_ += key -> v))).valuesIterator.toList } } + test("sortedL") { forAll { ints: List[Int] => Observable.fromIterable(ints).sortedL.runToFuture.futureValue shouldBe ints.sorted } } + test("sortedByL") { forAll { ints: List[Int] => val f: Int => Int = _ % 256 Observable.fromIterable(ints).sortedByL(f).runToFuture.futureValue shouldBe ints.sortBy(f) } } + test("toL") { forAll { ints: List[(Int, Int)] => def testFactory[T](factory: Factory[(Int, Int), T])(implicit position: Position) = @@ -78,4 +89,9 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers } } + test("mkMapL") { + forAll { ints: List[Int] => + Observable.fromIterable(ints).mkMapL(_ % 3, _ + 2).runToFuture.futureValue shouldBe ints.mkMap(_ % 3, _ + 2) + } + } } From f126d73a025e095d8e29c8dd374ce8b3f0aa3cb0 Mon Sep 17 00:00:00 2001 From: Sebastian Haracz Date: Mon, 26 Feb 2024 13:10:07 +0100 Subject: [PATCH 2/6] null edge case tests for Monix extensions --- .../commons/concurrent/ObservableExtensions.scala | 4 ++-- .../avsystem/commons/concurrent/TaskExtensions.scala | 2 +- .../commons/concurrent/ObservableExtensionsTest.scala | 10 ++++++++++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala index f8b205352..de3942dc1 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala @@ -23,9 +23,9 @@ object ObservableExtensions extends ObservableExtensions { def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt) /** - * Returns a [[monix.eval.Task Task]] which emits the first item for which the predicate holds. + * Returns a [[monix.eval.Task Task]] which emits the first non-null item for which the predicate holds. */ - def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(p).map(_.toOpt) + def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(e => e != null && p(e)).map(_.toOpt) /** Suppress the duplicate elements emitted by the source Observable. * diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala index dab66b54e..dda9221f6 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala @@ -18,7 +18,7 @@ trait TaskExtensions { object TaskExtensions extends TaskExtensions { final class TaskOps[T](private val task: Task[T]) extends AnyVal { /** - * Like regular `timeout` but [[TimeoutException]] is created lazily (for performance). + * Similar to [[Task.timeoutWith]] but exception instance is created lazily (for performance) */ def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] = task.timeoutTo(after, Task.raiseError(new TimeoutException(msg))) diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala index 294a175ea..008b62cf1 100644 --- a/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala +++ b/core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala @@ -21,12 +21,22 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers } } + test("headOptL - null handling") { + Observable.fromIterable(Seq(null, "abc", "xyz")) .headOptL.runToFuture.futureValue shouldBe Opt.Empty + } + test("findOptL") { forAll { ints: List[Int] => Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1) } } + test("findOptL - null handling") { + Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.some("abc") + Observable.fromIterable(Seq(null, null)).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.Empty + Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_.startsWith("x")).runToFuture.futureValue shouldBe Opt.some("xyz") + } + test("distinct") { forAll { ints: List[Int] => Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct From 7eb89869eebd91f6909c14fdd18022f91b0446fe Mon Sep 17 00:00:00 2001 From: Sebastian Haracz Date: Mon, 26 Feb 2024 14:33:13 +0100 Subject: [PATCH 3/6] lazyTimeout - add Task.defer to make it lazy --- .../scala/com/avsystem/commons/concurrent/TaskExtensions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala index dda9221f6..dc74bf1d4 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala @@ -21,7 +21,7 @@ object TaskExtensions extends TaskExtensions { * Similar to [[Task.timeoutWith]] but exception instance is created lazily (for performance) */ def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] = - task.timeoutTo(after, Task.raiseError(new TimeoutException(msg))) + task.timeoutTo(after, Task.defer(Task.raiseError(new TimeoutException(msg)))) /** * Similar to [[Task.tapEval]], accepts simple consumer function as an argument From 54a8942c5b53fc9eb7d40713d62b4ab2ceeb2054 Mon Sep 17 00:00:00 2001 From: Sebastian Haracz Date: Fri, 1 Mar 2024 11:31:57 +0100 Subject: [PATCH 4/6] Implement Task.traverseMap and unit tests for the new extensions --- .../concurrent/ObservableExtensions.scala | 2 +- .../commons/concurrent/TaskExtensions.scala | 8 ++- .../concurrent/TaskExtensionsTest.scala | 52 +++++++++++++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala index de3942dc1..b6ca9b681 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala @@ -93,6 +93,6 @@ object ObservableExtensions extends ObservableExtensions { * with an out of memory error. */ def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] = - obs.foldLeftL(Map.newBuilder[K, V])({ case (res, a) => res += ((keyFun(a), valueFun(a))) }).map(_.result()) + obs.map(v => (keyFun(v), valueFun(v))).toL(Map) } } diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala index dc74bf1d4..00479d4eb 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala @@ -45,9 +45,15 @@ object TaskExtensions extends TaskExtensions { def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match { case Opt(task) => task.map(_.opt) - case Opt.Empty => optEmpty + case Opt.Empty => Task.optEmpty } + def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] = + Task.traverse(map.toSeq)({ case (key, value) => f(key, value) }).map(_.toMap) + + def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] = + traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) }) + def currentTimestamp: Task[Timestamp] = Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_)) diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala new file mode 100644 index 000000000..6669ade3e --- /dev/null +++ b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala @@ -0,0 +1,52 @@ +package com.avsystem.commons +package concurrent + +import monix.eval.Task +import monix.execution.Scheduler +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures { + import com.avsystem.commons.concurrent.TaskExtensions._ + + private implicit val scheduler: Scheduler = Scheduler(RunNowEC) + + test("lazyTimeout") { + val result = Task.never.lazyTimeout(100.millis, "Lazy timeout").runToFuture.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage shouldBe "Lazy timeout" + } + + test("traverseOpt") { + Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty + Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123) + } + + test("fromOpt") { + Task.fromOpt(Opt.empty[Task[Int]]).runToFuture.futureValue shouldBe Opt.Empty + Task.fromOpt(Opt.some(Task.now(123))).runToFuture.futureValue shouldBe Opt.some(123) + } + + test("traverseMap") { + forAll { data: List[(String, Int)] => + val map = data.toMap + val expected = map.view.map({ case (key, value) => (key + key, value + 2) }).toMap + val result = Task.traverseMap(map)({ case (key, value) => Task((key + key, value + 2)) }).runToFuture.futureValue + result shouldBe expected + } + } + + test("traverseMapValues") { + forAll { data: List[(String, Int)] => + val map = data.toMap + val expected = map.view.mapValues(value => value + 2).toMap + val result = Task.traverseMapValues(map)({ case (key, value) => Task(value + 2) }).runToFuture.futureValue + result shouldBe expected + } + } +} From e8f8e7445581ded18f3091aca24f41db68855b7b Mon Sep 17 00:00:00 2001 From: Sebastian Haracz Date: Fri, 1 Mar 2024 11:53:10 +0100 Subject: [PATCH 5/6] Monix extensions - fix test --- .../com/avsystem/commons/concurrent/TaskExtensionsTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala index 6669ade3e..3c919516c 100644 --- a/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala +++ b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala @@ -14,10 +14,10 @@ import scala.concurrent.duration._ class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures { import com.avsystem.commons.concurrent.TaskExtensions._ - private implicit val scheduler: Scheduler = Scheduler(RunNowEC) + private implicit val scheduler: Scheduler = Scheduler.global test("lazyTimeout") { - val result = Task.never.lazyTimeout(100.millis, "Lazy timeout").runToFuture.failed.futureValue + val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue result shouldBe a[TimeoutException] result.getMessage shouldBe "Lazy timeout" } From 0d69f6680b8895503573fca218e57a10660355ec Mon Sep 17 00:00:00 2001 From: Sebastian Haracz Date: Mon, 4 Mar 2024 16:10:49 +0100 Subject: [PATCH 6/6] Monix extensions - move broken test to jvm only --- .../concurrent/JvmTaskExtensionsTest.scala | 26 +++++++++++++++++++ .../commons/concurrent/TaskExtensions.scala | 5 +++- .../concurrent/TaskExtensionsTest.scala | 9 ------- 3 files changed, 30 insertions(+), 10 deletions(-) create mode 100644 core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala diff --git a/core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala b/core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala new file mode 100644 index 000000000..f68f28a39 --- /dev/null +++ b/core/jvm/src/test/scala/com/avsystem/commons/concurrent/JvmTaskExtensionsTest.scala @@ -0,0 +1,26 @@ +package com.avsystem.commons +package concurrent + +import monix.eval.Task +import monix.execution.Scheduler +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +class JvmTaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures { + + import com.avsystem.commons.concurrent.TaskExtensions._ + + private implicit val scheduler: Scheduler = Scheduler.global + + // This test does not work in SJS runtime (but the method itself does) + test("lazyTimeout") { + val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage shouldBe "Lazy timeout" + } +} diff --git a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala index 00479d4eb..16dca4ecd 100644 --- a/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala +++ b/core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala @@ -4,6 +4,7 @@ package concurrent import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps} import com.avsystem.commons.misc.Timestamp import monix.eval.Task +import monix.reactive.Observable import java.util.concurrent.TimeUnit import scala.concurrent.TimeoutException @@ -37,6 +38,8 @@ object TaskExtensions extends TaskExtensions { } object TaskCompanionOps { + import com.avsystem.commons.concurrent.ObservableExtensions.observableOps + /** A [[Task]] of [[Opt.Empty]] */ def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty) @@ -49,7 +52,7 @@ object TaskExtensions extends TaskExtensions { } def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] = - Task.traverse(map.toSeq)({ case (key, value) => f(key, value) }).map(_.toMap) + Observable.fromIterable(map).mapEval({ case (key, value) => f(key, value) }).toL(Map) def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] = traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) }) diff --git a/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala index 3c919516c..ca188894d 100644 --- a/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala +++ b/core/src/test/scala/com/avsystem/commons/concurrent/TaskExtensionsTest.scala @@ -8,20 +8,11 @@ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks -import scala.concurrent.TimeoutException -import scala.concurrent.duration._ - class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures { import com.avsystem.commons.concurrent.TaskExtensions._ private implicit val scheduler: Scheduler = Scheduler.global - test("lazyTimeout") { - val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue - result shouldBe a[TimeoutException] - result.getMessage shouldBe "Lazy timeout" - } - test("traverseOpt") { Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123)