Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify FreeC and Algebra types. #1610

Merged
merged 1 commit into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions benchmark/src/main/scala/fs2/benchmark/FreeCBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class FreeCBenchmark {
@Benchmark
def nestedMaps = {
val nestedMapsFreeC =
(0 to N).foldLeft(FreeC.pure[IO, Int](0): FreeC[IO, Int]) { (acc, i) =>
(0 to N).foldLeft(FreeC.pure[IO, Int](0): FreeC[IO, INothing, Int]) { (acc, i) =>
acc.map(_ + i)
}
run(nestedMapsFreeC)
Expand All @@ -25,21 +25,20 @@ class FreeCBenchmark {
@Benchmark
def nestedFlatMaps = {
val nestedFlatMapsFreeC =
(0 to N).foldLeft(FreeC.pure[IO, Int](0): FreeC[IO, Int]) { (acc, i) =>
(0 to N).foldLeft(FreeC.pure[IO, Int](0): FreeC[IO, INothing, Int]) { (acc, i) =>
acc.flatMap(j => FreeC.pure(i + j))
}
run(nestedFlatMapsFreeC)
}

private def run[F[_], R](self: FreeC[F, R])(implicit F: MonadError[F, Throwable]): F[Option[R]] =
private def run[F[_], O, R](
self: FreeC[F, O, R]
)(implicit F: MonadError[F, Throwable]): F[Option[R]] =
self.viewL match {
case Result.Pure(r) => F.pure(Some(r))
case Result.Fail(e) => F.raiseError(e)
case Result.Interrupted(_, err) => err.fold[F[Option[R]]](F.pure(None)) { F.raiseError }
case v @ ViewL.View(step) =>
F.flatMap(F.attempt(step)) { r =>
run(v.next(Result.fromEither(r)))
}
case v @ ViewL.View(_) => F.raiseError(new RuntimeException("Never get here)"))
}

}
31 changes: 16 additions & 15 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import fs2.internal._
* `raiseError` is caught by `handleErrorWith`:
* - `handleErrorWith(raiseError(e))(f) == f(e)`
*/
final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing, Nothing, ?], R])
extends AnyVal {
final class Pull[+F[_], +O, +R] private (private val free: FreeC[Nothing, O, R]) extends AnyVal {

private[fs2] def get[F2[x] >: F[x], O2 >: O, R2 >: R]: FreeC[Algebra[F2, O2, ?], R2] =
free.asInstanceOf[FreeC[Algebra[F2, O2, ?], R2]]
private[fs2] def get[F2[x] >: F[x], O2 >: O, R2 >: R]: FreeC[F2, O2, R2] =
free.asInstanceOf[FreeC[F2, O2, R2]]

/** Alias for `_.map(_ => o2)`. */
def as[R2](r2: R2): Pull[F, O, R2] = map(_ => r2)
Expand Down Expand Up @@ -74,7 +73,7 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing

/** Run `p2` after `this`, regardless of errors during `this`, then reraise any errors encountered during `this`. */
def onComplete[F2[x] >: F[x], O2 >: O, R2 >: R](p2: => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
handleErrorWith(e => p2 >> new Pull(Algebra.raiseError[Nothing, Nothing](e))) >> p2
handleErrorWith(e => p2 >> new Pull(Algebra.raiseError[Nothing](e))) >> p2

/** If `this` terminates with `Pull.raiseError(e)`, invoke `h(e)`. */
def handleErrorWith[F2[x] >: F[x], O2 >: O, R2 >: R](
Expand All @@ -88,8 +87,8 @@ final class Pull[+F[_], +O, +R] private (private val free: FreeC[Algebra[Nothing

object Pull extends PullLowPriority {

@inline private[fs2] def fromFreeC[F[_], O, R](free: FreeC[Algebra[F, O, ?], R]): Pull[F, O, R] =
new Pull(free.asInstanceOf[FreeC[Algebra[Nothing, Nothing, ?], R]])
@inline private[fs2] def fromFreeC[F[_], O, R](free: FreeC[F, O, R]): Pull[F, O, R] =
new Pull(free.asInstanceOf[FreeC[Nothing, O, R]])

/**
* Like [[eval]] but if the effectful value fails, the exception is returned in a `Left`
Expand All @@ -98,18 +97,18 @@ object Pull extends PullLowPriority {
def attemptEval[F[_], R](fr: F[R]): Pull[F, INothing, Either[Throwable, R]] =
fromFreeC(
Algebra
.eval[F, INothing, R](fr)
.eval[F, R](fr)
.map(r => Right(r): Either[Throwable, R])
.handleErrorWith(t => Algebra.pure[F, INothing, Either[Throwable, R]](Left(t)))
.handleErrorWith(t => Algebra.pure[F, Either[Throwable, R]](Left(t)))
)

/** The completed `Pull`. Reads and outputs nothing. */
val done: Pull[Pure, INothing, Unit] =
fromFreeC[Pure, INothing, Unit](Algebra.pure[Pure, INothing, Unit](()))
fromFreeC[Pure, INothing, Unit](Algebra.pure[Pure, Unit](()))

/** Evaluates the supplied effectful value and returns the result as the resource of the returned pull. */
def eval[F[_], R](fr: F[R]): Pull[F, INothing, R] =
fromFreeC(Algebra.eval[F, INothing, R](fr))
fromFreeC(Algebra.eval[F, R](fr))

/**
* Repeatedly uses the output of the pull as input for the next step of the pull.
Expand All @@ -119,7 +118,7 @@ object Pull extends PullLowPriority {
r => using(r).flatMap { _.map(loop(using)).getOrElse(Pull.pure(None)) }

private def mapOutput[F[_], O, O2, R](p: Pull[F, O, R])(f: O => O2): Pull[F, O2, R] =
Pull.fromFreeC(p.get[F, O, R].translate(Algebra.mapOutput(f)))
Pull.fromFreeC(p.get[F, O, R].mapOutput(f))

/** Outputs a single value. */
def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit] =
Expand All @@ -131,15 +130,15 @@ object Pull extends PullLowPriority {

/** Pull that outputs nothing and has result of `r`. */
def pure[F[x] >: Pure[x], R](r: R): Pull[F, INothing, R] =
fromFreeC(Algebra.pure(r))
fromFreeC[F, INothing, R](Algebra.pure(r))

/**
* Reads and outputs nothing, and fails with the given error.
*
* The `F` type must be explicitly provided (e.g., via `raiseError[IO]` or `raiseError[Fallible]`).
*/
def raiseError[F[_]: RaiseThrowable](err: Throwable): Pull[F, INothing, INothing] =
new Pull(Algebra.raiseError[Nothing, Nothing](err))
new Pull(Algebra.raiseError[Nothing](err))

final class PartiallyAppliedFromEither[F[_]] {
def apply[A](either: Either[Throwable, A])(implicit ev: RaiseThrowable[F]): Pull[F, A, Unit] =
Expand Down Expand Up @@ -185,7 +184,9 @@ object Pull extends PullLowPriority {
def bracketCase[A, B](acquire: Pull[F, O, A])(
use: A => Pull[F, O, B]
)(release: (A, ExitCase[Throwable]) => Pull[F, O, Unit]): Pull[F, O, B] =
Pull.fromFreeC(FreeC.bracketCase(acquire.get)(a => use(a).get)((a, c) => release(a, c).get))
Pull.fromFreeC(
FreeC.bracketCase(acquire.get, (a: A) => use(a).get, (a: A, c) => release(a, c).get)
)
}

/**
Expand Down
46 changes: 21 additions & 25 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ import scala.concurrent.duration._
* @hideImplicitConversion PureOps
* @hideImplicitConversion IdOps
**/
final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, Nothing, ?], Unit])
extends AnyVal {
final class Stream[+F[_], +O] private (private val free: FreeC[Nothing, O, Unit]) extends AnyVal {

private[fs2] def get[F2[x] >: F[x], O2 >: O]: FreeC[Algebra[F2, O2, ?], Unit] =
free.asInstanceOf[FreeC[Algebra[F2, O2, ?], Unit]]
private[fs2] def get[F2[x] >: F[x], O2 >: O]: FreeC[F2, O2, Unit] =
free.asInstanceOf[FreeC[F2, O2, Unit]]

/**
* Appends `s2` to the end of this stream.
Expand Down Expand Up @@ -1091,7 +1090,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
f(hd(0)).get

case _ =>
def go(idx: Int): FreeC[Algebra[F2, O2, ?], Unit] =
def go(idx: Int): FreeC[F2, O2, Unit] =
if (idx == hd.size) Stream.fromFreeC(tl).flatMap(f).get
else {
f(hd(idx)).get.transformWith {
Expand Down Expand Up @@ -2952,8 +2951,8 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}

object Stream extends StreamLowPriority {
@inline private[fs2] def fromFreeC[F[_], O](free: FreeC[Algebra[F, O, ?], Unit]): Stream[F, O] =
new Stream(free.asInstanceOf[FreeC[Algebra[Nothing, Nothing, ?], Unit]])
@inline private[fs2] def fromFreeC[F[_], O](free: FreeC[F, O, Unit]): Stream[F, O] =
new Stream(free.asInstanceOf[FreeC[Nothing, O, Unit]])

/** Creates a pure stream that emits the supplied values. To convert to an effectful stream, use `covary`. */
def apply[F[x] >: Pure[x], O](os: O*): Stream[F, O] = emits(os)
Expand Down Expand Up @@ -3109,7 +3108,7 @@ object Stream extends StreamLowPriority {

/** Empty pure stream. */
val empty: Stream[Pure, INothing] =
fromFreeC[Pure, INothing](Algebra.pure[Pure, INothing, Unit](())): Stream[Pure, INothing]
fromFreeC[Pure, INothing](Algebra.pure[Pure, Unit](())): Stream[Pure, INothing]

/**
* Creates a single element stream that gets its value by evaluating the supplied effect. If the effect fails,
Expand Down Expand Up @@ -3307,7 +3306,7 @@ object Stream extends StreamLowPriority {
* This is a low-level method and generally should not be used by user code.
*/
def getScope[F[x] >: Pure[x]]: Stream[F, Scope[F]] =
Stream.fromFreeC(Algebra.getScope[F, Scope[F]].flatMap(Algebra.output1(_)))
Stream.fromFreeC(Algebra.getScope[F].flatMap(Algebra.output1(_)))

/**
* A stream that never emits and never terminates.
Expand Down Expand Up @@ -3566,7 +3565,7 @@ object Stream extends StreamLowPriority {

/** Provides syntax for streams that are invariant in `F` and `O`. */
final class InvariantOps[F[_], O] private[Stream] (
private val free: FreeC[Algebra[F, O, ?], Unit]
private val free: FreeC[F, O, Unit]
) extends AnyVal {
private def self: Stream[F, O] = Stream.fromFreeC(free)

Expand Down Expand Up @@ -3662,7 +3661,7 @@ object Stream extends StreamLowPriority {

/** Gets a projection of this stream that allows converting it to a `Pull` in a number of ways. */
def pull: ToPull[F, O] =
new ToPull[F, O](free.asInstanceOf[FreeC[Algebra[Nothing, Nothing, ?], Unit]])
new ToPull[F, O](free.asInstanceOf[FreeC[Nothing, O, Unit]])

/**
* Repeatedly invokes `using`, running the resultant `Pull` each time, halting when a pull
Expand All @@ -3680,8 +3679,7 @@ object Stream extends StreamLowPriority {
new PureOps(s.get[Pure, O])

/** Provides syntax for pure streams. */
final class PureOps[O] private[Stream] (private val free: FreeC[Algebra[Pure, O, ?], Unit])
extends AnyVal {
final class PureOps[O] private[Stream] (private val free: FreeC[Pure, O, Unit]) extends AnyVal {
private def self: Stream[Pure, O] = Stream.fromFreeC[Pure, O](free)

/** Alias for covary, to be able to write `Stream.empty[X]`. */
Expand Down Expand Up @@ -3709,8 +3707,7 @@ object Stream extends StreamLowPriority {
new IdOps(s.get[Id, O])

/** Provides syntax for pure pipes based on `cats.Id`. */
final class IdOps[O] private[Stream] (private val free: FreeC[Algebra[Id, O, ?], Unit])
extends AnyVal {
final class IdOps[O] private[Stream] (private val free: FreeC[Id, O, Unit]) extends AnyVal {
private def self: Stream[Id, O] = Stream.fromFreeC[Id, O](free)

private def idToApplicative[F[_]: Applicative]: Id ~> F =
Expand All @@ -3724,9 +3721,8 @@ object Stream extends StreamLowPriority {
new FallibleOps(s.get[Fallible, O])

/** Provides syntax for fallible streams. */
final class FallibleOps[O] private[Stream] (
private val free: FreeC[Algebra[Fallible, O, ?], Unit]
) extends AnyVal {
final class FallibleOps[O] private[Stream] (private val free: FreeC[Fallible, O, Unit])
extends AnyVal {
private def self: Stream[Fallible, O] = Stream.fromFreeC[Fallible, O](free)

/** Lifts this stream to the specified effect type. */
Expand All @@ -3752,11 +3748,11 @@ object Stream extends StreamLowPriority {

/** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */
final class ToPull[F[_], O] private[Stream] (
private val free: FreeC[Algebra[Nothing, Nothing, ?], Unit]
private val free: FreeC[Nothing, O, Unit]
) extends AnyVal {

private def self: Stream[F, O] =
Stream.fromFreeC(free.asInstanceOf[FreeC[Algebra[F, O, ?], Unit]])
Stream.fromFreeC(free.asInstanceOf[FreeC[F, O, Unit]])

/**
* Waits for a chunk of elements to be available in the source stream.
Expand Down Expand Up @@ -4024,7 +4020,7 @@ object Stream extends StreamLowPriority {
*/
def stepLeg: Pull[F, INothing, Option[StepLeg[F, O]]] =
Pull
.fromFreeC(Algebra.getScope[F, INothing])
.fromFreeC(Algebra.getScope[F])
.flatMap { scope =>
new StepLeg[F, O](Chunk.empty, scope.id, self.get).stepLeg
}
Expand Down Expand Up @@ -4118,7 +4114,7 @@ object Stream extends StreamLowPriority {
}

object Compiler extends LowPrioCompiler {
private def compile[F[_], O, B](stream: FreeC[Algebra[F, O, ?], Unit], init: B)(
private def compile[F[_], O, B](stream: FreeC[F, O, Unit], init: B)(
f: (B, Chunk[O]) => B
)(implicit F: Sync[F]): F[B] =
F.bracketCase(CompileScope.newRoot[F])(
Expand Down Expand Up @@ -4161,11 +4157,11 @@ object Stream extends StreamLowPriority {

/** Projection of a `Stream` providing various ways to compile a `Stream[F,O]` to an `F[...]`. */
final class CompileOps[F[_], G[_], O] private[Stream] (
private val free: FreeC[Algebra[Nothing, Nothing, ?], Unit]
private val free: FreeC[Nothing, O, Unit]
)(implicit compiler: Compiler[F, G]) {

private def self: Stream[F, O] =
Stream.fromFreeC(free.asInstanceOf[FreeC[Algebra[F, O, ?], Unit]])
Stream.fromFreeC(free.asInstanceOf[FreeC[F, O, Unit]])

/**
* Compiles this stream in to a value of the target effect type `F` and
Expand Down Expand Up @@ -4475,7 +4471,7 @@ object Stream extends StreamLowPriority {
final class StepLeg[F[_], O](
val head: Chunk[O],
private[fs2] val scopeId: Token,
private[fs2] val next: FreeC[Algebra[F, O, ?], Unit]
private[fs2] val next: FreeC[F, O, Unit]
) { self =>

/**
Expand Down
Loading