Prototype of file rotation / resource proxying #1667

Merged · 19 commits · Nov 4, 2019
132 changes: 132 additions & 0 deletions core/shared/src/main/scala/fs2/Hotswap.scala
@@ -0,0 +1,132 @@
package fs2

import cats.implicits._
import cats.effect.{Concurrent, Resource, Sync}
import cats.effect.concurrent.Ref
import cats.effect.implicits._

/**
* Supports treating a linear sequence of resources as a single resource.
*
* A `Hotswap[F, R]` instance is created as a `Resource` and hence, has
* a lifetime that is scoped by the `Resource`. After creation, a `Resource[F, R]`
* can be swapped in to the `Hotswap` by calling `swap`. The acquired resource
* is returned and is finalized when the `Hotswap` is finalized or upon the next
* call to `swap`, whichever occurs first.
*
* For example, the sequence of three resources `r1`, `r2`, `r3` is shown in the
* following diagram:
*
* {{{
* >----- swap(r1) ---- swap(r2) ---- swap(r3) ----X
* | | | | |
* Creation | | | |
* r1 acquired | | |
* r2 acquired | |
* r1 released r3 acquired |
* r2 released |
* r3 released
* }}}
*
[Review comment — Collaborator]: that's pretty slick :)

* This class is particularly useful when working with pulls that cycle through
* resources -- e.g., writing bytes to files, rotating files every N bytes or M seconds.
* Without `Hotswap`, such pulls leak resources -- on each file rotation, a file handle
* or at least an internal resource reference accumulates. With `Hotswap`, the `Hotswap`
* instance is the only registered resource and each file is swapped in to the `Hotswap`.
*
* Usage typically looks something like:
*
* {{{
* Stream.resource(Hotswap(mkResource)).flatMap { case (hotswap, r) =>
* // Use r, call hotswap.swap(mkResource) as necessary
* }
* }}}
*
* See `fs2.io.file.writeRotate` for an example of usage.
*/
sealed trait Hotswap[F[_], R] {

/**
* Allocates a new resource, closes the last one if present, and
* returns the newly allocated `R`.
*
* If there are no further calls to `swap`, the resource created by
* the last call will be finalized when the lifetime of
* this `Hotswap` (which is itself tracked by `Resource`) is over.
*
* Since `swap` closes the old resource immediately, you need to
* ensure that no code is using the old `R` when `swap` is called.
* Failing to do so is likely to result in an error on the
* _consumer_ side. In any case, no resources will be leaked by
* `swap`.
*
* If you try to call swap after the lifetime of this `Hotswap` is
* over, `swap` will fail, but it will ensure all resources are
* closed, and never leak any.
*/
def swap(next: Resource[F, R]): F[R]

/**
* Runs the finalizer of the current resource, if any, and restores
* this `Hotswap` to its initial state.
*
* Like `swap`, you need to ensure that no code is using the old `R` when
* `clear` is called. Similarly, calling `clear` after the lifetime of this
* `Hotswap` is over results in an error.
*/
def clear: F[Unit]
}

object Hotswap {

/**
* Creates a new `Hotswap` initialized with the specified resource.
* The `Hotswap` instance and the initial resource are returned.
*/
def apply[F[_]: Concurrent, R](initial: Resource[F, R]): Resource[F, (Hotswap[F, R], R)] =
[Review comment — Collaborator]: how inconvenient do you think it would be to rename this to `create` or `make`, and `create` to `empty`? `apply` is pretty much as convenient as the `Stream` combinator you wanted, so if you really want that convenience, ignore this comment. My doubt stems from the fact that `apply` on tagless stuff is generally for summoning.

[Reply — Member, author]: IDK, I like using `apply` for the default constructor. We have precedent for it too with things like `Deferred` and `Socket` and `FileHandle`.

[Reply — Collaborator]: Good point, leave it.
create[F, R].evalMap(p => p.swap(initial).map(r => (p, r)))

/**
* Creates a new `Hotswap` with no initial resource. A `Resource` can
* subsequently be swapped in via `swap` during the lifetime of this `Hotswap`.
*/
def create[F[_]: Concurrent, R]: Resource[F, Hotswap[F, R]] = {
def raise[A](msg: String): F[A] =
Sync[F].raiseError(new RuntimeException(msg))

def initialize = Ref[F].of(().pure[F].some)

def finalize(state: Ref[F, Option[F[Unit]]]): F[Unit] =
state
.getAndSet(None)
.flatMap {
case None => raise[Unit]("Finalizer already run")
case Some(finalizer) => finalizer
}

Resource.make(initialize)(finalize).map { state =>
new Hotswap[F, R] {
override def swap(next: Resource[F, R]): F[R] =
(next <* ().pure[Resource[F, ?]]) // workaround for https://github.com/typelevel/cats-effect/issues/579
.allocated
.continual { r => // this whole block is inside continual and cannot be canceled
Sync[F].fromEither(r).flatMap {
case (newValue, newFinalizer) =>
swapFinalizer(newFinalizer).as(newValue)
}
}

override def clear: F[Unit] =
swapFinalizer(().pure[F])
[Review comment — Collaborator]: needs uncancelable here :)


private def swapFinalizer(newFinalizer: F[Unit]): F[Unit] =
state.modify {
case Some(oldFinalizer) =>
newFinalizer.some -> oldFinalizer
case None =>
None -> (newFinalizer *> raise[Unit]("Cannot swap after proxy has been finalized"))
}.flatten
}
}
}
}
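The core of `Hotswap` is the atomic finalizer exchange in `swapFinalizer`: acquire the new resource first, then atomically install its finalizer while retrieving and running the old one, and treat a `None` state as "already finalized" (close the new resource and fail). The following is a simplified, single-threaded, library-free sketch of that semantics; the names (`MiniHotswap`, `swap`, `close`) are hypothetical and use `java.util.concurrent.atomic.AtomicReference` in place of cats-effect's `Ref`, with plain thunks in place of `F[Unit]` finalizers:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of Hotswap's finalizer exchange. The state holds
// Some(finalizer) while the hotswap is live, and None once finalized.
final class MiniHotswap {
  private val state =
    new AtomicReference[Option[() => Unit]](Some(() => ()))

  // Acquire the new resource first, then swap in its finalizer and run
  // the old one — mirroring swapFinalizer's modify-and-flatten.
  def swap[R](acquire: () => (R, () => Unit)): R = {
    val (value, newFin) = acquire()
    val old = state.getAndUpdate {
      case Some(_) => Some(newFin)
      case None    => None
    }
    old match {
      case Some(oldFin) =>
        oldFin() // release previous resource after the new one is live
        value
      case None =>
        newFin() // never leak: close what we just acquired, then fail
        throw new RuntimeException("Cannot swap after finalization")
    }
  }

  // Run the current finalizer, if any, and mark the hotswap as closed.
  def close(): Unit =
    state.getAndSet(None).foreach(fin => fin())
}
```

Note the acquisition order matches the scaladoc diagram: `r2` is acquired before `r1` is released, so there is a brief window where both are open.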
105 changes: 105 additions & 0 deletions core/shared/src/test/scala/fs2/HotswapSpec.scala
@@ -0,0 +1,105 @@
package fs2

import cats.implicits._
import cats.effect.Sync
import cats.effect.concurrent.Ref
import cats.effect.IO
import cats.effect.Resource

class HotswapSpec extends Fs2Spec {
"Hotswap" - {
"finalizer of target run when hotswap is finalized" in {
mkEventLogger.flatMap { logger =>
Stream
.resource(Hotswap(logLifecycle(logger, "a")))
.flatMap { _ =>
Stream
.eval_(logger.log(Info("using")))
}
.compile
.drain *> logger.get.asserting(
_ shouldBe List(
Acquired("a"),
Info("using"),
Released("a")
)
)
}
}

"swap acquires new resource and then finalizes old resource" in {
mkEventLogger.flatMap { logger =>
Stream
.resource(Hotswap(logLifecycle(logger, "a")))
.flatMap {
case (hotswap, _) =>
Stream.eval_(logger.log(Info("using a"))) ++
Stream.eval_(hotswap.swap(logLifecycle(logger, "b"))) ++
Stream.eval_(logger.log(Info("using b"))) ++
Stream.eval_(hotswap.swap(logLifecycle(logger, "c"))) ++
Stream.eval_(logger.log(Info("using c")))
}
.compile
.drain *> logger.get.asserting(
_ shouldBe List(
Acquired("a"),
Info("using a"),
Acquired("b"),
Released("a"),
Info("using b"),
Acquired("c"),
Released("b"),
Info("using c"),
Released("c")
)
)
}
}

"clear finalizes old resource" in {
mkEventLogger.flatMap { logger =>
Stream
.resource(Hotswap(logLifecycle(logger, "a")))
.flatMap {
case (hotswap, _) =>
Stream.eval_(logger.log(Info("using a"))) ++
Stream.eval_(hotswap.clear) ++
Stream.eval_(logger.log(Info("after clear")))
}
.compile
.drain *> logger.get.asserting(
_ shouldBe List(
Acquired("a"),
Info("using a"),
Released("a"),
Info("after clear")
)
)
}
}
}

trait Logger[F[_], A] {
def log(a: A): F[Unit]
def get: F[List[A]]
}
object Logger {
def apply[F[_]: Sync, A]: F[Logger[F, A]] =
Ref.of(Nil: List[A]).map { ref =>
new Logger[F, A] {
def log(a: A): F[Unit] = ref.update(acc => a :: acc)
def get: F[List[A]] = ref.get.map(_.reverse)
}
}
}

sealed trait Event
case class Acquired(tag: String) extends Event
case class Released(tag: String) extends Event
case class Info(message: String) extends Event

def mkEventLogger: IO[Logger[IO, Event]] = Logger[IO, Event]

def logLifecycle(logger: Logger[IO, Event], tag: String): Resource[IO, Unit] =
Resource.make(logger.log(Acquired(tag)))(_ => logger.log(Released(tag)))
}
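The `Logger` helper in the spec uses a common pattern worth noting: events are prepended to the list (O(1) per `log`) and the list is reversed once on `get`, rather than appending (O(n) per `log`). A minimal standalone version of the same pattern, using a plain `var` instead of a cats-effect `Ref` (the name `SimpleLogger` is illustrative, not part of fs2):

```scala
// Accumulates events by prepending (cheap), reverses on read so that
// get returns events in the order they were logged.
final class SimpleLogger[A] {
  private var acc: List[A] = Nil
  def log(a: A): Unit = acc = a :: acc
  def get: List[A] = acc.reverse
}
```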
94 changes: 94 additions & 0 deletions io/src/main/scala/fs2/io/file/ReadCursor.scala
@@ -0,0 +1,94 @@
package fs2
package io
package file

import scala.concurrent.duration.FiniteDuration

import cats.{Functor, ~>}
import cats.arrow.FunctionK
import cats.implicits._
import cats.effect.{Blocker, ContextShift, Resource, Sync, Timer}

import java.nio.file._

/**
* Associates a `FileHandle` with a byte offset into the file.
*
* This encapsulates the pattern of incrementally reading bytes from a file,
* one chunk at a time. Additionally, convenience methods are provided for
* working with pulls.
*/
final case class ReadCursor[F[_]](file: FileHandle[F], offset: Long) {

/**
* Reads a single chunk from the underlying file handle, returning the
* read chunk and a new cursor with an offset incremented by the chunk size.
*/
def read(chunkSize: Int)(implicit F: Functor[F]): F[Option[(ReadCursor[F], Chunk[Byte])]] =
read_[F](chunkSize, FunctionK.id[F])

/**
* Like `read` but returns a `Pull` instead of an `F[Option[(ReadCursor[F], Chunk[Byte])]]`.
*/
def readPull(chunkSize: Int): Pull[F, Nothing, Option[(ReadCursor[F], Chunk[Byte])]] =
read_(chunkSize, Pull.functionKInstance)

private def read_[G[_]: Functor](
chunkSize: Int,
u: F ~> G
): G[Option[(ReadCursor[F], Chunk[Byte])]] =
u(file.read(chunkSize, offset)).map {
_.map { chunk =>
val next = ReadCursor(file, offset + chunk.size)
(next, chunk)
}
}

/**
* Reads all chunks from the underlying file handle, returning a cursor
* with offset incremented by the total number of bytes read.
*/
def readAll(chunkSize: Int): Pull[F, Byte, ReadCursor[F]] =
readPull(chunkSize).flatMap {
case Some((next, chunk)) => Pull.output(chunk) >> next.readAll(chunkSize)
case None => Pull.pure(this)
}

/**
* Reads chunks until the specified end position in the file. Returns a pull that outputs
* the read chunks and completes with a cursor with offset incremented by the total number
* of bytes read.
*/
def readUntil(chunkSize: Int, end: Long): Pull[F, Byte, ReadCursor[F]] =
if (offset < end) {
val toRead = ((end - offset).min(Int.MaxValue).toInt).min(chunkSize)
readPull(toRead).flatMap {
case Some((next, chunk)) => Pull.output(chunk) >> next.readUntil(chunkSize, end)
case None => Pull.pure(this)
}
} else Pull.pure(this)

/** Returns a new cursor with the offset adjusted to the specified position. */
def seek(position: Long): ReadCursor[F] = ReadCursor(file, position)

/**
* Like `readAll` but does not terminate when the end of the file is reached.
* Instead, when no bytes are available, sleeps for `pollDelay` and tries again,
* emitting any newly appended bytes (akin to `tail -f`).
*/
def tail(chunkSize: Int, pollDelay: FiniteDuration)(
implicit timer: Timer[F]
): Pull[F, Byte, ReadCursor[F]] =
readPull(chunkSize).flatMap {
case Some((next, chunk)) => Pull.output(chunk) >> next.tail(chunkSize, pollDelay)
case None => Pull.eval(timer.sleep(pollDelay)) >> tail(chunkSize, pollDelay)
}
}

object ReadCursor {

def fromPath[F[_]: Sync: ContextShift](
path: Path,
blocker: Blocker,
flags: Seq[OpenOption] = Nil
): Resource[F, ReadCursor[F]] =
FileHandle.fromPath(path, blocker, StandardOpenOption.READ :: flags.toList).map { fileHandle =>
ReadCursor(fileHandle, 0L)
}

}
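The clamp expression in `readUntil` — `((end - offset).min(Int.MaxValue).toInt).min(chunkSize)` — packs three concerns into one line: never read past `end`, safely narrow the `Long` gap to an `Int` before converting, and never exceed the requested chunk size. Extracted as a standalone function for illustration (the name `nextReadSize` is hypothetical, not part of the fs2 API):

```scala
// Size of the next read: at most chunkSize bytes, never past `end`,
// with the Long gap clamped to Int.MaxValue before narrowing to Int.
def nextReadSize(offset: Long, end: Long, chunkSize: Int): Int =
  if (offset >= end) 0
  else ((end - offset).min(Int.MaxValue.toLong).toInt).min(chunkSize)
```

Clamping to `Int.MaxValue` before calling `.toInt` matters: converting a large `Long` gap directly would overflow and could yield a negative read size.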