Skip to content

Commit

Permalink
Replace usage of NonFatal with UnsafeNonFatal
Browse files Browse the repository at this point in the history
  • Loading branch information
kapunga committed Feb 11, 2025
1 parent d4ba299 commit 83fecb9
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.scalajs.macrotaskexecutor.MacrotaskExecutor
import scala.collection.mutable
import scala.concurrent.ExecutionContextExecutor
import scala.scalajs.{js, LinkingInfo}
import scala.util.control.NonFatal

/**
* An `ExecutionContext` that improves throughput by providing a method to `schedule` fibers to
Expand Down Expand Up @@ -65,7 +64,7 @@ private[effect] final class BatchingMacrotaskExecutor(
val fiber = fibers.take()
try fiber.run()
catch {
case t if NonFatal(t) => reportFailure(t)
case t if UnsafeNonFatal(t) => reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
i += 1
Expand Down
8 changes: 4 additions & 4 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package cats.effect
import cats.effect.metrics.{CpuStarvationWarningMetrics, JvmCpuStarvationMetrics}
import cats.effect.std.Console
import cats.effect.tracing.TracingConstants._
import cats.effect.unsafe.UnsafeNonFatal
import cats.syntax.all._

import scala.concurrent.{blocking, CancellationException, ExecutionContext}
import scala.concurrent.duration._
import scala.util.control.NonFatal

import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -218,7 +218,7 @@ trait IOApp {
new ExecutionContext {
def reportFailure(t: Throwable): Unit =
t match {
case t if NonFatal(t) =>
case t if UnsafeNonFatal(t) =>
IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)

case t =>
Expand Down Expand Up @@ -555,7 +555,7 @@ trait IOApp {
throw e

case t: Throwable =>
if (NonFatal(t)) {
if (UnsafeNonFatal(t)) {
if (isForked) {
t.printStackTrace()
System.exit(1)
Expand All @@ -571,7 +571,7 @@ trait IOApp {
try {
r.run()
} catch {
case t if NonFatal(t) =>
case t if UnsafeNonFatal(t) =>
IOApp.this.reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)

case t: Throwable =>
Expand Down
4 changes: 2 additions & 2 deletions core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect

import scala.util.control.NonFatal
import cats.effect.unsafe.UnsafeNonFatal

import java.nio.channels.ClosedByInterruptException
import java.util.{concurrent => juc}
Expand Down Expand Up @@ -68,7 +68,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
case ex: ClosedByInterruptException => throw ex

// this won't suppress InterruptedException:
case t if NonFatal(t) => Left(t)
case t if UnsafeNonFatal(t) => Left(t)
}

// this is why it has to be a semaphore rather than an atomic boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package unsafe

import cats.effect.unsafe.metrics.PollerMetrics

import scala.util.control.NonFatal

import java.nio.channels.{SelectableChannel, SelectionKey}
import java.nio.channels.spi.{AbstractSelector, SelectorProvider}
import java.util.Iterator
Expand Down Expand Up @@ -72,7 +70,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
// reset interest in triggered ops
key.interestOps(key.interestOps() & ~readyOps)
} catch {
case ex if NonFatal(ex) =>
case ex if UnsafeNonFatal(ex) =>
error = ex
readyOps = -1 // interest all waiters
}
Expand Down Expand Up @@ -150,7 +148,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS

cb(Right(Some(cancel)))
} catch {
case ex if NonFatal(ex) =>
case ex if UnsafeNonFatal(ex) =>
poller.countErroredOperation(ops)
cb(Left(ex))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import cats.effect.tracing.TracingConstants
import scala.collection.mutable
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.control.NonFatal

import java.time.Instant
import java.time.temporal.ChronoField
Expand Down Expand Up @@ -680,7 +679,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
try {
task.run()
} catch {
case ex if NonFatal(ex) =>
case ex if UnsafeNonFatal(ex) =>
reportFailure(ex)
}
}
Expand Down
19 changes: 9 additions & 10 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.annotation.{switch, tailrec}
import scala.collection.mutable
import scala.concurrent.{BlockContext, CanAwait}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.control.NonFatal

import java.lang.Long.MIN_VALUE
import java.util.concurrent.{ArrayBlockingQueue, ThreadLocalRandom}
Expand Down Expand Up @@ -376,7 +375,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
pool.notifyParked(rnd)
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}

Expand All @@ -393,7 +392,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// The dequeued element is a single fiber. Execute it immediately.
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}

Expand Down Expand Up @@ -443,7 +442,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// Run the stolen fiber.
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
}
Expand Down Expand Up @@ -495,7 +494,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
pool.notifyParked(rnd)
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}

Expand All @@ -515,7 +514,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// The dequeued element is a single fiber. Execute it immediately.
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}

Expand Down Expand Up @@ -545,7 +544,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// Run the stolen fiber.
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
}
Expand Down Expand Up @@ -810,7 +809,7 @@ private[effect] final class WorkerThread[P <: AnyRef](

try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
} else if (element.isInstanceOf[Runnable]) {
Expand All @@ -824,7 +823,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// The dequeued element is a single fiber. Execute it immediately.
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
}
Expand Down Expand Up @@ -861,7 +860,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// Run the fiber.
try fiber.run()
catch {
case t if NonFatal(t) => pool.reportFailure(t)
case t if UnsafeNonFatal(t) => pool.reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import scala.scalanative.meta.LinktimeInfo
import scala.scalanative.posix.time._
import scala.scalanative.posix.timeOps._
import scala.scalanative.unsafe._
import scala.util.control.NonFatal

import java.util.{ArrayDeque, PriorityQueue}

Expand Down Expand Up @@ -96,7 +95,7 @@ private[effect] final class EventLoopExecutorScheduler[P](
val task = sleepQueue.poll()
try task.runnable.run()
catch {
case t if NonFatal(t) => reportFailure(t)
case t if UnsafeNonFatal(t) => reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
}
Expand All @@ -107,7 +106,7 @@ private[effect] final class EventLoopExecutorScheduler[P](
val runnable = executeQueue.poll()
try runnable.run()
catch {
case t if NonFatal(t) => reportFailure(t)
case t if UnsafeNonFatal(t) => reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}
i += 1
Expand Down
9 changes: 4 additions & 5 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ import cats.effect.std.{
UUIDGen
}
import cats.effect.tracing.{Tracing, TracingEvent}
import cats.effect.unsafe.IORuntime
import cats.effect.unsafe.{IORuntime, UnsafeNonFatal}
import cats.syntax._
import cats.syntax.all._

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import java.util.UUID
import java.util.concurrent.Executor
Expand Down Expand Up @@ -1014,7 +1013,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
unsafeRunFiber(
cb(Left(new CancellationException("The fiber was canceled"))),
t => {
if (!NonFatal(t)) {
if (!UnsafeNonFatal(t)) {
t.printStackTrace()
}
cb(Left(t))
Expand All @@ -1027,7 +1026,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
unsafeRunFiber(
cb(Outcome.canceled),
t => {
if (!NonFatal(t)) {
if (!UnsafeNonFatal(t)) {
t.printStackTrace()
}
cb(Outcome.errored(t))
Expand All @@ -1050,7 +1049,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
val _ = unsafeRunFiber(
(),
t => {
if (NonFatal(t)) {
if (UnsafeNonFatal(t)) {
if (runtime.config.reportUnhandledFiberErrors)
runtime.compute.reportFailure(t)
} else { t.printStackTrace() }
Expand Down
Loading

0 comments on commit 83fecb9

Please sign in to comment.