Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated all gatherX and wanderX methods to be parSequenceX and parTra… #102

Merged
merged 10 commits into from
Mar 30, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,24 @@ class IOSequenceBenchmark {


@Benchmark
def monixBioGather(): Long = {
def monixBioParSequence(): Long = {
val tasks = (0 until count).map(_ => BIO.eval(1)).toList
val result = BIO.gather(tasks).map(_.sum.toLong)
val result = BIO.parSequence(tasks).map(_.sum.toLong)
result.runSyncUnsafe()
}


@Benchmark
def monixBioGatherUnordered(): Long = {
def monixBioParSequenceUnordered(): Long = {
val tasks = (0 until count).map(_ => BIO.eval(1)).toList
val result = BIO.gatherUnordered(tasks).map(_.sum.toLong)
val result = BIO.parSequenceUnordered(tasks).map(_.sum.toLong)
result.runSyncUnsafe()
}

@Benchmark
def monixBioGatherN(): Long = {
def monixBioParSequenceN(): Long = {
val tasks = (0 until count).map(_ => BIO.eval(1)).toList
val result = BIO.gatherN(parallelism)(tasks).map(_.sum.toLong)
val result = BIO.parSequenceN(parallelism)(tasks).map(_.sum.toLong)
result.runSyncUnsafe()
}

Expand Down
126 changes: 64 additions & 62 deletions core/shared/src/main/scala/monix/bio/BIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ import scala.util.{Failure, Success, Try}
* it does for `Future.sequence`, the given `Task` values being
* evaluated one after another, in ''sequence'', not in ''parallel''.
* If you want parallelism, then you need to use
* [[monix.bio.BIO.gather BIO.gather]] and thus be explicit about it.
* [[monix.bio.BIO.parSequence BIO.parSequence]] and thus be explicit about it.
*
* This is great because it gives you the possibility of fine tuning the
* execution. For example, say you want to execute things in parallel,
Expand All @@ -198,7 +198,7 @@ import scala.util.{Failure, Success, Try}
* val chunks = list.sliding(30, 30).toSeq
*
* // Specify that each batch should process stuff in parallel
* val batchedTasks = chunks.map(chunk => Task.gather(chunk))
* val batchedTasks = chunks.map(chunk => Task.parSequence(chunk))
* // Sequence the batches
* val allBatches = Task.sequence(batchedTasks)
*
Expand Down Expand Up @@ -1327,7 +1327,7 @@ sealed abstract class BIO[+E, +A] extends Serializable {
* If an exception is raised, then `bracket` will re-raise the exception
* ''after'' performing the `release`. If the resulting task gets cancelled,
* then `bracket` will still perform the `release`, but the yielded task
* will be non-terminating (equivalent with [[Task.never]]).
* will be non-terminating (equivalent with [[BIO.never]]).
*
* Example:
*
Expand Down Expand Up @@ -1622,11 +1622,11 @@ sealed abstract class BIO[+E, +A] extends Serializable {
* readFile("path/to/file").executeOn(io, forceAsync = true)
* }}}
*
* In this example we are using [[Task.eval]], which executes the
* In this example we are using [[BIO.eval]], which executes the
* given `thunk` immediately (on the current thread and call stack).
*
* By calling `executeOn(io)`, we are ensuring that the used
* `Scheduler` (injected in [[Task.cancelable0[A](register* async tasks]])
* `Scheduler` (injected in [[BIO.cancelable0]])
* will be `io`, a `Scheduler` that we intend to use for blocking
* I/O actions. And we are also forcing an asynchronous boundary
* right before execution, by passing the `forceAsync` parameter as
Expand Down Expand Up @@ -2659,7 +2659,7 @@ sealed abstract class BIO[+E, +A] extends Serializable {
* parallelism being guaranteed when multi-threading is available!
*
* All specified tasks get evaluated in parallel, regardless of their
* execution model ([[Task.eval]] vs [[Task.evalAsync]] doesn't matter).
* execution model ([[BIO.eval]] vs [[BIO.evalAsync]] doesn't matter).
* Also the implementation tries to be smart about detecting forked
* tasks so it can eliminate extraneous forks for the very obvious
* cases.
Expand Down Expand Up @@ -3397,7 +3397,7 @@ object BIO extends TaskInstancesLevel0 {
* [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type technique]].
*
* Calling `create` with a callback that returns `Unit` is
* equivalent with [[Task.async0]]:
* equivalent with [[BIO.async0]]:
*
* `Task.async0(f) <-> Task.create(f)`
*
Expand Down Expand Up @@ -3444,7 +3444,7 @@ object BIO extends TaskInstancesLevel0 {
*
* Passed function can also return `Task[Unit]` as a task that
* describes a cancelation action, thus for an `f` that can be
* passed to [[Task.cancelable0]], and this equivalence holds:
* passed to [[BIO.cancelable0]], and this equivalence holds:
*
* `Task.cancelable(f) <-> Task.create(f)`
*
Expand Down Expand Up @@ -3646,7 +3646,7 @@ object BIO extends TaskInstancesLevel0 {
* results in the same collection.
*
* This operation will execute the tasks one by one, in order, which means that
* both effects and results will be ordered. See [[gather]] and [[gatherUnordered]]
* both effects and results will be ordered. See [[parSequence]] and [[parSequenceUnordered]]
* for unordered results or effects, and thus potential of running in parallel.
*
* It's a simple version of [[traverse]].
Expand Down Expand Up @@ -3674,29 +3674,59 @@ object BIO extends TaskInstancesLevel0 {
* This function is the nondeterministic analogue of `sequence` and should
* behave identically to `sequence` so long as there is no interaction between
* the effects being gathered. However, unlike `sequence`, which decides on
* a total order of effects, the effects in a `gather` are unordered with
* a total order of effects, the effects in a `parSequence` are unordered with
* respect to each other, the tasks being execute in parallel, not in sequence.
*
* Although the effects are unordered, we ensure the order of results
* matches the order of the input sequence. Also see [[gatherUnordered]]
* matches the order of the input sequence. Also see [[parSequenceUnordered]]
* for the more efficient alternative.
*
* Example:
* {{{
* val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3))
*
* // Yields 2, 4, 6
* Task.gather(tasks)
* Task.parSequence(tasks)
* }}}
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[gatherN]] for a version that limits parallelism.
* @see [[parSequenceN]] for a version that limits parallelism.
*/
def gather[E, A, M[X] <: Iterable[X]](in: M[BIO[E, A]])(implicit bf: BuildFrom[M[BIO[E, A]], A, M[A]]): BIO[E, M[A]] =
TaskGather[E, A, M](in, () => newBuilder(bf, in))
def parSequence[E, A, M[X] <: Iterable[X]](
in: M[BIO[E, A]]
)(implicit bf: BuildFrom[M[BIO[E, A]], A, M[A]]): BIO[E, M[A]] =
TaskParSequence[E, A, M](in, () => newBuilder(bf, in))

/** Given a `Iterable[A]` and a function `A => BIO[E, B]`,
* nondeterministically apply the function to each element of the collection
* and return a task that will signal a collection of the results once all
* tasks are finished.
*
* This function is the nondeterministic analogue of `traverse` and should
* behave identically to `traverse` so long as there is no interaction between
* the effects being gathered. However, unlike `traverse`, which decides on
* a total order of effects, the effects in a `parTraverse` are unordered with
* respect to each other.
*
* Although the effects are unordered, we ensure the order of results
* matches the order of the input sequence. Also see doctodo parTraverseUnordered
* for the more efficient alternative.
*
* It's a generalized version of [[parSequence]].
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see doctodo parTraverseN for a version that limits parallelism.
*/
def parTraverse[E, A, B, M[X] <: Iterable[X]](in: M[A])(f: A => BIO[E, B])(
implicit bf: BuildFrom[M[A], B, M[B]]
): BIO[E, M[B]] =
UIO.eval(in.map(f)).flatMap(col => TaskParSequence[E, B, M](col, () => newBuilder(bf, in)))

/** Executes the given sequence of tasks in parallel, non-deterministically
* gathering their results, returning a task that will signal the sequence
Expand All @@ -3717,45 +3747,17 @@ object BIO extends TaskInstancesLevel0 {
* )
*
* // Yields 2, 4, 6, 8 after around 6 seconds
* Task.gatherN(2)(tasks)
* Task.parSequenceN(2)(tasks)
* }}}
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[gather]] for a version that does not limit parallelism.
* @see [[parSequence]] for a version that does not limit parallelism.
*/
def gatherN[E, A](parallelism: Int)(in: Iterable[BIO[E, A]]): BIO[E, List[A]] =
TaskGatherN[E, A](parallelism, in)

/** Given a `Iterable[A]` and a function `A => BIO[E, B]`,
* nondeterministically apply the function to each element of the collection
* and return a task that will signal a collection of the results once all
* tasks are finished.
*
* This function is the nondeterministic analogue of `traverse` and should
* behave identically to `traverse` so long as there is no interaction between
* the effects being gathered. However, unlike `traverse`, which decides on
* a total order of effects, the effects in a `wander` are unordered with
* respect to each other.
*
* Although the effects are unordered, we ensure the order of results
* matches the order of the input sequence. Also see doctodo wanderUnordered
* for the more efficient alternative.
*
* It's a generalized version of [[gather]].
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see doctodo wanderN for a version that limits parallelism.
*/
def wander[E, A, B, M[X] <: Iterable[X]](in: M[A])(f: A => BIO[E, B])(
implicit bf: BuildFrom[M[A], B, M[B]]
): BIO[E, M[B]] =
UIO.eval(in.map(f)).flatMap(col => TaskGather[E, B, M](col, () => newBuilder(bf, in)))
def parSequenceN[E, A](parallelism: Int)(in: Iterable[BIO[E, A]]): BIO[E, List[A]] =
TaskParSequenceN[E, A](parallelism, in)

/** Applies the provided function in a non-deterministic way to each element
* of the input collection. The result will be signalled once all tasks
Expand All @@ -3782,35 +3784,35 @@ object BIO extends TaskInstancesLevel0 {
* val numbers = List(1, 2, 3, 4)
*
* // Yields 2, 4, 6, 8 after around 6 seconds
* BIO.wanderN(2)(numbers)(n => BIO(n + n).delayExecution(n.second))
* BIO.parTraverseN(2)(numbers)(n => BIO(n + n).delayExecution(n.second))
* }}}
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[wander]] for a version that does not limit parallelism.
* @see [[parTraverse]] for a version that does not limit parallelism.
*/
def wanderN[E, A, B](parallelism: Int)(in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] =
deferTotal(TaskGatherN(parallelism, in.map(f)))
def parTraverseN[E, A, B](parallelism: Int)(in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] =
deferTotal(TaskParSequenceN(parallelism, in.map(f)))

/** Processes the given collection of tasks in parallel and
* nondeterministically gather the results without keeping the original
* ordering of the given tasks.
*
* This function is similar to [[gather]], but neither the effects nor the
* This function is similar to [[parSequence]], but neither the effects nor the
* results will be ordered. Useful when you don't need ordering because:
*
* - it has non-blocking behavior (but not wait-free)
* - it can be more efficient (compared with [[gather]]), but not
* - it can be more efficient (compared with [[parSequence]]), but not
* necessarily (if you care about performance, then test)
*
* Example:
* {{{
* val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3))
*
* // Yields 2, 4, 6 (but order is NOT guaranteed)
* Task.gatherUnordered(tasks)
* Task.parSequenceUnordered(tasks)
* }}}
*
* $parallelismAdvice
Expand All @@ -3819,28 +3821,28 @@ object BIO extends TaskInstancesLevel0 {
*
* @param in is a list of tasks to execute
*/
def gatherUnordered[E, A](in: Iterable[BIO[E, A]]): BIO[E, List[A]] =
TaskGatherUnordered(in)
def parSequenceUnordered[E, A](in: Iterable[BIO[E, A]]): BIO[E, List[A]] =
TaskParSequenceUnordered(in)

/** Given a `Iterable[A]` and a function `A => BIO[E, B]`,
* nondeterministically apply the function to each element of the collection
* without keeping the original ordering of the results.
*
* This function is similar to [[wander]], but neither the effects nor the
* This function is similar to [[parTraverse]], but neither the effects nor the
* results will be ordered. Useful when you don't need ordering because:
*
* - it has non-blocking behavior (but not wait-free)
* - it can be more efficient (compared with [[wander]]), but not
* - it can be more efficient (compared with [[parTraverse]]), but not
* necessarily (if you care about performance, then test)
*
* It's a generalized version of [[gatherUnordered]].
* It's a generalized version of [[parSequenceUnordered]].
*
* $parallelismAdvice
*
* $parallelismNote
*/
def wanderUnordered[E, A, B](in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] =
BIO.evalTotal(in.map(f)).flatMap(gatherUnordered)
def parTraverseUnordered[E, A, B](in: Iterable[A])(f: A => BIO[E, B]): BIO[E, List[B]] =
BIO.evalTotal(in.map(f)).flatMap(parSequenceUnordered)

/** Yields a task that on evaluation will process the given tasks
* in parallel, then apply the given mapping function on their results.
Expand Down Expand Up @@ -4949,7 +4951,7 @@ private[bio] abstract class TaskTimers extends TaskClocks {
}
}

private[bio] abstract class TaskClocks {
private[bio] abstract class TaskClocks extends BIODeprecated.Companion {

/**
* Default, pure, globally visible `cats.effect.Clock`
Expand Down
42 changes: 22 additions & 20 deletions core/shared/src/main/scala/monix/bio/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package monix.bio
import cats.effect.{CancelToken, ConcurrentEffect, Effect}
import cats.~>
import monix.bio.BIO.AsyncBuilder
import monix.bio.internal.{TaskCreate, TaskFromFuture}
import monix.bio.internal.{TaskCreate, TaskDeprecated, TaskFromFuture}
import monix.catnap.FutureLift
import monix.execution.compat.BuildFrom
import monix.execution.{CancelablePromise, Scheduler}
Expand All @@ -30,7 +30,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object Task {
object Task extends TaskDeprecated.Companion {

/**
* @see See [[monix.bio.BIO.apply]]
Expand Down Expand Up @@ -287,40 +287,42 @@ object Task {
BIO.traverse(in)(f)

/**
* @see See [[monix.bio.BIO.gather]]
* @see See [[monix.bio.BIO.parSequence]]
*/
def gather[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] =
BIO.gather(in)
def parSequence[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] =
BIO.parSequence(in)

/**
* @see See [[monix.bio.BIO.gatherN]]
* @see [[monix.bio.BIO.parTraverse]]
*/
def gatherN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] =
BIO.gatherN(parallelism)(in)
def parTraverse[A, B, M[X] <: Iterable[X]](
in: M[A]
)(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] =
BIO.parTraverse(in)(f)

/**
* @see [[monix.bio.BIO.wander]]
* @see See [[monix.bio.BIO.parSequenceN]]
*/
def wander[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] =
BIO.wander(in)(f)
def parSequenceN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] =
BIO.parSequenceN(parallelism)(in)

/**
* @see See [[monix.bio.BIO.wanderN]]
* @see See [[monix.bio.BIO.parTraverseN]]
*/
def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] =
BIO.wanderN(parallelism)(in)(f)
def parTraverseN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] =
BIO.parTraverseN(parallelism)(in)(f)

/**
* @see See [[monix.bio.BIO.gatherUnordered]]
* @see See [[monix.bio.BIO.parSequenceUnordered]]
*/
def gatherUnordered[A](in: Iterable[Task[A]]): Task[List[A]] =
BIO.gatherUnordered(in)
def parSequenceUnordered[A](in: Iterable[Task[A]]): Task[List[A]] =
BIO.parSequenceUnordered(in)

/**
* @see [[monix.bio.BIO.wanderUnordered]]
* @see [[monix.bio.BIO.parTraverseUnordered]]
*/
def wanderUnordered[A, B](in: Iterable[A])(f: A => Task[B]): Task[List[B]] =
BIO.wanderUnordered(in)(f)
def parTraverseUnordered[A, B](in: Iterable[A])(f: A => Task[B]): Task[List[B]] =
BIO.parTraverseUnordered(in)(f)

/**
* @see See [[monix.bio.BIO.mapBoth]]
Expand Down
Loading