diff --git a/benchmarks/src/main/scala/monix/benchmarks/IOSequenceBenchmark.scala b/benchmarks/src/main/scala/monix/benchmarks/IOSequenceBenchmark.scala index 96724aac..76855c0d 100644 --- a/benchmarks/src/main/scala/monix/benchmarks/IOSequenceBenchmark.scala +++ b/benchmarks/src/main/scala/monix/benchmarks/IOSequenceBenchmark.scala @@ -116,24 +116,24 @@ class IOSequenceBenchmark { @Benchmark - def monixBioGather(): Long = { + def monixBioParSequence(): Long = { val tasks = (0 until count).map(_ => BIO.eval(1)).toList - val result = BIO.gather(tasks).map(_.sum.toLong) + val result = BIO.parSequence(tasks).map(_.sum.toLong) result.runSyncUnsafe() } @Benchmark - def monixBioGatherUnordered(): Long = { + def monixBioParSequenceUnordered(): Long = { val tasks = (0 until count).map(_ => BIO.eval(1)).toList - val result = BIO.gatherUnordered(tasks).map(_.sum.toLong) + val result = BIO.parSequenceUnordered(tasks).map(_.sum.toLong) result.runSyncUnsafe() } @Benchmark - def monixBioGatherN(): Long = { + def monixBioParSequenceN(): Long = { val tasks = (0 until count).map(_ => BIO.eval(1)).toList - val result = BIO.gatherN(parallelism)(tasks).map(_.sum.toLong) + val result = BIO.parSequenceN(parallelism)(tasks).map(_.sum.toLong) result.runSyncUnsafe() } diff --git a/core/shared/src/main/scala/monix/bio/BIO.scala b/core/shared/src/main/scala/monix/bio/BIO.scala index 642e6fa7..078ace52 100644 --- a/core/shared/src/main/scala/monix/bio/BIO.scala +++ b/core/shared/src/main/scala/monix/bio/BIO.scala @@ -182,7 +182,7 @@ import scala.util.{Failure, Success, Try} * it does for `Future.sequence`, the given `Task` values being * evaluated one after another, in ''sequence'', not in ''parallel''. * If you want parallelism, then you need to use - * [[monix.bio.BIO.gather BIO.gather]] and thus be explicit about it. + * [[monix.bio.BIO.parSequence BIO.parSequence]] and thus be explicit about it. * * This is great because it gives you the possibility of fine tuning the * execution. For example, say you want to execute things in parallel, @@ -198,7 +198,7 @@ import scala.util.{Failure, Success, Try} * val chunks = list.sliding(30, 30).toSeq * * // Specify that each batch should process stuff in parallel - * val batchedTasks = chunks.map(chunk => Task.gather(chunk)) + * val batchedTasks = chunks.map(chunk => Task.parSequence(chunk)) * // Sequence the batches * val allBatches = Task.sequence(batchedTasks) * @@ -1327,7 +1327,7 @@ sealed abstract class BIO[+E, +A] extends Serializable { * If an exception is raised, then `bracket` will re-raise the exception * ''after'' performing the `release`. If the resulting task gets cancelled, * then `bracket` will still perform the `release`, but the yielded task - * will be non-terminating (equivalent with [[Task.never]]). + * will be non-terminating (equivalent with [[BIO.never]]). * * Example: * @@ -1622,11 +1622,11 @@ sealed abstract class BIO[+E, +A] extends Serializable { * readFile("path/to/file").executeOn(io, forceAsync = true) * }}} * - * In this example we are using [[Task.eval]], which executes the + * In this example we are using [[BIO.eval]], which executes the * given `thunk` immediately (on the current thread and call stack). * * By calling `executeOn(io)`, we are ensuring that the used - * `Scheduler` (injected in [[Task.cancelable0[A](register* async tasks]]) + * `Scheduler` (injected in [[BIO.cancelable0]]) * will be `io`, a `Scheduler` that we intend to use for blocking * I/O actions. And we are also forcing an asynchronous boundary * right before execution, by passing the `forceAsync` parameter as @@ -2659,7 +2659,7 @@ sealed abstract class BIO[+E, +A] extends Serializable { * parallelism being guaranteed when multi-threading is available! * * All specified tasks get evaluated in parallel, regardless of their - * execution model ([[Task.eval]] vs [[Task.evalAsync]] doesn't matter). + * execution model ([[BIO.eval]] vs [[BIO.evalAsync]] doesn't matter). * Also the implementation tries to be smart about detecting forked * tasks so it can eliminate extraneous forks for the very obvious * cases. @@ -3397,7 +3397,7 @@ object BIO extends TaskInstancesLevel0 { * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type technique]]. * * Calling `create` with a callback that returns `Unit` is - * equivalent with [[Task.async0]]: + * equivalent with [[BIO.async0]]: * * `Task.async0(f) <-> Task.create(f)` * @@ -3444,7 +3444,7 @@ object BIO extends TaskInstancesLevel0 { * * Passed function can also return `Task[Unit]` as a task that * describes a cancelation action, thus for an `f` that can be - * passed to [[Task.cancelable0]], and this equivalence holds: + * passed to [[BIO.cancelable0]], and this equivalence holds: * * `Task.cancelable(f) <-> Task.create(f)` * @@ -3646,7 +3646,7 @@ object BIO extends TaskInstancesLevel0 { * results in the same collection. * * This operation will execute the tasks one by one, in order, which means that - * both effects and results will be ordered. See [[gather]] and [[gatherUnordered]] + * both effects and results will be ordered. See [[parSequence]] and [[parSequenceUnordered]] * for unordered results or effects, and thus potential of running in parallel. * * It's a simple version of [[traverse]]. @@ -3674,11 +3674,11 @@ object BIO extends TaskInstancesLevel0 { * This function is the nondeterministic analogue of `sequence` and should * behave identically to `sequence` so long as there is no interaction between * the effects being gathered. However, unlike `sequence`, which decides on - * a total order of effects, the effects in a `gather` are unordered with + * a total order of effects, the effects in a `parSequence` are unordered with * respect to each other, the tasks being execute in parallel, not in sequence. * * Although the effects are unordered, we ensure the order of results - * matches the order of the input sequence. Also see [[gatherUnordered]] + * matches the order of the input sequence. Also see [[parSequenceUnordered]] * for the more efficient alternative. * * Example: @@ -3686,17 +3686,47 @@ object BIO extends TaskInstancesLevel0 { * val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3)) * * // Yields 2, 4, 6 - * Task.gather(tasks) + * Task.parSequence(tasks) * }}} * * $parallelismAdvice * * $parallelismNote * - * @see [[gatherN]] for a version that limits parallelism. + * @see [[parSequenceN]] for a version that limits parallelism. */ - def gather[E, A, M[X] <: Iterable[X]](in: M[BIO[E, A]])(implicit bf: BuildFrom[M[BIO[E, A]], A, M[A]]): BIO[E, M[A]] = - TaskGather[E, A, M](in, () => newBuilder(bf, in)) + def parSequence[E, A, M[X] <: Iterable[X]]( + in: M[BIO[E, A]] + )(implicit bf: BuildFrom[M[BIO[E, A]], A, M[A]]): BIO[E, M[A]] = + TaskParSequence[E, A, M](in, () => newBuilder(bf, in)) + + /** Given a `Iterable[A]` and a function `A => BIO[E, B]`, + * nondeterministically apply the function to each element of the collection + * and return a task that will signal a collection of the results once all + * tasks are finished. + * + * This function is the nondeterministic analogue of `traverse` and should + * behave identically to `traverse` so long as there is no interaction between + * the effects being gathered. However, unlike `traverse`, which decides on + * a total order of effects, the effects in a `parTraverse` are unordered with + * respect to each other. + * + * Although the effects are unordered, we ensure the order of results + * matches the order of the input sequence. Also see doctodo parTraverseUnordered + * for the more efficient alternative. + * + * It's a generalized version of [[parSequence]]. + * + * $parallelismAdvice + * + * $parallelismNote + * + * @see doctodo parTraverseN for a version that limits parallelism. + */ + def parTraverse[E, A, B, M[X] <: Iterable[X]](in: M[A])(f: A => BIO[E, B])( + implicit bf: BuildFrom[M[A], B, M[B]] + ): BIO[E, M[B]] = + UIO.eval(in.map(f)).flatMap(col => TaskParSequence[E, B, M](col, () => newBuilder(bf, in))) /** Executes the given sequence of tasks in parallel, non-deterministically * gathering their results, returning a task that will signal the sequence @@ -3717,45 +3747,17 @@ object BIO extends TaskInstancesLevel0 { * ) * * // Yields 2, 4, 6, 8 after around 6 seconds - * Task.gatherN(2)(tasks) + * Task.parSequenceN(2)(tasks) * }}} * * $parallelismAdvice * * $parallelismNote * - * @see [[gather]] for a version that does not limit parallelism. + * @see [[parSequence]] for a version that does not limit parallelism. */ - def gatherN[E, A](parallelism: Int)(in: Iterable[BIO[E, A]]): BIO[E, List[A]] = - TaskGatherN[E, A](parallelism, in) - - /** Given a `Iterable[A]` and a function `A => BIO[E, B]`, - * nondeterministically apply the function to each element of the collection - * and return a task that will signal a collection of the results once all - * tasks are finished. - * - * This function is the nondeterministic analogue of `traverse` and should - * behave identically to `traverse` so long as there is no interaction between - * the effects being gathered. However, unlike `traverse`, which decides on - * a total order of effects, the effects in a `wander` are unordered with - * respect to each other. - * - * Although the effects are unordered, we ensure the order of results - * matches the order of the input sequence. Also see doctodo wanderUnordered - * for the more efficient alternative. - * - * It's a generalized version of [[gather]]. - * - * $parallelismAdvice - * - * $parallelismNote - * - * @see doctodo wanderN for a version that limits parallelism. - */ - def wander[E, A, B, M[X] <: Iterable[X]](in: M[A])(f: A => BIO[E, B])( - implicit bf: BuildFrom[M[A], B, M[B]] - ): BIO[E, M[B]] = - UIO.eval(in.map(f)).flatMap(col => TaskGather[E, B, M](col, () => newBuilder(bf, in))) + def parSequenceN[E, A](parallelism: Int)(in: Iterable[BIO[E, A]]): BIO[E, List[A]] = + TaskParSequenceN[E, A](parallelism, in) /** Applies the provided function in a non-deterministic way to each element * of the input collection. The result will be signalled once all tasks @@ -3782,27 +3784,27 @@ object BIO extends TaskInstancesLevel0 { * val numbers = List(1, 2, 3, 4) * * // Yields 2, 4, 6, 8 after around 6 seconds - * BIO.wanderN(2)(numbers)(n => BIO(n + n).delayExecution(n.second)) + * BIO.parTraverseN(2)(numbers)(n => BIO(n + n).delayExecution(n.second)) * }}} * * $parallelismAdvice * * $parallelismNote * - * @see [[wander]] for a version that does not limit parallelism. + * @see [[parTraverse]] for a version that does not limit parallelism. */ - def wanderN[E, A, B](parallelism: Int)(in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] = - deferTotal(TaskGatherN(parallelism, in.map(f))) + def parTraverseN[E, A, B](parallelism: Int)(in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] = + deferTotal(TaskParSequenceN(parallelism, in.map(f))) /** Processes the given collection of tasks in parallel and * nondeterministically gather the results without keeping the original * ordering of the given tasks. * - * This function is similar to [[gather]], but neither the effects nor the + * This function is similar to [[parSequence]], but neither the effects nor the * results will be ordered. Useful when you don't need ordering because: * * - it has non-blocking behavior (but not wait-free) - * - it can be more efficient (compared with [[gather]]), but not + * - it can be more efficient (compared with [[parSequence]]), but not * necessarily (if you care about performance, then test) * * Example: @@ -3810,7 +3812,7 @@ object BIO extends TaskInstancesLevel0 { * val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3)) * * // Yields 2, 4, 6 (but order is NOT guaranteed) - * Task.gatherUnordered(tasks) + * Task.parSequenceUnordered(tasks) * }}} * * $parallelismAdvice @@ -3819,28 +3821,28 @@ object BIO extends TaskInstancesLevel0 { * * @param in is a list of tasks to execute */ - def gatherUnordered[E, A](in: Iterable[BIO[E, A]]): BIO[E, List[A]] = - TaskGatherUnordered(in) + def parSequenceUnordered[E, A](in: Iterable[BIO[E, A]]): BIO[E, List[A]] = + TaskParSequenceUnordered(in) /** Given a `Iterable[A]` and a function `A => BIO[E, B]`, * nondeterministically apply the function to each element of the collection * without keeping the original ordering of the results. * - * This function is similar to [[wander]], but neither the effects nor the + * This function is similar to [[parTraverse]], but neither the effects nor the * results will be ordered. Useful when you don't need ordering because: * * - it has non-blocking behavior (but not wait-free) - * - it can be more efficient (compared with [[wander]]), but not + * - it can be more efficient (compared with [[parTraverse]]), but not * necessarily (if you care about performance, then test) * - * It's a generalized version of [[gatherUnordered]]. + * It's a generalized version of [[parSequenceUnordered]]. * * $parallelismAdvice * * $parallelismNote */ - def wanderUnordered[E, A, B](in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] = - BIO.evalTotal(in.map(f)).flatMap(gatherUnordered) + def parTraverseUnordered[E, A, B](in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] = + BIO.evalTotal(in.map(f)).flatMap(parSequenceUnordered) /** Yields a task that on evaluation will process the given tasks * in parallel, then apply the given mapping function on their results. @@ -4949,7 +4951,7 @@ private[bio] abstract class TaskTimers extends TaskClocks { } } -private[bio] abstract class TaskClocks { +private[bio] abstract class TaskClocks extends BIODeprecated.Companion { /** * Default, pure, globally visible `cats.effect.Clock` diff --git a/core/shared/src/main/scala/monix/bio/Task.scala b/core/shared/src/main/scala/monix/bio/Task.scala index 74114f2f..6ca43887 100644 --- a/core/shared/src/main/scala/monix/bio/Task.scala +++ b/core/shared/src/main/scala/monix/bio/Task.scala @@ -20,7 +20,7 @@ package monix.bio import cats.effect.{CancelToken, ConcurrentEffect, Effect} import cats.~> import monix.bio.BIO.AsyncBuilder -import monix.bio.internal.{TaskCreate, TaskFromFuture} +import monix.bio.internal.{TaskCreate, TaskDeprecated, TaskFromFuture} import monix.catnap.FutureLift import monix.execution.compat.BuildFrom import monix.execution.{CancelablePromise, Scheduler} @@ -30,7 +30,7 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} import scala.util.Try -object Task { +object Task extends TaskDeprecated.Companion { /** * @see See [[monix.bio.BIO.apply]] @@ -287,40 +287,42 @@ object Task { BIO.traverse(in)(f) /** - * @see See [[monix.bio.BIO.gather]] + * @see See [[monix.bio.BIO.parSequence]] */ - def gather[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] = - BIO.gather(in) + def parSequence[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] = + BIO.parSequence(in) /** - * @see See [[monix.bio.BIO.gatherN]] + * @see [[monix.bio.BIO.parTraverse]] */ - def gatherN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] = - BIO.gatherN(parallelism)(in) + def parTraverse[A, B, M[X] <: Iterable[X]]( + in: M[A] + )(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] = + BIO.parTraverse(in)(f) /** - * @see [[monix.bio.BIO.wander]] + * @see See [[monix.bio.BIO.parSequenceN]] */ - def wander[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] = - BIO.wander(in)(f) + def parSequenceN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] = + BIO.parSequenceN(parallelism)(in) /** - * @see See [[monix.bio.BIO.wanderN]] + * @see See [[monix.bio.BIO.parTraverseN]] */ - def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] = - BIO.wanderN(parallelism)(in)(f) + def parTraverseN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] = + BIO.parTraverseN(parallelism)(in)(f) /** - * @see See [[monix.bio.BIO.gatherUnordered]] + * @see See [[monix.bio.BIO.parSequenceUnordered]] */ - def gatherUnordered[A](in: Iterable[Task[A]]): Task[List[A]] = - BIO.gatherUnordered(in) + def parSequenceUnordered[A](in: Iterable[Task[A]]): Task[List[A]] = + BIO.parSequenceUnordered(in) /** - * @see [[monix.bio.BIO.wanderUnordered]] + * @see [[monix.bio.BIO.parTraverseUnordered]] */ - def wanderUnordered[A, B](in: Iterable[A])(f: A => Task[B]): Task[List[B]] = - BIO.wanderUnordered(in)(f) + def parTraverseUnordered[A, B](in: Iterable[A])(f: A => Task[B]): Task[List[B]] = + BIO.parTraverseUnordered(in)(f) /** * @see See [[monix.bio.BIO.mapBoth]] diff --git a/core/shared/src/main/scala/monix/bio/UIO.scala b/core/shared/src/main/scala/monix/bio/UIO.scala index d8bd4d05..9fb677ef 100644 --- a/core/shared/src/main/scala/monix/bio/UIO.scala +++ b/core/shared/src/main/scala/monix/bio/UIO.scala @@ -25,7 +25,7 @@ import monix.execution.{CancelablePromise, Scheduler} import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -object UIO { +object UIO extends UIODeprecated.Companion { /** * @see See [[monix.bio.BIO.apply]] @@ -193,40 +193,42 @@ object UIO { TaskSequence.traverse(in, f)(bf) /** - * @see See [[monix.bio.BIO.gather]] + * @see See [[monix.bio.BIO.parSequence]] */ - def gather[A, M[X] <: Iterable[X]](in: M[UIO[A]])(implicit bf: BuildFrom[M[UIO[A]], A, M[A]]): UIO[M[A]] = - TaskGather[Nothing, A, M](in, () => newBuilder(bf, in)) + def parSequence[A, M[X] <: Iterable[X]](in: M[UIO[A]])(implicit bf: BuildFrom[M[UIO[A]], A, M[A]]): UIO[M[A]] = + TaskParSequence[Nothing, A, M](in, () => newBuilder(bf, in)) /** - * @see See [[monix.bio.BIO.gatherN]] + * @see [[monix.bio.BIO.parTraverse]] */ - def gatherN[A](parallelism: Int)(in: Iterable[UIO[A]]): UIO[List[A]] = - TaskGatherN[Nothing, A](parallelism, in) + def parTraverse[A, B, M[X] <: Iterable[X]]( + in: M[A] + )(f: A => UIO[B])(implicit bf: BuildFrom[M[A], B, M[B]]): UIO[M[B]] = + BIO.parTraverse(in)(f) /** - * @see [[monix.bio.BIO.wander]] + * @see See [[monix.bio.BIO.parSequenceN]] */ - def wander[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => UIO[B])(implicit bf: BuildFrom[M[A], B, M[B]]): UIO[M[B]] = - BIO.wander(in)(f) + def parSequenceN[A](parallelism: Int)(in: Iterable[UIO[A]]): UIO[List[A]] = + TaskParSequenceN[Nothing, A](parallelism, in) /** - * @see See [[monix.bio.BIO.wanderN]] + * @see See [[monix.bio.BIO.parTraverseN]] */ - def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => UIO[B]): UIO[List[B]] = - BIO.wanderN(parallelism)(in)(f) + def parTraverseN[A, B](parallelism: Int)(in: Iterable[A])(f: A => UIO[B]): UIO[List[B]] = + BIO.parTraverseN(parallelism)(in)(f) /** - * @see See [[monix.bio.BIO.gatherUnordered]] + * @see See [[monix.bio.BIO.parSequenceUnordered]] */ - def gatherUnordered[A](in: Iterable[UIO[A]]): UIO[List[A]] = - TaskGatherUnordered[Nothing, A](in) + def parSequenceUnordered[A](in: Iterable[UIO[A]]): UIO[List[A]] = + TaskParSequenceUnordered[Nothing, A](in) /** - * @see [[monix.bio.BIO.wanderUnordered]] + * @see [[monix.bio.BIO.parTraverseUnordered]] */ - def wanderUnordered[A, B](in: Iterable[A])(f: A => UIO[B]): UIO[List[B]] = - BIO.wanderUnordered(in)(f) + def parTraverseUnordered[A, B](in: Iterable[A])(f: A => UIO[B]): UIO[List[B]] = + BIO.parTraverseUnordered(in)(f) /** * @see See [[monix.bio.BIO.mapBoth]] diff --git a/core/shared/src/main/scala/monix/bio/internal/BIODeprecated.scala b/core/shared/src/main/scala/monix/bio/internal/BIODeprecated.scala new file mode 100644 index 00000000..43a1e897 --- /dev/null +++ b/core/shared/src/main/scala/monix/bio/internal/BIODeprecated.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019-2020 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.bio.internal + +import monix.bio.BIO +import monix.execution.compat.BuildFrom + +private[bio] object BIODeprecated { + + /** + * Extension methods describing deprecated `BIO` operations. + */ + private[bio] abstract class Companion { + + /** DEPRECATED — renamed to [[BIO.parSequence]]. */ + @deprecated("Use parSequence", "0.1.0") + def gather[E, A, M[X] <: Iterable[X]]( + in: M[BIO[E, A]] + )(implicit bf: BuildFrom[M[BIO[E, A]], A, M[A]]): BIO[E, M[A]] = { + // $COVERAGE-OFF$ + BIO.parSequence(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parSequenceN]] */ + @deprecated("Use parSequenceN", "0.1.0") + def gatherN[E, A](parallelism: Int)(in: Iterable[BIO[E, A]]): BIO[E, List[A]] = { + // $COVERAGE-OFF$ + BIO.parSequenceN(parallelism)(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parSequenceUnordered]] */ + @deprecated("Use parSequenceUnordered", "0.1.0") + def gatherUnordered[E, A](in: Iterable[BIO[E, A]]): BIO[E, List[A]] = { + // $COVERAGE-OFF$ + BIO.parSequenceUnordered(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parTraverse]] */ + @deprecated("Use parTraverse", "0.1.0") + def wander[E, A, B, M[X] <: Iterable[X]]( + in: M[A] + )(f: A => BIO[E, B])(implicit bf: BuildFrom[M[A], B, M[B]]): BIO[E, M[B]] = { + // $COVERAGE-OFF$ + BIO.parTraverse(in)(f) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parTraverseN]] */ + @deprecated("Use parTraverseN", "0.1.0") + def wanderN[E, A, B](parallelism: Int)(in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] = { + // $COVERAGE-OFF$ + BIO.parTraverseN(parallelism)(in)(f) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parTraverseUnordered]] */ + @deprecated("Use parTraverseUnordered", "3.2.0") + def wanderUnordered[E, A, B, M[X] <: Iterable[X]](in: M[A])(f: A => BIO[E, B]): BIO[E, List[B]] = { + // $COVERAGE-OFF$ + BIO.parTraverseUnordered(in)(f) + // $COVERAGE-ON$ + } + } +} diff --git a/core/shared/src/main/scala/monix/bio/internal/TaskDeprecated.scala b/core/shared/src/main/scala/monix/bio/internal/TaskDeprecated.scala new file mode 100644 index 00000000..887b116c --- /dev/null +++ b/core/shared/src/main/scala/monix/bio/internal/TaskDeprecated.scala @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2019-2020 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.bio.internal + +import monix.bio.Task +import monix.execution.compat.BuildFrom + +private[bio] object TaskDeprecated { + + /** + * Extension methods describing deprecated `Task` operations. + */ + private[bio] abstract class Companion { + + /** DEPRECATED — renamed to [[Task.parSequence]]. */ + @deprecated("Use parSequence", "0.1.0") + def gather[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] = { + // $COVERAGE-OFF$ + Task.parSequence(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[Task.parSequenceN]] */ + @deprecated("Use parSequenceN", "0.1.0") + def gatherN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] = { + // $COVERAGE-OFF$ + Task.parSequenceN(parallelism)(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[Task.parSequenceUnordered]] */ + @deprecated("Use parSequenceUnordered", "0.1.0") + def gatherUnordered[A](in: Iterable[Task[A]]): Task[List[A]] = { + // $COVERAGE-OFF$ + Task.parSequenceUnordered(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[Task.parTraverse]] */ + @deprecated("Use parTraverse", "0.1.0") + def wander[A, B, M[X] <: Iterable[X]]( + in: M[A] + )(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] = { + // $COVERAGE-OFF$ + Task.parTraverse(in)(f) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[Task.parTraverseN]] */ + @deprecated("Use parTraverseN", "0.1.0") + def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] = { + // $COVERAGE-OFF$ + Task.parTraverseN(parallelism)(in)(f) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parTraverseUnordered]] */ + @deprecated("Use parTraverseUnordered", "3.2.0") + def wanderUnordered[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B]): Task[List[B]] = { + // $COVERAGE-OFF$ + Task.parTraverseUnordered(in)(f) + // $COVERAGE-ON$ + } + } + +} diff --git a/core/shared/src/main/scala/monix/bio/internal/TaskGather.scala b/core/shared/src/main/scala/monix/bio/internal/TaskParSequence.scala similarity index 98% rename from core/shared/src/main/scala/monix/bio/internal/TaskGather.scala rename to core/shared/src/main/scala/monix/bio/internal/TaskParSequence.scala index ebd44ccc..3162334c 100644 --- a/core/shared/src/main/scala/monix/bio/internal/TaskGather.scala +++ b/core/shared/src/main/scala/monix/bio/internal/TaskParSequence.scala @@ -27,10 +27,10 @@ import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal -private[bio] object TaskGather { +private[bio] object TaskParSequence { /** - * Implementation for `Task.gather` + * Implementation for `Task.parSequence` */ def apply[E, A, M[X] <: Iterable[X]]( in: Iterable[BIO[E, A]], diff --git a/core/shared/src/main/scala/monix/bio/internal/TaskGatherN.scala b/core/shared/src/main/scala/monix/bio/internal/TaskParSequenceN.scala similarity index 96% rename from core/shared/src/main/scala/monix/bio/internal/TaskGatherN.scala rename to core/shared/src/main/scala/monix/bio/internal/TaskParSequenceN.scala index 3cb059f0..61f9983a 100644 --- a/core/shared/src/main/scala/monix/bio/internal/TaskGatherN.scala +++ b/core/shared/src/main/scala/monix/bio/internal/TaskParSequenceN.scala @@ -24,7 +24,7 @@ import monix.catnap.ConcurrentQueue import monix.execution.exceptions.UncaughtErrorException import monix.execution.{BufferCapacity, ChannelType} -private[bio] object TaskGatherN { +private[bio] object TaskParSequenceN { def apply[E, A]( parallelism: Int, @@ -44,7 +44,7 @@ private[bio] object TaskGatherN { .hideErrors pairs <- BIO.traverse(in.toList)(task => Deferred[Task, A].map(p => (p, task)).hideErrors) _ <- queue.offerMany(pairs).hideErrors - workers = UIO.gather(List.fill(parallelism.min(itemSize)) { + workers = UIO.parSequence(List.fill(parallelism.min(itemSize)) { queue.poll.hideErrors.flatMap { case (p, task) => task.redeemCauseWith( diff --git a/core/shared/src/main/scala/monix/bio/internal/TaskGatherUnordered.scala b/core/shared/src/main/scala/monix/bio/internal/TaskParSequenceUnordered.scala similarity index 99% rename from core/shared/src/main/scala/monix/bio/internal/TaskGatherUnordered.scala rename to core/shared/src/main/scala/monix/bio/internal/TaskParSequenceUnordered.scala index 382b1c18..c1d90ecc 100644 --- a/core/shared/src/main/scala/monix/bio/internal/TaskGatherUnordered.scala +++ b/core/shared/src/main/scala/monix/bio/internal/TaskParSequenceUnordered.scala @@ -30,7 +30,7 @@ import scala.annotation.tailrec import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal -private[bio] object TaskGatherUnordered { +private[bio] object TaskParSequenceUnordered { /** * Implementation for `Task.gatherUnordered` diff --git a/core/shared/src/main/scala/monix/bio/internal/UIODeprecated.scala b/core/shared/src/main/scala/monix/bio/internal/UIODeprecated.scala new file mode 100644 index 00000000..b3de69ca --- /dev/null +++ b/core/shared/src/main/scala/monix/bio/internal/UIODeprecated.scala @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2019-2020 by The Monix Project Developers. + * See the project homepage at: https://monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.bio.internal + +import monix.bio.{BIO, UIO} +import monix.execution.compat.BuildFrom + +private[bio] object UIODeprecated { + + /** + * Extension methods describing deprecated `UIO` operations. + */ + private[bio] abstract class Companion { + + /** DEPRECATED — renamed to [[UIO.parSequence]]. */ + @deprecated("Use parSequence", "0.1.0") + def gather[A, M[X] <: Iterable[X]](in: M[UIO[A]])(implicit bf: BuildFrom[M[UIO[A]], A, M[A]]): UIO[M[A]] = { + // $COVERAGE-OFF$ + UIO.parSequence(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[UIO.parSequenceN]] */ + @deprecated("Use parSequenceN", "0.1.0") + def gatherN[A](parallelism: Int)(in: Iterable[UIO[A]]): UIO[List[A]] = { + // $COVERAGE-OFF$ + UIO.parSequenceN(parallelism)(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[UIO.parSequenceUnordered]] */ + @deprecated("Use parSequenceUnordered", "0.1.0") + def gatherUnordered[A](in: Iterable[UIO[A]]): UIO[List[A]] = { + // $COVERAGE-OFF$ + UIO.parSequenceUnordered(in) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[UIO.parTraverse]] */ + @deprecated("Use parTraverse", "0.1.0") + def wander[A, B, M[X] <: Iterable[X]]( + in: M[A] + )(f: A => UIO[B])(implicit bf: BuildFrom[M[A], B, M[B]]): UIO[M[B]] = { + // $COVERAGE-OFF$ + UIO.parTraverse(in)(f) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[UIO.parTraverseN]] */ + @deprecated("Use parTraverseN", "0.1.0") + def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => UIO[B]): UIO[List[B]] = { + // $COVERAGE-OFF$ + BIO.parTraverseN(parallelism)(in)(f) + // $COVERAGE-ON$ + } + + /** DEPRECATED — renamed to [[BIO.parTraverseUnordered]] */ + @deprecated("Use parTraverseUnordered", "3.2.0") + def wanderUnordered[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => UIO[B]): UIO[List[B]] = { + // $COVERAGE-OFF$ + UIO.parTraverseUnordered(in)(f) + // $COVERAGE-ON$ + } + } +} diff --git a/core/shared/src/test/scala/monix/bio/TaskGatherNSuite.scala b/core/shared/src/test/scala/monix/bio/TaskParSequenceNSuite.scala similarity index 74% rename from core/shared/src/test/scala/monix/bio/TaskGatherNSuite.scala rename to core/shared/src/test/scala/monix/bio/TaskParSequenceNSuite.scala index ca203051..afed15fc 100644 --- a/core/shared/src/test/scala/monix/bio/TaskGatherNSuite.scala +++ b/core/shared/src/test/scala/monix/bio/TaskParSequenceNSuite.scala @@ -25,14 +25,14 @@ import monix.execution.internal.Platform import scala.concurrent.duration._ import scala.util.{Failure, Success} -object TaskGatherNSuite extends BaseTestSuite { +object TaskParSequenceNSuite extends BaseTestSuite { - test("BIO.gatherN should execute in parallel bounded by parallelism") { implicit s => + test("BIO.parSequenceN should execute in parallel bounded by parallelism") { implicit s => val num = AtomicInt(0) val task = UIO.evalAsync(num.increment()) >> BIO.sleep(2.seconds) val seq = List.fill(100)(task) - BIO.gatherN(5)(seq).runToFuture + BIO.parSequenceN(5)(seq).runToFuture s.tick() assertEquals(num.get(), 5) @@ -44,38 +44,38 @@ object TaskGatherNSuite extends BaseTestSuite { assertEquals(num.get(), 100) } - test("BIO.gatherN should return result in order") { implicit s => + test("BIO.parSequenceN should return result in order") { implicit s => val task = 1.until(10).toList.map(UIO.eval(_)) - val res = BIO.gatherN(2)(task).runToFuture + val res = BIO.parSequenceN(2)(task).runToFuture s.tick() assertEquals(res.value, Some(Success(List(1, 2, 3, 4, 5, 6, 7, 8, 9)))) } - test("BIO.gatherN should return empty list") { implicit s => - val res = BIO.gatherN(2)(List.empty).runToFuture + test("BIO.parSequenceN should return empty list") { implicit s => + val res = BIO.parSequenceN(2)(List.empty).runToFuture s.tick() assertEquals(res.value, Some(Success(List.empty))) } - test("BIO.gatherN should handle single item") { implicit s => + test("BIO.parSequenceN should handle single item") { implicit s => val task = List(Task.eval(1)) - val res = BIO.gatherN(2)(task).runToFuture + val res = BIO.parSequenceN(2)(task).runToFuture s.tick() assertEquals(res.value, Some(Success(List(1)))) } - test("BIO.gatherN should handle parallelism bigger than list") { implicit s => + test("BIO.parSequenceN should handle parallelism bigger than list") { implicit s => val task = 1.until(5).toList.map(UIO.eval(_)) - val res = BIO.gatherN(10)(task).runToFuture + val res = BIO.parSequenceN(10)(task).runToFuture s.tick() assertEquals(res.value, Some(Success(List(1, 2, 3, 4)))) } - test("BIO.gatherN should onError if one of the tasks terminates in error") { implicit s => + test("BIO.parSequenceN should onError if one of the tasks terminates in error") { implicit s => val ex = "dummy" val seq = Seq( BIO.evalAsync(3).delayExecution(2.seconds), @@ -84,7 +84,7 @@ object TaskGatherNSuite extends BaseTestSuite { BIO.evalAsync(3).delayExecution(1.seconds) ) - val f = BIO.gatherN(2)(seq).attempt.runToFuture + val f = BIO.parSequenceN(2)(seq).attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -94,7 +94,7 @@ object TaskGatherNSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(Left(ex)))) } - test("BIO.gatherN should onTerminate if one of the tasks terminates in a fatal error") { implicit s => + test("BIO.parSequenceN should onTerminate if one of the tasks terminates in a fatal error") { implicit s => val ex = DummyException("dummy") val seq = Seq( UIO.evalAsync(3).delayExecution(2.seconds), @@ -103,7 +103,7 @@ object TaskGatherNSuite extends BaseTestSuite { UIO.evalAsync(3).delayExecution(1.seconds) ) - val f = BIO.gatherN(2)(seq).runToFuture + val f = BIO.parSequenceN(2)(seq).runToFuture s.tick() assertEquals(f.value, None) @@ -113,13 +113,13 @@ object TaskGatherNSuite extends BaseTestSuite { assertEquals(f.value, Some(Failure(ex))) } - test("BIO.gatherN should be canceled") { implicit s => + test("BIO.parSequenceN should be canceled") { implicit s => val num = AtomicInt(0) val seq = Seq( BIO.unit.delayExecution(3.seconds).doOnCancel(UIO.eval(num.increment())), UIO.evalAsync(num.increment(10)) ) - val f = BIO.gatherN(1)(seq).runToFuture + val f = BIO.parSequenceN(1)(seq).runToFuture s.tick(2.seconds) f.cancel() @@ -129,22 +129,22 @@ object TaskGatherNSuite extends BaseTestSuite { assertEquals(num.get(), 1) } - test("BIO.gatherN should be stack safe for synchronous tasks") { implicit s => + test("BIO.parSequenceN should be stack safe for synchronous tasks") { implicit s => val count = if (Platform.isJVM) 200000 else 5000 val tasks = for (_ <- 0 until count) yield Task.now(1) - val composite = BIO.gatherN(count)(tasks).map(_.sum) + val composite = BIO.parSequenceN(count)(tasks).map(_.sum) val result = composite.runToFuture s.tick() assertEquals(result.value, Some(Success(count))) } - test("BIO.gatherN runAsync multiple times") { implicit s => + test("BIO.parSequenceN runAsync multiple times") { implicit s => var effect = 0 val task1 = UIO.evalAsync { effect += 1; 3 }.memoize val task2 = task1 map { x => effect += 1; x + 1 } - val task3 = BIO.gatherN(2)(List(task2, task2, task2)) + val task3 = BIO.parSequenceN(2)(List(task2, task2, task2)) val result1 = task3.runToFuture; s.tick() assertEquals(result1.value, Some(Success(List(4, 4, 4)))) @@ -155,7 +155,7 @@ object TaskGatherNSuite extends BaseTestSuite { assertEquals(effect, 1 + 3 + 3) } - test("BIO.gatherN should log errors if multiple errors happen") { implicit s => + test("BIO.parSequenceN should log errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = "dummy1" @@ -181,8 +181,8 @@ object TaskGatherNSuite extends BaseTestSuite { } .uncancelable - val gather = BIO.gatherN(4)(Seq(task1, task2)) - val result = gather.attempt.runToFutureOpt + val sequence = BIO.parSequenceN(4)(Seq(task1, task2)) + val result = sequence.attempt.runToFutureOpt s.tick() assertEquals(result.value, Some(Success(Left(ex)))) @@ -190,7 +190,7 @@ object TaskGatherNSuite extends BaseTestSuite { assertEquals(errorsThrow, 2) } - test("BIO.gatherN should log terminal errors if multiple errors happen") { implicit s => + test("BIO.parSequenceN should log terminal errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = DummyException("dummy1") @@ -216,8 +216,8 @@ object TaskGatherNSuite extends BaseTestSuite { } .uncancelable - val gather = BIO.gatherN(4)(Seq(task1, task2)) - val result = gather.attempt.runToFutureOpt + val sequence = BIO.parSequenceN(4)(Seq(task1, task2)) + val result = sequence.attempt.runToFutureOpt s.tick() assertEquals(result.value, Some(Failure(ex))) diff --git a/core/shared/src/test/scala/monix/bio/TaskGatherSuite.scala b/core/shared/src/test/scala/monix/bio/TaskParSequenceSuite.scala similarity index 78% rename from core/shared/src/test/scala/monix/bio/TaskGatherSuite.scala rename to core/shared/src/test/scala/monix/bio/TaskParSequenceSuite.scala index aa1e9e80..ea9fe113 100644 --- a/core/shared/src/test/scala/monix/bio/TaskGatherSuite.scala +++ b/core/shared/src/test/scala/monix/bio/TaskParSequenceSuite.scala @@ -24,14 +24,14 @@ import monix.execution.internal.Platform import scala.concurrent.duration._ import scala.util.{Failure, Success} -object TaskGatherSuite extends BaseTestSuite { - test("BIO.gather should execute in parallel for async tasks") { implicit s => +object TaskParSequenceSuite extends BaseTestSuite { + test("BIO.parSequence should execute in parallel for async tasks") { implicit s => val seq = Seq( BIO.evalAsync(1).delayExecution(2.seconds), BIO.evalAsync(2).delayExecution(1.second), BIO.evalAsync(3).delayExecution(3.seconds) ) - val f = BIO.gather(seq).runToFuture + val f = BIO.parSequence(seq).runToFuture s.tick() assertEquals(f.value, None) @@ -41,7 +41,7 @@ object TaskGatherSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(Seq(1, 2, 3)))) } - test("BIO.gather should onError if one of the tasks terminates in error") { implicit s => + test("BIO.parSequence should onError if one of the tasks terminates in error") { implicit s => val ex = "dummy" val seq = Seq( UIO.evalAsync(3).delayExecution(3.seconds), @@ -50,7 +50,7 @@ object TaskGatherSuite extends BaseTestSuite { UIO.evalAsync(3).delayExecution(1.seconds) ) - val f = BIO.gather(seq).attempt.runToFuture + val f = BIO.parSequence(seq).attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -58,7 +58,7 @@ object TaskGatherSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(Left(ex)))) } - test("BIO.gather should onTerminate if one of the tasks terminates in a fatal error") { implicit s => + test("BIO.parSequence should onTerminate if one of the tasks terminates in a fatal error") { implicit s => val ex = DummyException("dummy") val seq: Seq[BIO[String, Int]] = Seq( UIO.evalAsync(3).delayExecution(3.seconds), @@ -67,7 +67,7 @@ object TaskGatherSuite extends BaseTestSuite { UIO.evalAsync(3).delayExecution(1.seconds) ) - val f = BIO.gather(seq).attempt.runToFuture + val f = BIO.parSequence(seq).attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -75,13 +75,13 @@ object TaskGatherSuite extends BaseTestSuite { assertEquals(f.value, Some(Failure(ex))) } - test("BIO.gather should be canceled") { implicit s => + test("BIO.parSequence should be canceled") { implicit s => val seq: Seq[BIO[Int, Int]] = Seq( UIO.evalAsync(1).delayExecution(2.seconds), UIO.evalAsync(2).delayExecution(1.second), UIO.evalAsync(3).delayExecution(3.seconds) ) - val f = BIO.gather(seq).attempt.runToFuture + val f = BIO.parSequence(seq).attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -93,22 +93,22 @@ object TaskGatherSuite extends BaseTestSuite { assertEquals(f.value, None) } - test("BIO.gather should be stack safe for synchronous tasks") { implicit s => + test("BIO.parSequence should be stack safe for synchronous tasks") { implicit s => val count = if (Platform.isJVM) 200000 else 5000 val tasks = for (_ <- 0 until count) yield Task.now(1) - val composite = BIO.gather(tasks).map(_.sum) + val composite = BIO.parSequence(tasks).map(_.sum) val result = composite.runToFuture s.tick() assertEquals(result.value, Some(Success(count))) } - test("BIO.gather runAsync multiple times") { implicit s => + test("BIO.parSequence runAsync multiple times") { implicit s => var effect = 0 val task1 = UIO.evalAsync { effect += 1; 3 }.memoize val task2 = task1 map { x => effect += 1; x + 1 } - val task3 = UIO.gather(List(task2, task2, task2)) + val task3 = UIO.parSequence(List(task2, task2, task2)) val result1 = task3.runToFuture; s.tick() assertEquals(result1.value, Some(Success(List(4, 4, 4)))) @@ -119,7 +119,7 @@ object TaskGatherSuite extends BaseTestSuite { assertEquals(effect, 1 + 3 + 3) } - test("BIO.gather should log errors if multiple errors happen") { implicit s => + test("BIO.parSequence should log errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = "dummy1" @@ -145,8 +145,8 @@ object TaskGatherSuite extends BaseTestSuite { } .uncancelable - val gather = BIO.gather(Seq(task1, task2)) - val result = gather.attempt.runToFutureOpt + val parSequence = BIO.parSequence(Seq(task1, task2)) + val result = parSequence.attempt.runToFutureOpt s.tick() assertEquals(result.value, Some(Success(Left(ex)))) @@ -154,7 +154,7 @@ object TaskGatherSuite extends BaseTestSuite { assertEquals(errorsThrow, 2) } - test("BIO.gather should log terminal errors if multiple errors happen") { implicit s => + test("BIO.parSequence should log terminal errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = DummyException("dummy1") @@ -180,8 +180,8 @@ object TaskGatherSuite extends BaseTestSuite { } .uncancelable - val gather = BIO.gather(Seq(task1, task2)) - val result = gather.attempt.runToFutureOpt + val parSequence = BIO.parSequence(Seq(task1, task2)) + val result = parSequence.attempt.runToFutureOpt s.tick() assertEquals(result.value, Some(Failure(ex))) diff --git a/core/shared/src/test/scala/monix/bio/TaskGatherUnorderedSuite.scala b/core/shared/src/test/scala/monix/bio/TaskParSequenceUnorderedSuite.scala similarity index 79% rename from core/shared/src/test/scala/monix/bio/TaskGatherUnorderedSuite.scala rename to core/shared/src/test/scala/monix/bio/TaskParSequenceUnorderedSuite.scala index 7a7b9416..fdeda81a 100644 --- a/core/shared/src/test/scala/monix/bio/TaskGatherUnorderedSuite.scala +++ b/core/shared/src/test/scala/monix/bio/TaskParSequenceUnorderedSuite.scala @@ -26,14 +26,14 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} -object TaskGatherUnorderedSuite extends BaseTestSuite { - test("BIO.gatherUnordered should execute in parallel") { implicit s => +object TaskParSequenceUnorderedSuite extends BaseTestSuite { + test("BIO.parSequenceUnordered should execute in parallel") { implicit s => val seq = Seq( BIO.evalAsync(1).delayExecution(2.seconds), BIO.evalAsync(2).delayExecution(1.second), BIO.evalAsync(3).delayExecution(3.seconds) ) - val f = BIO.gatherUnordered(seq).runToFuture + val f = BIO.parSequenceUnordered(seq).runToFuture s.tick() assertEquals(f.value, None) @@ -43,7 +43,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(List(3, 1, 2)))) } - test("BIO.gatherUnordered should onError if one of the tasks terminates in error") { implicit s => + test("BIO.parSequenceUnordered should onError if one of the tasks terminates in error") { implicit s => val ex = DummyException("dummy") val seq = Seq( BIO.evalAsync(3).delayExecution(3.seconds), @@ -52,7 +52,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { BIO.evalAsync(3).delayExecution(1.seconds) ) - val f = BIO.gatherUnordered(seq).runToFuture + val f = BIO.parSequenceUnordered(seq).runToFuture s.tick() assertEquals(f.value, None) @@ -60,7 +60,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(f.value, Some(Failure(ex))) } - test("BIO.gatherUnordered should onTerminate if one of the tasks terminates in a fatal error") { implicit s => + test("BIO.parSequenceUnordered should onTerminate if one of the tasks terminates in a fatal error") { implicit s => val ex = DummyException("dummy") val seq = Seq( UIO.evalAsync(3).delayExecution(3.seconds), @@ -69,7 +69,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { UIO.evalAsync(3).delayExecution(1.seconds) ) - val f = UIO.gatherUnordered(seq).runToFuture + val f = UIO.parSequenceUnordered(seq).runToFuture s.tick() assertEquals(f.value, None) @@ -77,13 +77,13 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(f.value, Some(Failure(ex))) } - test("BIO.gatherUnordered should be canceled") { implicit s => + test("BIO.parSequenceUnordered should be canceled") { implicit s => val seq = Seq( UIO.evalAsync(1).delayExecution(2.seconds), UIO.evalAsync(2).delayExecution(1.second), UIO.evalAsync(3).delayExecution(3.seconds) ) - val f = BIO.gatherUnordered(seq).runToFuture + val f = BIO.parSequenceUnordered(seq).runToFuture s.tick() assertEquals(f.value, None) @@ -95,28 +95,28 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(f.value, None) } - test("BIO.gatherUnordered should run over an iterable") { implicit s => + test("BIO.parSequenceUnordered should run over an iterable") { implicit s => val count = 10 val seq = 0 until count val it = seq.map(x => BIO.eval(x + 1)) - val sum = BIO.gatherUnordered(it).map(_.sum) + val sum = BIO.parSequenceUnordered(it).map(_.sum) val result = sum.runToFuture; s.tick() assertEquals(result.value.get, Success((count + 1) * count / 2)) } - test("BIO.gatherUnordered should be stack-safe on handling many tasks") { implicit s => + test("BIO.parSequenceUnordered should be stack-safe on handling many tasks") { implicit s => val count = 10000 val tasks = (0 until count).map(x => BIO.eval(x)) - val sum = BIO.gatherUnordered(tasks).map(_.sum) + val sum = BIO.parSequenceUnordered(tasks).map(_.sum) val result = sum.runToFuture; s.tick() assertEquals(result.value.get, Success(count * (count - 1) / 2)) } - test("BIO.gatherUnordered should be stack safe on success") { implicit s => + test("BIO.parSequenceUnordered should be stack safe on success") { implicit s => def fold[A, B](ta: Task[ListBuffer[A]], tb: Task[A]): Task[ListBuffer[A]] = - Task.gatherUnordered(List(ta, tb)).map { + Task.parSequenceUnordered(List(ta, tb)).map { case a :: b :: Nil => val (accR, valueR) = if (a.isInstanceOf[ListBuffer[_]]) (a, b) else (b, a) val acc = accR.asInstanceOf[ListBuffer[A]] @@ -150,7 +150,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(result, Some(Success(count * (count - 1) / 2))) } - test("BIO.gatherUnordered should log errors if multiple errors happen") { implicit s => + test("BIO.parSequenceUnordered should log errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = "dummy1" @@ -176,8 +176,8 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { } .uncancelable - val gather = BIO.gatherUnordered(Seq(task1, task2)) - val result = gather.attempt.runToFutureOpt + val sequence = BIO.parSequenceUnordered(Seq(task1, task2)) + val result = sequence.attempt.runToFutureOpt s.tick() assertEquals(result.value, Some(Success(Left(ex)))) @@ -185,7 +185,7 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(errorsThrow, 2) } - test("BIO.gatherUnordered should log terminal errors if multiple errors happen") { implicit s => + test("BIO.parSequenceUnordered should log terminal errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = DummyException("dummy1") @@ -211,8 +211,8 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { } .uncancelable - val gather = BIO.gatherUnordered(Seq(task1, task2)) - val result = gather.attempt.runToFutureOpt + val sequence = BIO.parSequenceUnordered(Seq(task1, task2)) + val result = sequence.attempt.runToFutureOpt s.tick() assertEquals(result.value, Some(Failure(ex))) @@ -220,13 +220,13 @@ object TaskGatherUnorderedSuite extends BaseTestSuite { assertEquals(errorsThrow, 2) } - test("BIO.gatherUnordered runAsync multiple times") { implicit s => + test("BIO.parSequenceUnordered runAsync multiple times") { implicit s => var effect = 0 val task1 = UIO.evalAsync { effect += 1; 3 }.memoize val task2 = task1 map { x => effect += 1; x + 1 } - val task3 = BIO.gatherUnordered(List(task2, task2, task2)) + val task3 = BIO.parSequenceUnordered(List(task2, task2, task2)) val result1 = task3.runToFuture; s.tick() assertEquals(result1.value, Some(Success(List(4, 4, 4)))) diff --git a/core/shared/src/test/scala/monix/bio/TaskWanderNSuite.scala b/core/shared/src/test/scala/monix/bio/TaskParTraverseNSuite.scala similarity index 66% rename from core/shared/src/test/scala/monix/bio/TaskWanderNSuite.scala rename to core/shared/src/test/scala/monix/bio/TaskParTraverseNSuite.scala index b7a844ea..b7078695 100644 --- a/core/shared/src/test/scala/monix/bio/TaskWanderNSuite.scala +++ b/core/shared/src/test/scala/monix/bio/TaskParTraverseNSuite.scala @@ -25,14 +25,14 @@ import monix.execution.internal.Platform import scala.concurrent.duration._ import scala.util.{Failure, Success} -object TaskWanderNSuite extends BaseTestSuite { +object TaskParTraverseNSuite extends BaseTestSuite { - test("BIO.wanderN allows fully sequential execution") { implicit s => + test("BIO.parTraverseN allows fully sequential execution") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 1)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 1)(numbers) { num => BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -45,12 +45,12 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN allows fully concurrent execution") { implicit s => + test("BIO.parTraverseN allows fully concurrent execution") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 5)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 5)(numbers) { num => BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -63,12 +63,12 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN allows partially concurrent execution") { implicit s => + test("BIO.parTraverseN allows partially concurrent execution") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 2)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 2)(numbers) { num => BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -85,13 +85,13 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns an error when fully sequential execution fails in a typed way") { implicit s => + test("BIO.parTraverseN returns an error when fully sequential execution fails in a typed way") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 1)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 1)(numbers) { num => if (num == 10) BIO.fromEither("dummy error".asLeft[Int]).delayExecution(num.seconds) else BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -104,13 +104,13 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns an error when fully sequential execution fails in a terminal way") { implicit s => + test("BIO.parTraverseN returns an error when fully sequential execution fails in a terminal way") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 1)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 1)(numbers) { num => if (num == 10) BIO.terminate(DummyException("dummy exception")).delayExecution(num.seconds) else BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -123,13 +123,13 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns an error when fully concurrent execution fails in a typed way") { implicit s => + test("BIO.parTraverseN returns an error when fully concurrent execution fails in a typed way") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 5)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 5)(numbers) { num => if (num == 10) BIO.fromEither("dummy error".asLeft[Int]).delayExecution(num.seconds) else BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -142,13 +142,13 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns an error when fully concurrent execution fails in a terminal way") { implicit s => + test("BIO.parTraverseN returns an error when fully concurrent execution fails in a terminal way") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 5)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 5)(numbers) { num => if (num == 10) BIO.terminate(DummyException("dummy exception")).delayExecution(num.seconds) else BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -161,13 +161,13 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns an error when partially concurrent execution fails in a typed way") { implicit s => + test("BIO.parTraverseN returns an error when partially concurrent execution fails in a typed way") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 2)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 2)(numbers) { num => if (num == 10) BIO.fromEither("dummy error".asLeft[Int]).delayExecution(num.seconds) else BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -182,13 +182,13 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns an error when partially concurrent execution fails in a terminal way") { implicit s => + test("BIO.parTraverseN returns an error when partially concurrent execution fails in a terminal way") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 2)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 2)(numbers) { num => if (num == 10) BIO.terminate(DummyException("dummy exception")).delayExecution(num.seconds) else BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -203,24 +203,24 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "no tasks should be left") } - test("BIO.wanderN returns a terminal error when an exception is thrown") { implicit s => + test("BIO.parTraverseN returns a terminal error when an exception is thrown") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 5)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 5)(numbers) { num => if (num == 10) throw DummyException("dummy exception") else BIO.fromEither(num.asRight[String]) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, Some(Failure(DummyException("dummy exception")))) } - test("BIO.wanderN should be cancelable") { implicit s => + test("BIO.parTraverseN should be cancelable") { implicit s => val numbers = List(2, 5, 10, 20, 40) - val wander = BIO.wanderN(parallelism = 2)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 2)(numbers) { num => BIO.fromEither((num * num).asRight[String]).delayExecution(num.seconds) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, None) @@ -232,76 +232,76 @@ object TaskWanderNSuite extends BaseTestSuite { assert(s.state.tasks.isEmpty, "every task should be cancelled") } - test("BIO.wanderN should be stack safe for synchronous tasks with low parallelism") { implicit s => + test("BIO.parTraverseN should be stack safe for synchronous tasks with low parallelism") { implicit s => val count = if (Platform.isJVM) 100000 else 10000 val numbers = 1.to(count).toList - val wander = BIO - .wanderN(parallelism = 10)(numbers)(num => BIO.fromEither(num.asRight[String])) + val traverse = BIO + .parTraverseN(parallelism = 10)(numbers)(num => BIO.fromEither(num.asRight[String])) .map(_.sum) - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, Some(Success(Right(numbers.sum)))) } - test("BIO.wanderN should be stack safe for asynchronous tasks with low parallelism") { implicit s => + test("BIO.parTraverseN should be stack safe for asynchronous tasks with low parallelism") { implicit s => val count = if (Platform.isJVM) 100000 else 10000 val numbers = 1.to(count).toList - val wander = BIO - .wanderN(parallelism = 10)(numbers)(num => BIO.fromEither(num.asRight[String]).executeAsync) + val traverse = BIO + .parTraverseN(parallelism = 10)(numbers)(num => BIO.fromEither(num.asRight[String]).executeAsync) .map(_.sum) - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, Some(Success(Right(numbers.sum)))) } - test("BIO.wanderN should be stack safe for synchronous tasks with high parallelism") { implicit s => + test("BIO.parTraverseN should be stack safe for synchronous tasks with high parallelism") { implicit s => val count = if (Platform.isJVM) 100000 else 10000 val numbers = 1.to(count).toList - val wander = BIO - .wanderN(parallelism = count)(numbers)(num => BIO.fromEither(num.asRight[String])) + val traverse = BIO + .parTraverseN(parallelism = count)(numbers)(num => BIO.fromEither(num.asRight[String])) .map(_.sum) - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, Some(Success(Right(numbers.sum)))) } - test("BIO.wanderN should be stack safe for asynchronous tasks with high parallelism") { implicit s => + test("BIO.parTraverseN should be stack safe for asynchronous tasks with high parallelism") { implicit s => val count = if (Platform.isJVM) 100000 else 10000 val numbers = 1.to(count).toList - val wander = BIO - .wanderN(parallelism = count)(numbers)(num => BIO.fromEither(num.asRight[String]).executeAsync) + val traverse = BIO + .parTraverseN(parallelism = count)(numbers)(num => BIO.fromEither(num.asRight[String]).executeAsync) .map(_.sum) - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(f.value, Some(Success(Right(numbers.sum)))) } - test("BIO.wanderN allows running the same effect multiple times") { implicit s => + test("BIO.parTraverseN allows running the same effect multiple times") { implicit s => val counter = AtomicInt(0) val numbers = List(2, 5, 10, 20, 40) val effect = BIO.evalAsync(counter.increment()) - val wander = BIO.wanderN(parallelism = 2)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 2)(numbers) { num => effect.map(_ => num * num) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(counter.get, 5) assertEquals(f.value, Some(Success(Right(List(4, 25, 100, 400, 1600))))) } - test("BIO.wanderN allows reusing a memoized effect multiple times") { implicit s => + test("BIO.parTraverseN allows reusing a memoized effect multiple times") { implicit s => val counter = AtomicInt(0) val numbers = List(2, 5, 10, 20, 40) val effect = BIO.evalAsync(counter.increment()).memoize - val wander = BIO.wanderN(parallelism = 2)(numbers) { num => + val traverse = BIO.parTraverseN(parallelism = 2)(numbers) { num => effect.map(_ => num * num) } - val f = wander.attempt.runToFuture + val f = traverse.attempt.runToFuture s.tick() assertEquals(counter.get, 1) diff --git a/core/shared/src/test/scala/monix/bio/TaskWanderSuite.scala b/core/shared/src/test/scala/monix/bio/TaskParTraverseSuite.scala similarity index 77% rename from core/shared/src/test/scala/monix/bio/TaskWanderSuite.scala rename to core/shared/src/test/scala/monix/bio/TaskParTraverseSuite.scala index a888966f..ebd72bdb 100644 --- a/core/shared/src/test/scala/monix/bio/TaskWanderSuite.scala +++ b/core/shared/src/test/scala/monix/bio/TaskParTraverseSuite.scala @@ -23,11 +23,11 @@ import monix.execution.internal.Platform import concurrent.duration._ import scala.util.{Failure, Success} -object TaskWanderSuite extends BaseTestSuite { - test("BIO.wander should execute in parallel for async tasks") { implicit s => +object TaskParTraverseSuite extends BaseTestSuite { + test("BIO.parTraverse should execute in parallel for async tasks") { implicit s => val seq = Seq((1, 2), (2, 1), (3, 3)) val f = BIO - .wander(seq) { + .parTraverse(seq) { case (i, d) => BIO.evalAsync(i + 1).delayExecution(d.seconds) } @@ -41,11 +41,11 @@ object TaskWanderSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(Seq(2, 3, 4)))) } - test("BIO.wander should onError if one of the tasks terminates in error") { implicit s => + test("BIO.parTraverse should onError if one of the tasks terminates in error") { implicit s => val ex = 1000L val seq = Seq((1, 3), (-1, 1), (3, 2), (3, 1)) val f = BIO - .wander(seq) { + .parTraverse(seq) { case (i, d) => BIO .suspendTotal(if (i < 0) BIO.raiseError(ex) else BIO.now(i + 1)) @@ -60,11 +60,11 @@ object TaskWanderSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(Left(ex)))) } - test("BIO.wander should onTerminate if one of the tasks terminates in unexpected error") { implicit s => + test("BIO.parTraverse should onTerminate if one of the tasks terminates in unexpected error") { implicit s => val ex = DummyException("dummy") val seq = Seq((1, 3), (-1, 1), (3, 2), (3, 1)) val f = BIO - .wander(seq) { + .parTraverse(seq) { case (i, d) => UIO .evalAsync(if (i < 0) throw ex else i + 1) @@ -78,10 +78,10 @@ object TaskWanderSuite extends BaseTestSuite { assertEquals(f.value, Some(Failure(ex))) } - test("BIO.wander should be canceled") { implicit s => + test("BIO.parTraverse should be canceled") { implicit s => val seq = Seq((1, 2), (2, 1), (3, 3)) val f = BIO - .wander(seq) { + .parTraverse(seq) { case (i, d) => BIO.evalAsync(i + 1).delayExecution(d.seconds) } .runToFuture @@ -96,21 +96,21 @@ object TaskWanderSuite extends BaseTestSuite { assertEquals(f.value, None) } - test("BIO.wander should be stack safe for synchronous tasks") { implicit s => + test("BIO.parTraverse should be stack safe for synchronous tasks") { implicit s => val count = if (Platform.isJVM) 200000 else 5000 val seq = for (i <- 0 until count) yield 1 - val composite = BIO.wander(seq)(BIO.now).map(_.sum) + val composite = BIO.parTraverse(seq)(BIO.now).map(_.sum) val result = composite.runToFuture s.tick() assertEquals(result.value, Some(Success(count))) } - test("BIO.wander runAsync multiple times") { implicit s => + test("BIO.parTraverse runAsync multiple times") { implicit s => var effect = 0 val task1 = UIO.evalAsync { effect += 1; 3 }.memoize - val task2 = BIO.wander(Seq(0, 0, 0)) { _ => + val task2 = BIO.parTraverse(Seq(0, 0, 0)) { _ => task1 map { x => effect += 1; x + 1 } @@ -125,9 +125,9 @@ object TaskWanderSuite extends BaseTestSuite { assertEquals(effect, 1 + 3 + 3) } - test("BIO.wander should wrap exceptions in the function") { implicit s => + test("BIO.parTraverse should wrap exceptions in the function") { implicit s => val ex = DummyException("dummy") - val task1 = BIO.wander(Seq(0)) { i => + val task1 = BIO.parTraverse(Seq(0)) { i => throw ex BIO.now(i) } diff --git a/core/shared/src/test/scala/monix/bio/TaskWanderUnorderedSuite.scala b/core/shared/src/test/scala/monix/bio/TaskParTraverseUnorderedSuite.scala similarity index 77% rename from core/shared/src/test/scala/monix/bio/TaskWanderUnorderedSuite.scala rename to core/shared/src/test/scala/monix/bio/TaskParTraverseUnorderedSuite.scala index 262f1973..fa1c2fa1 100644 --- a/core/shared/src/test/scala/monix/bio/TaskWanderUnorderedSuite.scala +++ b/core/shared/src/test/scala/monix/bio/TaskParTraverseUnorderedSuite.scala @@ -24,11 +24,11 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} -object TaskWanderUnorderedSuite extends BaseTestSuite { - test("BIO.wanderUnordered should execute in parallel") { implicit s => +object TaskParTraverseUnorderedSuite extends BaseTestSuite { + test("BIO.parTraverseUnordered should execute in parallel") { implicit s => val seq = Seq((1, 2), (2, 1), (3, 3)) val f = Task - .wanderUnordered(seq) { + .parTraverseUnordered(seq) { case (i, d) => Task.evalAsync(i + 1).delayExecution(d.seconds) } @@ -42,11 +42,11 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { assertEquals(f.value, Some(Success(Seq(4, 2, 3)))) } - test("Task.wanderUnordered should onError if one of the tasks terminates in error") { implicit s => + test("BIO.parTraverseUnordered should onError if one of the tasks terminates in error") { implicit s => val ex = DummyException("dummy") val seq = Seq((1, 3), (-1, 1), (3, 2), (3, 1)) val f = Task - .wanderUnordered(seq) { + .parTraverseUnordered(seq) { case (i, d) => Task .evalAsync(if (i < 0) throw ex else i + 1) @@ -60,10 +60,10 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { assertEquals(f.value, Some(Failure(ex))) } - test("Task.wanderUnordered should be canceled") { implicit s => + test("BIO.parTraverseUnordered should be canceled") { implicit s => val seq = Seq((1, 2), (2, 1), (3, 3)) val f = Task - .wanderUnordered(seq) { + .parTraverseUnordered(seq) { case (i, d) => Task.evalAsync(i + 1).delayExecution(d.seconds) } .runToFuture @@ -78,28 +78,28 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { assertEquals(f.value, None) } - test("Task.wanderUnordered should run over an iterable") { implicit s => + test("BIO.parTraverseUnordered should run over an iterable") { implicit s => val count = 10 val seq = 0 until count - val sum = Task.wanderUnordered(seq)(x => Task.eval(x + 1)).map(_.sum) + val sum = Task.parTraverseUnordered(seq)(x => Task.eval(x + 1)).map(_.sum) val result = sum.runToFuture; s.tick() assertEquals(result.value.get, Success((count + 1) * count / 2)) } - test("Task.wanderUnordered should be stack-safe on handling many tasks") { implicit s => + test("BIO.parTraverseUnordered should be stack-safe on handling many tasks") { implicit s => val count = 10000 val seq = for (i <- 0 until count) yield i - val sum = Task.wanderUnordered(seq)(x => Task.eval(x)).map(_.sum) + val sum = Task.parTraverseUnordered(seq)(x => Task.eval(x)).map(_.sum) val result = sum.runToFuture; s.tick() assertEquals(result.value.get, Success(count * (count - 1) / 2)) } - test("Task.wanderUnordered should be stack safe on success") { implicit s => + test("BIO.parTraverseUnordered should be stack safe on success") { implicit s => def fold[A](ta: Task[ListBuffer[A]], next: A): Task[ListBuffer[A]] = ta flatMap { acc => - Task.wanderUnordered(Seq(acc, next)) { v => + Task.parTraverseUnordered(Seq(acc, next)) { v => Task.eval(v) } } map { @@ -139,12 +139,12 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { assertEquals(result, Some(Success(count * (count - 1) / 2))) } - test("Task.wanderUnordered should log errors if multiple errors happen") { implicit s => + test("BIO.parTraverseUnordered should log errors if multiple errors happen") { implicit s => implicit val opts = BIO.defaultOptions.disableAutoCancelableRunLoops val ex = DummyException("dummy1") var errorsThrow = 0 - val gather = Task.wanderUnordered(Seq(0, 0)) { _ => + val sequence = Task.parTraverseUnordered(Seq(0, 0)) { _ => Task .raiseError[Int](ex) .executeAsync @@ -155,7 +155,7 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { .uncancelable } - val result = gather.runToFutureOpt + val result = sequence.runToFutureOpt s.tick() assertEquals(result.value, Some(Failure(ex))) @@ -163,7 +163,7 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { assertEquals(errorsThrow, 2) } - test("Task.wanderUnordered runAsync multiple times") { implicit s => + test("BIO.parTraverseUnordered runAsync multiple times") { implicit s => var effect = 0 val task1 = Task.evalAsync { effect += 1; 3 @@ -173,7 +173,7 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { effect += 1; x + 1 } - val task3 = Task.wanderUnordered(List(0, 0, 0)) { _ => + val task3 = Task.parTraverseUnordered(List(0, 0, 0)) { _ => task2 } @@ -186,9 +186,9 @@ object TaskWanderUnorderedSuite extends BaseTestSuite { assertEquals(effect, 1 + 3 + 3) } - test("Task.wanderUnordered should wrap exceptions in the function") { implicit s => + test("BIO.parTraverseUnordered should wrap exceptions in the function") { implicit s => val ex = DummyException("dummy") - val task1 = Task.wanderUnordered(Seq(0)) { _ => + val task1 = Task.parTraverseUnordered(Seq(0)) { _ => throw ex }