Skip to content

Commit

Permalink
Remove unnecessary fork(setSync)
Browse files Browse the repository at this point in the history
Only exception being in Queue
  • Loading branch information
SystemFw committed Dec 1, 2017
1 parent 7f6a305 commit af5f4c4
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 9 deletions.
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/AsyncPull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ sealed abstract class AsyncPull[F[_],A] { self =>
(b, cancelB) = t1
fa = a.run.map(Left(_): Either[A, B])
fb = b.run.map(Right(_): Either[A, B])
_ <- async.fork(fa.attempt.flatMap(x => async.fork(promise.setSync(x))))
_ <- async.fork(fb.attempt.flatMap(x => async.fork(promise.setSync(x))))
_ <- async.fork(fa.attempt.flatMap(x => promise.setSync(x)))
_ <- async.fork(fb.attempt.flatMap(x => promise.setSync(x)))
} yield {
(FreeC.Eval(promise.get.flatMap {
case Left(e) => F.raiseError[Either[A, B]](e)
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ object Scheduler extends SchedulerPlatform {
async.promise[F,Option[A]].flatMap { gate =>
F.delay {
val cancel = scheduler.scheduleOnce(d) {
ec.execute(() => async.unsafeRunAsync(fa.flatMap(a => async.fork(gate.setSync(Some(a)))))(_ => IO.unit))
ec.execute(() => async.unsafeRunAsync(fa.flatMap(a => gate.setSync(Some(a))))(_ => IO.unit))
}
gate.get -> (F.delay(cancel()) *> async.fork(gate.setSync(None)))
gate.get -> (F.delay(cancel()) *> gate.setSync(None))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/async/async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ package object async {
*/
def start[F[_], A](f: F[A])(implicit F: Effect[F], ec: ExecutionContext): F[F[A]] =
promise[F, Either[Throwable, A]].flatMap { p =>
fork(f.attempt.flatMap(x => fork(p.setSync(x)))).as(p.get.flatMap(F.fromEither))
fork(f.attempt.flatMap(p.setSync)).as(p.get.flatMap(F.fromEither))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object Semaphore {
type S = Either[Vector[(Long,async.Promise[F, Unit])], Long]
async.refOf[F,S](Right(n)).map { state => new Semaphore[F] {
private def open(gate: async.Promise[F, Unit]): F[Unit] =
async.fork(gate.setSync(()))
gate.setSync(())

def count = state.get.map(count_)

Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/async/mutable/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object Topic {
def unSubscribe: F[Unit] = for {
_ <- state.modify { case (a,subs) => a -> subs.filterNot(_.id == id) }
_ <- subSignal.modify(_ - 1)
_ <- async.fork(done.setSync(true))
_ <- done.setSync(true)
} yield ()
def subscribe: Stream[F, A] = eval(firstA.get) ++ q.dequeue
def publish(a: A): F[Unit] = {
Expand All @@ -127,7 +127,7 @@ object Topic {
}
c <- state.modify { case(a,s) => a -> (s :+ sub) }
_ <- subSignal.modify(_ + 1)
_ <- async.fork(firstA.setSync(c.now._1))
_ <- firstA.setSync(c.now._1)
} yield sub

new Topic[F,A] {
Expand Down
2 changes: 1 addition & 1 deletion io/src/test/scala/fs2/io/tcp/SocketSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SocketSpec extends Fs2Spec with BeforeAndAfterAll {

val echoServer: Stream[IO, Unit] = {
serverWithLocalAddress[IO](new InetSocketAddress(InetAddress.getByName(null), 0)).flatMap {
case Left(local) => Stream.eval_(async.fork(localBindAddress.setSync(local)))
case Left(local) => Stream.eval_(localBindAddress.setSync(local))
case Right(s) =>
s.map { socket =>
socket.reads(1024).to(socket.writes()).onFinalize(socket.endOfOutput)
Expand Down

0 comments on commit af5f4c4

Please sign in to comment.