From f9e13f624a52b0265005ca8d844301b0432e5de3 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Tue, 15 Oct 2019 17:28:08 -0400 Subject: [PATCH 01/19] wip --- .../src/main/scala/fs2/ResourceProxy.scala | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 core/shared/src/main/scala/fs2/ResourceProxy.scala diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala new file mode 100644 index 0000000000..75dadc9497 --- /dev/null +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -0,0 +1,52 @@ +package fs2 + +import cats.implicits._ +import cats.effect.{Resource, Sync} +import cats.effect.concurrent.Ref + +final class ResourceProxy[F[_], R](state: Ref[F, Option[(R, F[Unit])]])(implicit F: Sync[F]) { + def get: F[R] = state.get.flatMap { + case None => F.raiseError(new RuntimeException("Resource already finalized")) + case Some((r, _)) => F.pure(r) + } + + // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized + def swap(newValue: Resource[F, R]): F[Unit] = + newValue.allocated.flatMap { + case nv => + def doSwap: F[Unit] = + state.access.flatMap { + case (current, setter) => + current match { + case None => + nv._2 *> F.raiseError( + new RuntimeException("Resource proxy already finalized at time of swap") + ) + case Some((_, oldFinalizer)) => + setter(Some(nv)).ifM(oldFinalizer, doSwap) + oldFinalizer + } + } + doSwap + } + + private def runFinalizer: F[Unit] = + state.access.flatMap { + case (current, setter) => + current match { + case None => F.raiseError(new RuntimeException("Finalizer already run")) + case Some((_, finalizer)) => setter(None).ifM(finalizer, runFinalizer) + } + } +} + +object ResourceProxy { + def apply[F[_]: Sync, R](initialValue: Resource[F, R]): Resource[F, ResourceProxy[F, R]] = { + val acquire = initialValue.allocated.flatMap { v => + Ref.of[F, Option[(R, F[Unit])]](Some(v)).map { state => + new ResourceProxy(state) + } + } + Resource.make(acquire)(_.runFinalizer) + } +} From cff44c3577f27803c919b9206741a73a916bbb14 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 16 Oct 2019 18:12:04 -0400 Subject: [PATCH 02/19] Example of file rotation via ResourceProxy --- .../src/main/scala/fs2/ResourceProxy.scala | 29 +++++++------- io/src/main/scala/fs2/io/file/file.scala | 38 ++++++++++++++++++ io/src/test/scala/fs2/io/file/FileSpec.scala | 39 +++++++++++++++++++ 3 files changed, 92 insertions(+), 14 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index 75dadc9497..7132b5d570 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -3,32 +3,34 @@ package fs2 import cats.implicits._ import cats.effect.{Resource, Sync} import cats.effect.concurrent.Ref +import cats.effect.Bracket -final class ResourceProxy[F[_], R](state: Ref[F, Option[(R, F[Unit])]])(implicit F: Sync[F]) { +final class ResourceProxy[F[_], R](state: Ref[F, Option[(R, F[Unit])]])( + implicit F: Bracket[F, Throwable] +) { def get: F[R] = state.get.flatMap { - case None => F.raiseError(new RuntimeException("Resource already finalized")) + case None => F.raiseError(new RuntimeException("Cannot get after proxy has been finalized")) case Some((r, _)) => F.pure(r) } // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized - def swap(newValue: Resource[F, R]): F[Unit] = - newValue.allocated.flatMap { + def swap(newValue: Resource[F, R]): F[R] = + F.uncancelable(newValue.allocated.flatMap { case nv => - def doSwap: F[Unit] = + def doSwap: F[R] = state.access.flatMap { case (current, setter) => current match { case None => nv._2 *> F.raiseError( - new RuntimeException("Resource proxy already finalized at time of swap") + new RuntimeException("Cannot swap after proxy has been finalized") ) case Some((_, oldFinalizer)) => - setter(Some(nv)).ifM(oldFinalizer, doSwap) - oldFinalizer + setter(Some(nv)).ifM(oldFinalizer.as(nv._1), doSwap) } } doSwap - } + }) private def runFinalizer: F[Unit] = state.access.flatMap { @@ -42,11 +44,10 @@ final class ResourceProxy[F[_], R](state: Ref[F, Option[(R, F[Unit])]])(implicit object ResourceProxy { def apply[F[_]: Sync, R](initialValue: Resource[F, R]): Resource[F, ResourceProxy[F, R]] = { - val acquire = initialValue.allocated.flatMap { v => - Ref.of[F, Option[(R, F[Unit])]](Some(v)).map { state => - new ResourceProxy(state) - } - } + val acquire = for { + v <- initialValue.allocated + state <- Ref.of[F, Option[(R, F[Unit])]](Some(v)) + } yield new ResourceProxy(state) Resource.make(acquire)(_.runFinalizer) } } diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 320c399054..0f86f1913b 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -112,6 +112,44 @@ package object file { _writeAll1(buf.drop(written), out, offset + written) } + def writeRotate[F[_]: Sync: ContextShift]( + path: F[Path], + limit: Long, + blocker: Blocker, + flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE) + ): Pipe[F, Byte, Unit] = { + def openNewFile: Resource[F, FileHandle[F]] = + Resource + .liftF(path) + .flatMap(p => FileHandle.fromPath(p, blocker, StandardOpenOption.WRITE :: flags.toList)) + + def go( + fileHandleProxy: ResourceProxy[F, FileHandle[F]], + offset: Long, + acc: Long, + s: Stream[F, Byte] + ): Pull[F, Unit, Unit] = + s.pull.unconsLimit((limit - acc).min(Int.MaxValue.toLong).toInt).flatMap { + case Some((hd, tl)) => + println(s"limit $limit, acc $acc, offset $offset, hd ${hd.size}") + val write = Pull.eval(fileHandleProxy.get.flatMap(_.write(hd, offset))) + val sz = hd.size + val newAcc = acc + sz + val next = if (newAcc >= limit) { + Pull.eval(fileHandleProxy.swap(openNewFile)) >> go(fileHandleProxy, 0L, 0L, tl) + } else { + go(fileHandleProxy, offset + sz, newAcc, tl) + } + write >> next + case None => Pull.done + } + + in => + Stream + .resource(ResourceProxy(openNewFile)) + .flatMap(fileHandleProxy => go(fileHandleProxy, 0L, 0L, in).stream) + } + /** * Creates a [[Watcher]] for the default file system. * diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index eeb5a4442e..1020008e27 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -5,6 +5,7 @@ package file import java.nio.file.StandardOpenOption import cats.effect.{Blocker, IO} +import cats.effect.concurrent.Ref import cats.implicits._ import scala.concurrent.duration._ @@ -315,4 +316,42 @@ class FileSpec extends BaseFileSpec { } } + "writeRotate" in { + val bufferSize = 100 + val totalBytes = 1000 + val rotateLimit = 150 + Stream + .resource(Blocker[IO]) + .flatMap { bec => + tempDirectory.flatMap { dir => + Stream.eval(Ref.of[IO, Int](0)).flatMap { counter => + val path = counter.modify(i => (i + 1, i)).map(i => dir.resolve(i.toString)) + val write = Stream(0x42.toByte).repeat + .buffer(bufferSize) + .take(totalBytes) + .through(file.writeRotate[IO](path, rotateLimit, bec)) + .compile + .drain + val verify = file + .directoryStream[IO](bec, dir) + .compile + .toList + .flatMap { paths => + paths + .sortBy(_.toString) + .traverse(p => IO(println(p.toString)) *> file.size[IO](bec, p)) + } + .asserting { sizes => + assert(sizes.size == ((totalBytes + rotateLimit - 1) / rotateLimit)) + assert( + sizes.init.forall(_ == rotateLimit) && sizes.last == (totalBytes % rotateLimit) + ) + } + Stream.eval(write *> verify) + } + } + } + .compile + .lastOrError + } } From b98c62259d9e66668d6c45f3bd949ca5e6e6ad43 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Thu, 17 Oct 2019 16:30:32 -0400 Subject: [PATCH 03/19] Refactored common file writing to a WriteCursor type --- .../main/scala/fs2/io/file/WriteCursor.scala | 66 +++++++++++++++++++ io/src/main/scala/fs2/io/file/file.scala | 55 ++++------------ io/src/main/scala/fs2/io/file/pulls.scala | 20 +----- 3 files changed, 80 insertions(+), 61 deletions(-) create mode 100644 io/src/main/scala/fs2/io/file/WriteCursor.scala diff --git a/io/src/main/scala/fs2/io/file/WriteCursor.scala b/io/src/main/scala/fs2/io/file/WriteCursor.scala new file mode 100644 index 0000000000..7b91a83ae4 --- /dev/null +++ b/io/src/main/scala/fs2/io/file/WriteCursor.scala @@ -0,0 +1,66 @@ +package fs2 +package io +package file + +import cats.{Monad, ~>} +import cats.implicits._ +import cats.effect.{Blocker, ContextShift, Resource, Sync} + +import java.nio.file._ +import cats.arrow.FunctionK + +/** + * Associates a `FileHandle` with an offset in to the file. + * + * This encapsulates the pattern of incrementally writing bytes in to a file, + * a chunk at a time. Additionally, convenience methods are provided for + * working with pulls. + */ +final case class WriteCursor[F[_]](val file: FileHandle[F], val offset: Long) { + + /** + * Writes a single chunk to the underlying file handle, returning a new cursor + * with an offset incremented by the chunk size. + */ + def write(bytes: Chunk[Byte])(implicit F: Monad[F]): F[WriteCursor[F]] = + write_[F](bytes, FunctionK.id[F]) + + /** + * Like `write` but returns a pull instead of an `F[WriteCursor[F]]`. + */ + def writePull(bytes: Chunk[Byte]): Pull[F, Nothing, WriteCursor[F]] = + write_(bytes, Pull.functionKInstance) + + private def write_[G[_]: Monad](bytes: Chunk[Byte], u: F ~> G): G[WriteCursor[F]] = + u(file.write(bytes, offset)).flatMap { written => + val next = WriteCursor(file, offset + written) + if (written == bytes.size) next.pure[G] + else next.write_(bytes.drop(written), u) + } + + /** + * Writes all chunks from the supplied stream to the underlying file handle, returning a cursor + * with offset incremented by the total number of bytes written. + */ + def writeAll(s: Stream[F, Byte]): Pull[F, Nothing, WriteCursor[F]] = + s.pull.uncons.flatMap { + case Some((hd, tl)) => writePull(hd).flatMap(_.writeAll(tl)) + case None => Pull.pure(this) + } +} + +object WriteCursor { + + def fromPath[F[_]: Sync: ContextShift]( + path: Path, + blocker: Blocker, + flags: Seq[OpenOption] = List(StandardOpenOption.CREATE) + ): Resource[F, WriteCursor[F]] = + FileHandle.fromPath(path, blocker, StandardOpenOption.WRITE :: flags.toList).flatMap { + fileHandle => + val size = if (flags.contains(StandardOpenOption.APPEND)) fileHandle.size else 0L.pure[F] + val cursor = size.map(s => WriteCursor(fileHandle, s)) + Resource.liftF(cursor) + } + +} diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 0f86f1913b..63e0eb5314 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -2,6 +2,7 @@ package fs2 package io import java.nio.file._ +import java.nio.file.attribute.FileAttribute import java.util.stream.{Stream => JStream} import scala.collection.JavaConverters._ @@ -12,10 +13,6 @@ import cats.implicits._ /** Provides support for working with files. */ package object file { - import java.nio.file.attribute.FileAttribute - import java.nio.file.CopyOption - import java.nio.file.LinkOption - import java.nio.file.Files /** * Reads all data synchronously from the file at the specified `java.nio.file.Path`. @@ -80,37 +77,9 @@ package object file { flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE) ): Pipe[F, Byte, Unit] = in => - for { - fileHandle <- Stream.resource( - FileHandle.fromPath(path, blocker, StandardOpenOption.WRITE :: flags.toList) - ) - offset <- if (flags.contains(StandardOpenOption.APPEND)) Stream.eval(fileHandle.size) - else Stream(0L) - _ <- pulls.writeAllToFileHandleAtOffset(in, fileHandle, offset).stream - } yield () - - private def _writeAll0[F[_]]( - in: Stream[F, Byte], - out: FileHandle[F], - offset: Long - ): Pull[F, Nothing, Unit] = - in.pull.uncons.flatMap { - case None => Pull.done - case Some((hd, tl)) => - _writeAll1(hd, out, offset) >> _writeAll0(tl, out, offset + hd.size) - } - - private def _writeAll1[F[_]]( - buf: Chunk[Byte], - out: FileHandle[F], - offset: Long - ): Pull[F, Nothing, Unit] = - Pull.eval(out.write(buf, offset)).flatMap { (written: Int) => - if (written >= buf.size) - Pull.pure(()) - else - _writeAll1(buf.drop(written), out, offset + written) - } + Stream + .resource(WriteCursor.fromPath(path, blocker, flags)) + .flatMap(_.writeAll(in).void.stream) def writeRotate[F[_]: Sync: ContextShift]( path: F[Path], @@ -118,27 +87,25 @@ package object file { blocker: Blocker, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE) ): Pipe[F, Byte, Unit] = { - def openNewFile: Resource[F, FileHandle[F]] = + def openNewFile: Resource[F, WriteCursor[F]] = Resource .liftF(path) - .flatMap(p => FileHandle.fromPath(p, blocker, StandardOpenOption.WRITE :: flags.toList)) + .flatMap(p => WriteCursor.fromPath(p, blocker, flags)) def go( - fileHandleProxy: ResourceProxy[F, FileHandle[F]], - offset: Long, + cursorProxy: ResourceProxy[F, WriteCursor[F]], acc: Long, s: Stream[F, Byte] ): Pull[F, Unit, Unit] = s.pull.unconsLimit((limit - acc).min(Int.MaxValue.toLong).toInt).flatMap { case Some((hd, tl)) => - println(s"limit $limit, acc $acc, offset $offset, hd ${hd.size}") - val write = Pull.eval(fileHandleProxy.get.flatMap(_.write(hd, offset))) + val write = Pull.eval(cursorProxy.get.flatMap(_.write(hd))) val sz = hd.size val newAcc = acc + sz val next = if (newAcc >= limit) { - Pull.eval(fileHandleProxy.swap(openNewFile)) >> go(fileHandleProxy, 0L, 0L, tl) + Pull.eval(cursorProxy.swap(openNewFile)) >> go(cursorProxy, 0L, tl) } else { - go(fileHandleProxy, offset + sz, newAcc, tl) + go(cursorProxy, newAcc, tl) } write >> next case None => Pull.done @@ -147,7 +114,7 @@ package object file { in => Stream .resource(ResourceProxy(openNewFile)) - .flatMap(fileHandleProxy => go(fileHandleProxy, 0L, 0L, in).stream) + .flatMap(cursor => go(cursor, 0L, in).stream) } /** diff --git a/io/src/main/scala/fs2/io/file/pulls.scala b/io/src/main/scala/fs2/io/file/pulls.scala index e0004b11b4..4558845a19 100644 --- a/io/src/main/scala/fs2/io/file/pulls.scala +++ b/io/src/main/scala/fs2/io/file/pulls.scala @@ -78,22 +78,8 @@ object pulls { in.pull.uncons.flatMap { case None => Pull.done case Some((hd, tl)) => - writeChunkToFileHandle(hd, out, offset) >> writeAllToFileHandleAtOffset( - tl, - out, - offset + hd.size - ) - } - - private def writeChunkToFileHandle[F[_]]( - buf: Chunk[Byte], - out: FileHandle[F], - offset: Long - ): Pull[F, Nothing, Unit] = - Pull.eval(out.write(buf, offset)).flatMap { (written: Int) => - if (written >= buf.size) - Pull.pure(()) - else - writeChunkToFileHandle(buf.drop(written), out, offset + written) + WriteCursor(out, offset) + .writePull(hd) + .flatMap(cursor => writeAllToFileHandleAtOffset(tl, cursor.file, cursor.offset)) } } From b9e042e42980cf6c6bb668a9c73460eff0c8ca15 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Fri, 18 Oct 2019 16:13:19 -0400 Subject: [PATCH 04/19] Added ReadCursor --- .../main/scala/fs2/io/file/ReadCursor.scala | 94 +++++++++++++++++++ .../main/scala/fs2/io/file/WriteCursor.scala | 10 +- io/src/main/scala/fs2/io/file/file.scala | 61 +++++++----- io/src/main/scala/fs2/io/file/pulls.scala | 39 +------- io/src/test/scala/fs2/io/file/FileSpec.scala | 2 +- 5 files changed, 144 insertions(+), 62 deletions(-) create mode 100644 io/src/main/scala/fs2/io/file/ReadCursor.scala diff --git a/io/src/main/scala/fs2/io/file/ReadCursor.scala b/io/src/main/scala/fs2/io/file/ReadCursor.scala new file mode 100644 index 0000000000..19e6d4e760 --- /dev/null +++ b/io/src/main/scala/fs2/io/file/ReadCursor.scala @@ -0,0 +1,94 @@ +package fs2 +package io +package file + +import scala.concurrent.duration.FiniteDuration + +import cats.{Functor, ~>} +import cats.arrow.FunctionK +import cats.implicits._ +import cats.effect.{Blocker, ContextShift, Resource, Sync, Timer} + +import java.nio.file._ + +/** + * Associates a `FileHandle` with an offset in to the file. + * + * This encapsulates the pattern of incrementally reading bytes in from a file, + * a chunk at a time. Additionally, convenience methods are provided for + * working with pulls. + */ +final case class ReadCursor[F[_]](val file: FileHandle[F], val offset: Long) { + + /** + * Reads a single chunk from the underlying file handle, returning the + * read chunk and a new cursor with an offset incremented by the chunk size. + */ + def read(chunkSize: Int)(implicit F: Functor[F]): F[Option[(ReadCursor[F], Chunk[Byte])]] = + read_[F](chunkSize, FunctionK.id[F]) + + /** + * Like `read` but returns a pull instead of an `F[(ReadCursor[F], Option[Chunk[Byte]])]`. + */ + def readPull(chunkSize: Int): Pull[F, Nothing, Option[(ReadCursor[F], Chunk[Byte])]] = + read_(chunkSize, Pull.functionKInstance) + + private def read_[G[_]: Functor]( + chunkSize: Int, + u: F ~> G + ): G[Option[(ReadCursor[F], Chunk[Byte])]] = + u(file.read(chunkSize, offset)).map { + _.map { chunk => + val next = ReadCursor(file, offset + chunk.size) + (next, chunk) + } + } + + /** + * Reads all chunks from the underlying file handle, returning a cursor + * with offset incremented by the total number of bytes read. + */ + def readAll(chunkSize: Int): Pull[F, Byte, ReadCursor[F]] = + readPull(chunkSize).flatMap { + case Some((next, chunk)) => Pull.output(chunk) >> next.readAll(chunkSize) + case None => Pull.pure(this) + } + + /** + * Reads chunks until the specified end position in the file. Returns a pull that outputs + * the read chunks and completes with a cursor with offset incremented by the total number + * of bytes read. + */ + def readUntil(chunkSize: Int, end: Long): Pull[F, Byte, ReadCursor[F]] = + if (offset < end) { + val toRead = ((end - offset).min(Int.MaxValue).toInt).min(chunkSize) + readPull(toRead).flatMap { + case Some((next, chunk)) => Pull.output(chunk) >> next.readUntil(chunkSize, end) + case None => Pull.pure(this) + } + } else Pull.pure(this) + + /** Returns a new cursor with the offset adjusted to the specified position. */ + def seek(position: Long): ReadCursor[F] = ReadCursor(file, position) + + def tail(chunkSize: Int, pollDelay: FiniteDuration)( + implicit timer: Timer[F] + ): Pull[F, Byte, ReadCursor[F]] = + readPull(chunkSize).flatMap { + case Some((next, chunk)) => Pull.output(chunk) >> next.tail(chunkSize, pollDelay) + case None => Pull.eval(timer.sleep(pollDelay)) >> tail(chunkSize, pollDelay) + } +} + +object ReadCursor { + + def fromPath[F[_]: Sync: ContextShift]( + path: Path, + blocker: Blocker, + flags: Seq[OpenOption] = Nil + ): Resource[F, ReadCursor[F]] = + FileHandle.fromPath(path, blocker, StandardOpenOption.READ :: flags.toList).map { fileHandle => + ReadCursor(fileHandle, 0L) + } + +} diff --git a/io/src/main/scala/fs2/io/file/WriteCursor.scala b/io/src/main/scala/fs2/io/file/WriteCursor.scala index 7b91a83ae4..28a4e71d3b 100644 --- a/io/src/main/scala/fs2/io/file/WriteCursor.scala +++ b/io/src/main/scala/fs2/io/file/WriteCursor.scala @@ -16,7 +16,10 @@ import cats.arrow.FunctionK * a chunk at a time. Additionally, convenience methods are provided for * working with pulls. */ -final case class WriteCursor[F[_]](val file: FileHandle[F], val offset: Long) { +final case class WriteCursor[F[_]](file: FileHandle[F], offset: Long) { + + /** Returns a new cursor with the offset adjusted to the specified position. */ + def seek(position: Long): WriteCursor[F] = WriteCursor(file, position) /** * Writes a single chunk to the underlying file handle, returning a new cursor @@ -63,4 +66,9 @@ object WriteCursor { Resource.liftF(cursor) } + def fromFileHandle[F[_]: Sync: ContextShift]( + file: FileHandle[F], + append: Boolean + ): F[WriteCursor[F]] = + if (append) file.size.map(s => WriteCursor(file, s)) else WriteCursor(file, 0L).pure[F] } diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 63e0eb5314..aef6b06197 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -22,12 +22,9 @@ package object file { blocker: Blocker, chunkSize: Int ): Stream[F, Byte] = - Stream - .resource( - FileHandle - .fromPath(path, blocker, List(StandardOpenOption.READ)) - ) - .flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c).stream) + Stream.resource(ReadCursor.fromPath(path, blocker)).flatMap { cursor => + cursor.readAll(chunkSize).void.stream + } /** * Reads a range of data synchronously from the file at the specified `java.nio.file.Path`. @@ -41,9 +38,9 @@ package object file { start: Long, end: Long ): Stream[F, Byte] = - Stream - .resource(FileHandle.fromPath(path, blocker, List(StandardOpenOption.READ))) - .flatMap(c => pulls.readRangeFromFileHandle(chunkSize, start, end)(c).stream) + Stream.resource(ReadCursor.fromPath(path, blocker)).flatMap { cursor => + cursor.seek(start).readUntil(chunkSize, end).void.stream + } /** * Returns an infinite stream of data from the file at the specified path. @@ -62,9 +59,9 @@ package object file { offset: Long = 0L, pollDelay: FiniteDuration = 1.second ): Stream[F, Byte] = - Stream - .resource(FileHandle.fromPath(path, blocker, List(StandardOpenOption.READ))) - .flatMap(c => pulls.tailFromFileHandle(chunkSize, offset, pollDelay)(c).stream) + Stream.resource(ReadCursor.fromPath(path, blocker)).flatMap { cursor => + cursor.seek(offset).tail(chunkSize, pollDelay).void.stream + } /** * Writes all data synchronously to the file at the specified `java.nio.file.Path`. @@ -87,34 +84,46 @@ package object file { blocker: Blocker, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE) ): Pipe[F, Byte, Unit] = { - def openNewFile: Resource[F, WriteCursor[F]] = + def openNewFile: Resource[F, FileHandle[F]] = Resource .liftF(path) - .flatMap(p => WriteCursor.fromPath(p, blocker, flags)) + .flatMap(p => FileHandle.fromPath(p, blocker, StandardOpenOption.WRITE :: flags.toList)) + + def newCursor(fileProxy: ResourceProxy[F, FileHandle[F]]): F[WriteCursor[F]] = + fileProxy.get.flatMap( + WriteCursor.fromFileHandle[F](_, flags.contains(StandardOpenOption.APPEND)) + ) def go( - cursorProxy: ResourceProxy[F, WriteCursor[F]], + fileProxy: ResourceProxy[F, FileHandle[F]], + cursor: WriteCursor[F], acc: Long, s: Stream[F, Byte] - ): Pull[F, Unit, Unit] = - s.pull.unconsLimit((limit - acc).min(Int.MaxValue.toLong).toInt).flatMap { + ): Pull[F, Unit, Unit] = { + val toWrite = (limit - acc).min(Int.MaxValue.toLong).toInt + s.pull.unconsLimit(toWrite).flatMap { case Some((hd, tl)) => - val write = Pull.eval(cursorProxy.get.flatMap(_.write(hd))) - val sz = hd.size - val newAcc = acc + sz - val next = if (newAcc >= limit) { - Pull.eval(cursorProxy.swap(openNewFile)) >> go(cursorProxy, 0L, tl) - } else { - go(cursorProxy, newAcc, tl) + val newAcc = acc + hd.size + cursor.writePull(hd).flatMap { nc => + if (newAcc >= limit) { + Pull.eval(fileProxy.swap(openNewFile)) >> + Pull.eval(newCursor(fileProxy)).flatMap(nc => go(fileProxy, nc, 0L, tl)) + } else { + go(fileProxy, nc, newAcc, tl) + } } - write >> next case None => Pull.done } + } in => Stream .resource(ResourceProxy(openNewFile)) - .flatMap(cursor => go(cursor, 0L, in).stream) + .flatMap { fileProxy => + Stream.eval(newCursor(fileProxy)).flatMap { cursor => + go(fileProxy, cursor, 0L, in).stream + } + } } /** diff --git a/io/src/main/scala/fs2/io/file/pulls.scala b/io/src/main/scala/fs2/io/file/pulls.scala index 4558845a19..6b4999cb34 100644 --- a/io/src/main/scala/fs2/io/file/pulls.scala +++ b/io/src/main/scala/fs2/io/file/pulls.scala @@ -28,40 +28,17 @@ object pulls { private def _tailFromFileHandle[F[_]](chunkSize: Int, offset: Long, delay: FiniteDuration)( h: FileHandle[F] )(implicit timer: Timer[F]): Pull[F, Byte, Unit] = - Pull.eval(h.read(chunkSize, offset)).flatMap { - case Some(bytes) => - Pull.output(bytes) >> _tailFromFileHandle(chunkSize, offset + bytes.size, delay)(h) - case None => - Pull.eval(timer.sleep(delay)) >> _tailFromFileHandle(chunkSize, offset, delay)(h) - } + ReadCursor(h, offset).tail(chunkSize, delay).void private def _readRangeFromFileHandle0[F[_]](chunkSize: Int, offset: Long, end: Long)( h: FileHandle[F] - ): Pull[F, Byte, Unit] = { - - val bytesLeft = end - offset - if (bytesLeft <= 0L) { - Pull.done - } else { - val actualChunkSize = - if (bytesLeft > Int.MaxValue) chunkSize else math.min(chunkSize, bytesLeft.toInt) - - Pull.eval(h.read(actualChunkSize, offset)).flatMap { - case Some(o) => - Pull.output(o) >> _readRangeFromFileHandle0(chunkSize, offset + o.size, end)(h) - case None => Pull.done - } - } - } + ): Pull[F, Byte, Unit] = + ReadCursor(h, offset).readUntil(chunkSize, end).void private def _readAllFromFileHandle0[F[_]](chunkSize: Int, offset: Long)( h: FileHandle[F] ): Pull[F, Byte, Unit] = - Pull.eval(h.read(chunkSize, offset)).flatMap { - case Some(o) => - Pull.output(o) >> _readAllFromFileHandle0(chunkSize, offset + o.size)(h) - case None => Pull.done - } + ReadCursor(h, offset).readAll(chunkSize).void /** * Given a `Stream[F, Byte]` and `FileHandle[F]`, writes all data from the stream to the file. @@ -75,11 +52,5 @@ object pulls { out: FileHandle[F], offset: Long ): Pull[F, Nothing, Unit] = - in.pull.uncons.flatMap { - case None => Pull.done - case Some((hd, tl)) => - WriteCursor(out, offset) - .writePull(hd) - .flatMap(cursor => writeAllToFileHandleAtOffset(tl, cursor.file, cursor.offset)) - } + WriteCursor(out, offset).writeAll(in).void } diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index 1020008e27..9f696e4618 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -339,7 +339,7 @@ class FileSpec extends BaseFileSpec { .flatMap { paths => paths .sortBy(_.toString) - .traverse(p => IO(println(p.toString)) *> file.size[IO](bec, p)) + .traverse(p => file.size[IO](bec, p)) } .asserting { sizes => assert(sizes.size == ((totalBytes + rotateLimit - 1) / rotateLimit)) From 1fd8982c5a90fa0d067671d8c3d3d449895e55a6 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Mon, 21 Oct 2019 03:11:34 +0100 Subject: [PATCH 05/19] Make ResourceProxy into a trait --- .../src/main/scala/fs2/ResourceProxy.scala | 85 +++++++++++-------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index 7132b5d570..c9f21e7186 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -5,41 +5,9 @@ import cats.effect.{Resource, Sync} import cats.effect.concurrent.Ref import cats.effect.Bracket -final class ResourceProxy[F[_], R](state: Ref[F, Option[(R, F[Unit])]])( - implicit F: Bracket[F, Throwable] -) { - def get: F[R] = state.get.flatMap { - case None => F.raiseError(new RuntimeException("Cannot get after proxy has been finalized")) - case Some((r, _)) => F.pure(r) - } - - // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized - def swap(newValue: Resource[F, R]): F[R] = - F.uncancelable(newValue.allocated.flatMap { - case nv => - def doSwap: F[R] = - state.access.flatMap { - case (current, setter) => - current match { - case None => - nv._2 *> F.raiseError( - new RuntimeException("Cannot swap after proxy has been finalized") - ) - case Some((_, oldFinalizer)) => - setter(Some(nv)).ifM(oldFinalizer.as(nv._1), doSwap) - } - } - doSwap - }) - - private def runFinalizer: F[Unit] = - state.access.flatMap { - case (current, setter) => - current match { - case None => F.raiseError(new RuntimeException("Finalizer already run")) - case Some((_, finalizer)) => setter(None).ifM(finalizer, runFinalizer) - } - } +trait ResourceProxy[F[_], R] { + def get: F[R] + def swap(newValue: Resource[F, R]): F[R] } object ResourceProxy { @@ -47,7 +15,50 @@ object ResourceProxy { val acquire = for { v <- initialValue.allocated state <- Ref.of[F, Option[(R, F[Unit])]](Some(v)) - } yield new ResourceProxy(state) - Resource.make(acquire)(_.runFinalizer) + } yield new ResourceProxyImpl(state) + Resource.make(acquire)(_.runFinalizer).map { impl => + new ResourceProxy[F, R] { + def get: F[R] = impl.get + def swap(newValue: Resource[F, R]): F[R] = impl.swap(newValue) + } + } } + + final class ResourceProxyImpl[F[_], R](state: Ref[F, Option[(R, F[Unit])]])( + implicit F: Bracket[F, Throwable] + ) { + def get: F[R] = state.get.flatMap { + case None => F.raiseError(new RuntimeException("Cannot get after proxy has been finalized")) + case Some((r, _)) => F.pure(r) + } + + // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized + def swap(newValue: Resource[F, R]): F[R] = + F.uncancelable(newValue.allocated.flatMap { + case nv => + def doSwap: F[R] = + state.access.flatMap { + case (current, setter) => + current match { + case None => + nv._2 *> F.raiseError( + new RuntimeException("Cannot swap after proxy has been finalized") + ) + case Some((_, oldFinalizer)) => + setter(Some(nv)).ifM(oldFinalizer.as(nv._1), doSwap) + } + } + doSwap + }) + + def runFinalizer: F[Unit] = + state.access.flatMap { + case (current, setter) => + current match { + case None => F.raiseError(new RuntimeException("Finalizer already run")) + case Some((_, finalizer)) => setter(None).ifM(finalizer, runFinalizer) + } + } + } + } From 2eca59d0b5d1a8193a28035459cd164ffa6b8388 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 00:51:29 +0100 Subject: [PATCH 06/19] Syntax cosmetics --- .../src/main/scala/fs2/ResourceProxy.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index c9f21e7186..75b2b5d8fa 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -1,25 +1,26 @@ package fs2 import cats.implicits._ -import cats.effect.{Resource, Sync} +import cats.effect.{Bracket, Resource, Sync} import cats.effect.concurrent.Ref -import cats.effect.Bracket +import cats.effect.implicits._ trait ResourceProxy[F[_], R] { def get: F[R] - def swap(newValue: Resource[F, R]): F[R] + def swap(next: Resource[F, R]): F[R] } object ResourceProxy { - def apply[F[_]: Sync, R](initialValue: Resource[F, R]): Resource[F, ResourceProxy[F, R]] = { + def apply[F[_]: Sync, R](initial: Resource[F, R]): Resource[F, ResourceProxy[F, R]] = { val acquire = for { - v <- initialValue.allocated + v <- initial.allocated state <- Ref.of[F, Option[(R, F[Unit])]](Some(v)) } yield new ResourceProxyImpl(state) + Resource.make(acquire)(_.runFinalizer).map { impl => new ResourceProxy[F, R] { def get: F[R] = impl.get - def swap(newValue: Resource[F, R]): F[R] = impl.swap(newValue) + def swap(next: Resource[F, R]): F[R] = impl.swap(next) } } } @@ -27,38 +28,38 @@ object ResourceProxy { final class ResourceProxyImpl[F[_], R](state: Ref[F, Option[(R, F[Unit])]])( implicit F: Bracket[F, Throwable] ) { + + def raise[A](msg: String): F[A] = F.raiseError(new RuntimeException(msg)) + def get: F[R] = state.get.flatMap { - case None => F.raiseError(new RuntimeException("Cannot get after proxy has been finalized")) - case Some((r, _)) => F.pure(r) + case None => raise("Cannot get after proxy has been finalized") + case Some((r, _)) => r.pure[F] } // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized - def swap(newValue: Resource[F, R]): F[R] = - F.uncancelable(newValue.allocated.flatMap { - case nv => + def swap(next: Resource[F, R]): F[R] = + next.allocated.flatMap { + case next @ (newValue, newFinalizer) => def doSwap: F[R] = state.access.flatMap { case (current, setter) => current match { case None => - nv._2 *> F.raiseError( - new RuntimeException("Cannot swap after proxy has been finalized") - ) + newFinalizer *> raise("Cannot swap after proxy has been finalized") case Some((_, oldFinalizer)) => - setter(Some(nv)).ifM(oldFinalizer.as(nv._1), doSwap) + setter(next.some).ifM(oldFinalizer.as(newValue), doSwap) } } doSwap - }) + }.uncancelable def runFinalizer: F[Unit] = state.access.flatMap { case (current, setter) => current match { - case None => F.raiseError(new RuntimeException("Finalizer already run")) + case None => raise("Finalizer already run") case Some((_, finalizer)) => setter(None).ifM(finalizer, runFinalizer) } } } - } From dc976e9a07dfc973d133650912a2a04076ea2a13 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 00:59:54 +0100 Subject: [PATCH 07/19] Prefer modify to access --- .../src/main/scala/fs2/ResourceProxy.scala | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index 75b2b5d8fa..50ac8ca400 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -40,26 +40,18 @@ object ResourceProxy { def swap(next: Resource[F, R]): F[R] = next.allocated.flatMap { case next @ (newValue, newFinalizer) => - def doSwap: F[R] = - state.access.flatMap { - case (current, setter) => - current match { - case None => - newFinalizer *> raise("Cannot swap after proxy has been finalized") - case Some((_, oldFinalizer)) => - setter(next.some).ifM(oldFinalizer.as(newValue), doSwap) - } - } - doSwap + state.modify { + case Some((_, oldFinalizer)) => + next.some -> oldFinalizer.as(newValue) + case None => + None -> (newFinalizer *> raise[R]("Cannot swap after proxy has been finalized")) + }.flatten }.uncancelable def runFinalizer: F[Unit] = - state.access.flatMap { - case (current, setter) => - current match { - case None => raise("Finalizer already run") - case Some((_, finalizer)) => setter(None).ifM(finalizer, runFinalizer) - } - } + state.modify { + case None => None -> raise[Unit]("Finalizer already run") + case Some((_, finalizer)) => None -> finalizer + }.flatten } } From f126ee752fdcc76ab71f263c67e4ed278ba2833c Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 01:08:41 +0100 Subject: [PATCH 08/19] inline definition of ResourceProxy --- .../src/main/scala/fs2/ResourceProxy.scala | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index 50ac8ca400..9c70208b85 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -1,7 +1,7 @@ package fs2 import cats.implicits._ -import cats.effect.{Bracket, Resource, Sync} +import cats.effect.{Resource, Sync} import cats.effect.concurrent.Ref import cats.effect.implicits._ @@ -12,46 +12,40 @@ trait ResourceProxy[F[_], R] { object ResourceProxy { def apply[F[_]: Sync, R](initial: Resource[F, R]): Resource[F, ResourceProxy[F, R]] = { + val acquire = for { v <- initial.allocated - state <- Ref.of[F, Option[(R, F[Unit])]](Some(v)) - } yield new ResourceProxyImpl(state) - - Resource.make(acquire)(_.runFinalizer).map { impl => - new ResourceProxy[F, R] { - def get: F[R] = impl.get - def swap(next: Resource[F, R]): F[R] = impl.swap(next) - } - } - } - - final class ResourceProxyImpl[F[_], R](state: Ref[F, Option[(R, F[Unit])]])( - implicit F: Bracket[F, Throwable] - ) { - - def raise[A](msg: String): F[A] = F.raiseError(new RuntimeException(msg)) + state <- Ref.of[F, Option[(R, F[Unit])]](v.some) + } yield state - def get: F[R] = state.get.flatMap { - case None => raise("Cannot get after proxy has been finalized") - case Some((r, _)) => r.pure[F] - } - - // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized - def swap(next: Resource[F, R]): F[R] = - next.allocated.flatMap { - case next @ (newValue, newFinalizer) => - state.modify { - case Some((_, oldFinalizer)) => - next.some -> oldFinalizer.as(newValue) - case None => - None -> (newFinalizer *> raise[R]("Cannot swap after proxy has been finalized")) - }.flatten - }.uncancelable - - def runFinalizer: F[Unit] = + def runFinalizer(state: Ref[F, Option[(R, F[Unit])]]): F[Unit] = state.modify { case None => None -> raise[Unit]("Finalizer already run") case Some((_, finalizer)) => None -> finalizer }.flatten + + def raise[A](msg: String): F[A] = Sync[F].raiseError(new RuntimeException(msg)) + + Resource.make(acquire)(runFinalizer(_)).map { state => + new ResourceProxy[F, R] { + def get: F[R] = state.get.flatMap { + case None => raise("Cannot get after proxy has been finalized") + case Some((r, _)) => r.pure[F] + } + + // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized + def swap(next: Resource[F, R]): F[R] = + next.allocated.flatMap { + case next @ (newValue, newFinalizer) => + state.modify { + case Some((_, oldFinalizer)) => + next.some -> oldFinalizer.as(newValue) + case None => + None -> (newFinalizer *> raise[R]("Cannot swap after proxy has been finalized")) + }.flatten + }.uncancelable + + } + } } } From 9c83f8a5f473115a68bafac7319c36fbd125a1e8 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 02:21:32 +0100 Subject: [PATCH 09/19] Express writeRotate with a single ResourceProxy operation --- io/src/main/scala/fs2/io/file/file.scala | 27 +++++++++++++------- io/src/test/scala/fs2/io/file/FileSpec.scala | 1 + 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index aef6b06197..f5f42ded79 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -89,10 +89,8 @@ package object file { .liftF(path) .flatMap(p => FileHandle.fromPath(p, blocker, StandardOpenOption.WRITE :: flags.toList)) - def newCursor(fileProxy: ResourceProxy[F, FileHandle[F]]): F[WriteCursor[F]] = - fileProxy.get.flatMap( - WriteCursor.fromFileHandle[F](_, flags.contains(StandardOpenOption.APPEND)) - ) + def newCursor(file: FileHandle[F]): F[WriteCursor[F]] = + WriteCursor.fromFileHandle[F](file, flags.contains(StandardOpenOption.APPEND)) def go( fileProxy: ResourceProxy[F, FileHandle[F]], @@ -106,8 +104,13 @@ package object file { val newAcc = acc + hd.size cursor.writePull(hd).flatMap { nc => if (newAcc >= limit) { - Pull.eval(fileProxy.swap(openNewFile)) >> - Pull.eval(newCursor(fileProxy)).flatMap(nc => go(fileProxy, nc, 0L, tl)) + Pull + .eval { + fileProxy + .swap(openNewFile) + .flatMap(newCursor) + } + .flatMap(nc => go(fileProxy, nc, 0L, tl)) } else { go(fileProxy, nc, newAcc, tl) } @@ -120,9 +123,15 @@ package object file { Stream .resource(ResourceProxy(openNewFile)) .flatMap { fileProxy => - Stream.eval(newCursor(fileProxy)).flatMap { cursor => - go(fileProxy, cursor, 0L, in).stream - } + Stream + .eval { + fileProxy + .swap(openNewFile) + .flatMap(newCursor) + } + .flatMap { cursor => + go(fileProxy, cursor, 0L, in).stream + } } } diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index 9f696e4618..8b0450fecc 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -317,6 +317,7 @@ class FileSpec extends BaseFileSpec { } "writeRotate" in { + pending // changing api for ResourceProxy val bufferSize = 100 val totalBytes = 1000 val rotateLimit = 150 From eb770665888253ab7aa9f3ea59292b837f34aeaf Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 02:47:01 +0100 Subject: [PATCH 10/19] Refactor ResourceProxy to have one operation --- .../src/main/scala/fs2/ResourceProxy.scala | 54 ++++++++++--------- io/src/main/scala/fs2/io/file/file.scala | 2 +- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index 9c70208b85..ed841a3195 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -6,40 +6,46 @@ import cats.effect.concurrent.Ref import cats.effect.implicits._ trait ResourceProxy[F[_], R] { - def get: F[R] def swap(next: Resource[F, R]): F[R] } -object ResourceProxy { - def apply[F[_]: Sync, R](initial: Resource[F, R]): Resource[F, ResourceProxy[F, R]] = { - - val acquire = for { - v <- initial.allocated - state <- Ref.of[F, Option[(R, F[Unit])]](v.some) - } yield state - - def runFinalizer(state: Ref[F, Option[(R, F[Unit])]]): F[Unit] = - state.modify { - case None => None -> raise[Unit]("Finalizer already run") - case Some((_, finalizer)) => None -> finalizer - }.flatten +/* +Notes: +1)it's not possible to safely guard swap +one can swap (i.e. close) a resource used by something else (e.g. in +another fiber) from under the consumer's fit. Problem is that the +whole point of this PR is avoiding `Pull.bracket`, which means `swap` +cannot return a resource. - def raise[A](msg: String): F[A] = Sync[F].raiseError(new RuntimeException(msg)) +2) Is it worth guarding against the case where someone tries to swap +in another fiber after the resourceProxy's lifetime is over: probably +yes, because the cost is not an error like in 1), but a resource leak + */ - Resource.make(acquire)(runFinalizer(_)).map { state => - new ResourceProxy[F, R] { - def get: F[R] = state.get.flatMap { - case None => raise("Cannot get after proxy has been finalized") - case Some((r, _)) => r.pure[F] +object ResourceProxy { + def create[F[_]: Sync, R]: Resource[F, ResourceProxy[F, R]] = { + def raise[A](msg: String): F[A] = + Sync[F].raiseError(new RuntimeException(msg)) + + def initialize = Ref[F].of(().pure[F].some) + def finalize(state: Ref[F, Option[F[Unit]]]): F[Unit] = + state + .getAndSet(None) + .flatMap { + case None => raise[Unit]("Finalizer already run") + case Some(finalizer) => finalizer } - // Runs the finalizer for the current proxy target and delays finalizer of newValue until this proxy is finalized + Resource.make(initialize)(finalize(_)).map { state => + new ResourceProxy[F, R] { + // Runs the finalizer for the current proxy target and delays + // finalizer of newValue until this proxy is finalized def swap(next: Resource[F, R]): F[R] = next.allocated.flatMap { - case next @ (newValue, newFinalizer) => + case (newValue, newFinalizer) => state.modify { - case Some((_, oldFinalizer)) => - next.some -> oldFinalizer.as(newValue) + case Some(oldFinalizer) => + newFinalizer.some -> oldFinalizer.as(newValue) case None => None -> (newFinalizer *> raise[R]("Cannot swap after proxy has been finalized")) }.flatten diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index f5f42ded79..1640759afa 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -121,7 +121,7 @@ package object file { in => Stream - .resource(ResourceProxy(openNewFile)) + .resource(ResourceProxy.create[F, FileHandle[F]]) .flatMap { fileProxy => Stream .eval { From a6aa55d1a92b57d411608c5362e51d8bb44a102c Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 02:57:23 +0100 Subject: [PATCH 11/19] Preserve cancellability of ResourceProxy --- .../src/main/scala/fs2/ResourceProxy.scala | 29 +++++++++++-------- io/src/main/scala/fs2/io/file/file.scala | 2 +- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index ed841a3195..bddefa56b1 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -1,7 +1,7 @@ package fs2 import cats.implicits._ -import cats.effect.{Resource, Sync} +import cats.effect.{Concurrent, Resource, Sync} import cats.effect.concurrent.Ref import cats.effect.implicits._ @@ -23,7 +23,7 @@ yes, because the cost is not an error like in 1), but a resource leak */ object ResourceProxy { - def create[F[_]: Sync, R]: Resource[F, ResourceProxy[F, R]] = { + def create[F[_]: Concurrent, R]: Resource[F, ResourceProxy[F, R]] = { def raise[A](msg: String): F[A] = Sync[F].raiseError(new RuntimeException(msg)) @@ -41,16 +41,21 @@ object ResourceProxy { // Runs the finalizer for the current proxy target and delays // finalizer of newValue until this proxy is finalized def swap(next: Resource[F, R]): F[R] = - next.allocated.flatMap { - case (newValue, newFinalizer) => - state.modify { - case Some(oldFinalizer) => - newFinalizer.some -> oldFinalizer.as(newValue) - case None => - None -> (newFinalizer *> raise[R]("Cannot swap after proxy has been finalized")) - }.flatten - }.uncancelable - + (next <* ().pure[Resource[F, ?]]) // workaround for https://github.com/typelevel/cats-effect/issues/579 + .allocated + .continual { r => // this whole block is inside continual and cannot be canceled + Sync[F].fromEither(r).flatMap { + case (newValue, newFinalizer) => + state.modify { + case Some(oldFinalizer) => + newFinalizer.some -> oldFinalizer.as(newValue) + case None => + None -> (newFinalizer *> raise[R]( + "Cannot swap after proxy has been finalized" + )) + }.flatten + } + } } } } diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 1640759afa..d5e577d33f 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -78,7 +78,7 @@ package object file { .resource(WriteCursor.fromPath(path, blocker, flags)) .flatMap(_.writeAll(in).void.stream) - def writeRotate[F[_]: Sync: ContextShift]( + def writeRotate[F[_]: Concurrent: ContextShift]( path: F[Path], limit: Long, blocker: Blocker, From bd89dd866aa7b0f6856a8bda50fd5c027972704e Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 03:10:18 +0100 Subject: [PATCH 12/19] Add scaladoc to ResourceProxy --- .../src/main/scala/fs2/ResourceProxy.scala | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index bddefa56b1..ff497d4122 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -6,23 +6,34 @@ import cats.effect.concurrent.Ref import cats.effect.implicits._ trait ResourceProxy[F[_], R] { + + /** + * Allocates a new resource, closes the last one if present, and + * returns the newly allocated `R`. + * + * If there are no further calls to `swap`, the resource created by + * the last call will be finalized when the lifetime of + * `ResourceProxy` (which is itself tracked by `Resource`) is over. + * + * Since `swap` closes the old resource immediately, you need to + * ensure that no code is using the old `R` when `swap` is called. + * Failing to do so is likely to result in an error on the + * _consumer_ side. + * + * If you try to call swap after the lifetime of `ResourceProxy` is + * over, `swap` will fail, but it will ensure all resources are + * closed, and never leak any. + */ def swap(next: Resource[F, R]): F[R] } -/* -Notes: -1)it's not possible to safely guard swap -one can swap (i.e. close) a resource used by something else (e.g. in -another fiber) from under the consumer's fit. Problem is that the -whole point of this PR is avoiding `Pull.bracket`, which means `swap` -cannot return a resource. - -2) Is it worth guarding against the case where someone tries to swap -in another fiber after the resourceProxy's lifetime is over: probably -yes, because the cost is not an error like in 1), but a resource leak - */ - object ResourceProxy { + + /** + * Creates a new `ResourceProxy`, which represents a `Resource` + * that can be swapped during the lifetime of this `ResourceProxy`. + * See `io.file.writeRotate` for an example of usage. + */ def create[F[_]: Concurrent, R]: Resource[F, ResourceProxy[F, R]] = { def raise[A](msg: String): F[A] = Sync[F].raiseError(new RuntimeException(msg)) From 201dcabb417dc763424aefd87b500f82daa9d0a5 Mon Sep 17 00:00:00 2001 From: Fabio Labella Date: Fri, 25 Oct 2019 03:19:00 +0100 Subject: [PATCH 13/19] Restore writeRotate tests --- core/shared/src/main/scala/fs2/ResourceProxy.scala | 3 ++- io/src/test/scala/fs2/io/file/FileSpec.scala | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala index ff497d4122..ba088fa2f1 100644 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ b/core/shared/src/main/scala/fs2/ResourceProxy.scala @@ -18,7 +18,8 @@ trait ResourceProxy[F[_], R] { * Since `swap` closes the old resource immediately, you need to * ensure that no code is using the old `R` when `swap` is called. * Failing to do so is likely to result in an error on the - * _consumer_ side. + * _consumer_ side. In any case, no resources will be leaked by + * `swap` * * If you try to call swap after the lifetime of `ResourceProxy` is * over, `swap` will fail, but it will ensure all resources are diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index 8b0450fecc..9f696e4618 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -317,7 +317,6 @@ class FileSpec extends BaseFileSpec { } "writeRotate" in { - pending // changing api for ResourceProxy val bufferSize = 100 val totalBytes = 1000 val rotateLimit = 150 From c3a994eb86b881673dbbfc320c9d9528776cc6a2 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 30 Oct 2019 17:23:49 -0400 Subject: [PATCH 14/19] Renamed ResourceProxy to Hotswap, added some ScalaDoc --- core/shared/src/main/scala/fs2/Hotswap.scala | 118 ++++++++++++++++++ .../src/main/scala/fs2/ResourceProxy.scala | 74 ----------- io/src/main/scala/fs2/io/file/file.scala | 23 ++-- 3 files changed, 127 insertions(+), 88 deletions(-) create mode 100644 core/shared/src/main/scala/fs2/Hotswap.scala delete mode 100644 core/shared/src/main/scala/fs2/ResourceProxy.scala diff --git a/core/shared/src/main/scala/fs2/Hotswap.scala b/core/shared/src/main/scala/fs2/Hotswap.scala new file mode 100644 index 0000000000..d27eddc318 --- /dev/null +++ b/core/shared/src/main/scala/fs2/Hotswap.scala @@ -0,0 +1,118 @@ +package fs2 + +import cats.implicits._ +import cats.effect.{Concurrent, Resource, Sync} +import cats.effect.concurrent.Ref +import cats.effect.implicits._ + +/** + * Supports treating a linear sequence of resources as a single resource. + * + * A `Hotswap[F, R]` instance is created as a `Resource` and hence, has + * a lifetime that is scoped by the `Resource`. After creation, a `Resource[F, R]` + * can be swapped in to the `Hotswap` by calling `swap`. The acquired resource + * is returned and is finalized when the `Hotswap` is finalized or upon the next + * call to `swap`, whichever occurs first. + * + * For example, the sequence of three resources `r1, r2, r3` are shown in the + * following diagram: + * + * {{{ + * >----- swap(r1) ---- swap(r2) ---- swap(r3) ----X + * | | | | | + * Creation | | | | + * r1 acquired | | | + * r2 acquired | | + * r1 released r3 acquired | + * r2 released | + * r3 released + * }}} + * + * This class is particularly useful when working with pulls that cycle through + * resources -- e.g., writing bytes to files, rotating files every N bytes or M seconds. + * Without `Hotswap`, such pulls leak resources -- on each file rotation, a file handle + * or at least an internal resource reference accumulates. With `Hotswap`, the `Hotswap` + * instance is the only registered resource and each file is swapped in to the `Hotswap`. + * + * Utilization typically looks something like: + * + * {{{ + * Stream.resource(Hotswap(mkResource)).flatMap { case (hotswap, r) => + * // Use r, call hotswap.swap(mkResource) as necessary + * } + * }}} + * + * See `fs2.io.file.writeRotate` for an example of usage. + */ +sealed trait Hotswap[F[_], R] { + + /** + * Allocates a new resource, closes the last one if present, and + * returns the newly allocated `R`. + * + * If there are no further calls to `swap`, the resource created by + * the last call will be finalized when the lifetime of + * this `Hotswap` (which is itself tracked by `Resource`) is over. + * + * Since `swap` closes the old resource immediately, you need to + * ensure that no code is using the old `R` when `swap` is called. + * Failing to do so is likely to result in an error on the + * _consumer_ side. In any case, no resources will be leaked by + * `swap`. + * + * If you try to call swap after the lifetime of this `Hotswap` is + * over, `swap` will fail, but it will ensure all resources are + * closed, and never leak any. + */ + def swap(next: Resource[F, R]): F[R] +} + +object Hotswap { + + /** + * Creates a new `Hotswap` initialized with the specified resource. + * The `Hotswap` instance and the initial resource are returned. + */ + def apply[F[_]: Concurrent, R](initial: Resource[F, R]): Resource[F, (Hotswap[F, R], R)] = + create[F, R].evalMap(p => p.swap(initial).map(r => (p, r))) + + /** + * Creates a new `Hotswap`, which represents a `Resource` + * that can be swapped during the lifetime of this `Hotswap`. + */ + def create[F[_]: Concurrent, R]: Resource[F, Hotswap[F, R]] = { + def raise[A](msg: String): F[A] = + Sync[F].raiseError(new RuntimeException(msg)) + + def initialize = Ref[F].of(().pure[F].some) + + def finalize(state: Ref[F, Option[F[Unit]]]): F[Unit] = + state + .getAndSet(None) + .flatMap { + case None => raise[Unit]("Finalizer already run") + case Some(finalizer) => finalizer + } + + Resource.make(initialize)(finalize).map { state => + new Hotswap[F, R] { + override def swap(next: Resource[F, R]): F[R] = + (next <* ().pure[Resource[F, ?]]) // workaround for https://github.com/typelevel/cats-effect/issues/579 + .allocated + .continual { r => // this whole block is inside continual and cannot be canceled + Sync[F].fromEither(r).flatMap { + case (newValue, newFinalizer) => + state.modify { + case Some(oldFinalizer) => + newFinalizer.some -> oldFinalizer.as(newValue) + case None => + None -> (newFinalizer *> raise[R]( + "Cannot swap after proxy has been finalized" + )) + }.flatten + } + } + } + } + } +} diff --git a/core/shared/src/main/scala/fs2/ResourceProxy.scala b/core/shared/src/main/scala/fs2/ResourceProxy.scala deleted file mode 100644 index ba088fa2f1..0000000000 --- a/core/shared/src/main/scala/fs2/ResourceProxy.scala +++ /dev/null @@ -1,74 +0,0 @@ -package fs2 - -import cats.implicits._ -import cats.effect.{Concurrent, Resource, Sync} -import cats.effect.concurrent.Ref -import cats.effect.implicits._ - -trait ResourceProxy[F[_], R] { - - /** - * Allocates a new resource, closes the last one if present, and - * returns the newly allocated `R`. - * - * If there are no further calls to `swap`, the resource created by - * the last call will be finalized when the lifetime of - * `ResourceProxy` (which is itself tracked by `Resource`) is over. - * - * Since `swap` closes the old resource immediately, you need to - * ensure that no code is using the old `R` when `swap` is called. - * Failing to do so is likely to result in an error on the - * _consumer_ side. In any case, no resources will be leaked by - * `swap` - * - * If you try to call swap after the lifetime of `ResourceProxy` is - * over, `swap` will fail, but it will ensure all resources are - * closed, and never leak any. - */ - def swap(next: Resource[F, R]): F[R] -} - -object ResourceProxy { - - /** - * Creates a new `ResourceProxy`, which represents a `Resource` - * that can be swapped during the lifetime of this `ResourceProxy`. - * See `io.file.writeRotate` for an example of usage. - */ - def create[F[_]: Concurrent, R]: Resource[F, ResourceProxy[F, R]] = { - def raise[A](msg: String): F[A] = - Sync[F].raiseError(new RuntimeException(msg)) - - def initialize = Ref[F].of(().pure[F].some) - def finalize(state: Ref[F, Option[F[Unit]]]): F[Unit] = - state - .getAndSet(None) - .flatMap { - case None => raise[Unit]("Finalizer already run") - case Some(finalizer) => finalizer - } - - Resource.make(initialize)(finalize(_)).map { state => - new ResourceProxy[F, R] { - // Runs the finalizer for the current proxy target and delays - // finalizer of newValue until this proxy is finalized - def swap(next: Resource[F, R]): F[R] = - (next <* ().pure[Resource[F, ?]]) // workaround for https://github.com/typelevel/cats-effect/issues/579 - .allocated - .continual { r => // this whole block is inside continual and cannot be canceled - Sync[F].fromEither(r).flatMap { - case (newValue, newFinalizer) => - state.modify { - case Some(oldFinalizer) => - newFinalizer.some -> oldFinalizer.as(newValue) - case None => - None -> (newFinalizer *> raise[R]( - "Cannot swap after proxy has been finalized" - )) - }.flatten - } - } - } - } - } -} diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index d5e577d33f..5b8d20a365 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -93,7 +93,7 @@ package object file { WriteCursor.fromFileHandle[F](file, flags.contains(StandardOpenOption.APPEND)) def go( - fileProxy: ResourceProxy[F, FileHandle[F]], + fileHotswap: Hotswap[F, FileHandle[F]], cursor: WriteCursor[F], acc: Long, s: Stream[F, Byte] @@ -106,13 +106,13 @@ package object file { if (newAcc >= limit) { Pull .eval { - fileProxy + fileHotswap .swap(openNewFile) .flatMap(newCursor) } - .flatMap(nc => go(fileProxy, nc, 0L, tl)) + .flatMap(nc => go(fileHotswap, nc, 0L, tl)) } else { - go(fileProxy, nc, newAcc, tl) + go(fileHotswap, nc, newAcc, tl) } } case None => Pull.done @@ -121,16 +121,11 @@ package object file { in => Stream - .resource(ResourceProxy.create[F, FileHandle[F]]) - .flatMap { fileProxy => - Stream - .eval { - fileProxy - .swap(openNewFile) - .flatMap(newCursor) - } - .flatMap { cursor => - go(fileProxy, cursor, 0L, in).stream + .resource(Hotswap(openNewFile)) + .flatMap { + case (fileHotswap, fileHandle) => + Stream.eval(newCursor(fileHandle)).flatMap { cursor => + go(fileHotswap, cursor, 0L, in).stream } } } From 5e531cdf9a78877740cceab5b3e0e76553b96bfa Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 30 Oct 2019 17:48:54 -0400 Subject: [PATCH 15/19] More ScalaDoc --- core/shared/src/main/scala/fs2/Hotswap.scala | 2 +- io/src/main/scala/fs2/io/file/file.scala | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Hotswap.scala b/core/shared/src/main/scala/fs2/Hotswap.scala index d27eddc318..7795b6dbe4 100644 --- a/core/shared/src/main/scala/fs2/Hotswap.scala +++ b/core/shared/src/main/scala/fs2/Hotswap.scala @@ -34,7 +34,7 @@ import cats.effect.implicits._ * or at least an internal resource reference accumulates. With `Hotswap`, the `Hotswap` * instance is the only registered resource and each file is swapped in to the `Hotswap`. * - * Utilization typically looks something like: + * Usage typically looks something like: * * {{{ * Stream.resource(Hotswap(mkResource)).flatMap { case (hotswap, r) => diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 5b8d20a365..b195f99c3f 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -64,7 +64,7 @@ package object file { } /** - * Writes all data synchronously to the file at the specified `java.nio.file.Path`. + * Writes all data to the file at the specified `java.nio.file.Path`. * * Adds the WRITE flag to any other `OpenOption` flags specified. By default, also adds the CREATE flag. */ @@ -78,15 +78,23 @@ package object file { .resource(WriteCursor.fromPath(path, blocker, flags)) .flatMap(_.writeAll(in).void.stream) + /** + * Writes all data to a sequence of files, each limited in size to `limit`. + * + * The `computePath` operation is used to compute the path of the first file + * and every subsequent file. Typically, the next file should be determined + * by analyzing the current state of the filesystem -- e.g., by looking at all + * files in a directory and generating a unique name. + */ def writeRotate[F[_]: Concurrent: ContextShift]( - path: F[Path], + computePath: F[Path], limit: Long, blocker: Blocker, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE) ): Pipe[F, Byte, Unit] = { def openNewFile: Resource[F, FileHandle[F]] = Resource - .liftF(path) + .liftF(computePath) .flatMap(p => FileHandle.fromPath(p, blocker, StandardOpenOption.WRITE :: flags.toList)) def newCursor(file: FileHandle[F]): F[WriteCursor[F]] = @@ -133,10 +141,8 @@ package object file { /** * Creates a [[Watcher]] for the default file system. * - * A singleton bracketed stream is returned consisting of the single watcher. To use the watcher, - * `flatMap` the returned stream, watch or register 1 or more paths, and then return `watcher.events()`. - * - * @return singleton bracketed stream returning a watcher + * The watcher is returned as a resource. To use the watcher, lift the resource to a stream, + * watch or register 1 or more paths, and then return `watcher.events()`. */ def watcher[F[_]: Concurrent: ContextShift](blocker: Blocker): Resource[F, Watcher[F]] = Watcher.default(blocker) From 1a815c8ac0be21468f26cda10fb12efbbbcffa69 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Thu, 31 Oct 2019 11:14:49 -0400 Subject: [PATCH 16/19] Added unit tests for Hotswap --- core/shared/src/main/scala/fs2/Hotswap.scala | 2 +- .../src/test/scala/fs2/HotswapSpec.scala | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 core/shared/src/test/scala/fs2/HotswapSpec.scala diff --git a/core/shared/src/main/scala/fs2/Hotswap.scala b/core/shared/src/main/scala/fs2/Hotswap.scala index 7795b6dbe4..2766da8584 100644 --- a/core/shared/src/main/scala/fs2/Hotswap.scala +++ b/core/shared/src/main/scala/fs2/Hotswap.scala @@ -16,7 +16,7 @@ import cats.effect.implicits._ * * For example, the sequence of three resources `r1, r2, r3` are shown in the * following diagram: - * + * * {{{ * >----- swap(r1) ---- swap(r2) ---- swap(r3) ----X * | | | | | diff --git a/core/shared/src/test/scala/fs2/HotswapSpec.scala b/core/shared/src/test/scala/fs2/HotswapSpec.scala new file mode 100644 index 0000000000..1a040cfb48 --- /dev/null +++ b/core/shared/src/test/scala/fs2/HotswapSpec.scala @@ -0,0 +1,82 @@ +package fs2 + +import cats.implicits._ +import cats.effect.Sync +import cats.effect.concurrent.Ref +import cats.effect.IO +import cats.effect.Resource + +class HotswapSpec extends Fs2Spec { + "Hotswap" - { + "finalizer of target run when hotswap is finalized" in { + mkEventLogger.flatMap { logger => + Stream + .resource(Hotswap(logLifecycle(logger, "a"))) + .flatMap { _ => + Stream + .eval_(logger.log(Info("using"))) + } + .compile + .drain *> logger.get.asserting( + _ shouldBe List( + Acquired("a"), + Info("using"), + Released("a") + ) + ) + } + } + "swap acquires new resource and then finalizes old resource" in { + mkEventLogger.flatMap { logger => + Stream + .resource(Hotswap(logLifecycle(logger, "a"))) + .flatMap { + case (hotswap, _) => + Stream.eval_(logger.log(Info("using a"))) ++ + Stream.eval_(hotswap.swap(logLifecycle(logger, "b"))) ++ + Stream.eval_(logger.log(Info("using b"))) ++ + Stream.eval_(hotswap.swap(logLifecycle(logger, "c"))) ++ + Stream.eval_(logger.log(Info("using c"))) + } + .compile + .drain *> logger.get.asserting( + _ shouldBe List( + Acquired("a"), + Info("using a"), + Acquired("b"), + Released("a"), + Info("using b"), + Acquired("c"), + Released("b"), + Info("using c"), + Released("c") + ) + ) + } + } + } + + trait Logger[F[_], A] { + def log(a: A): F[Unit] + def get: F[List[A]] + } + object Logger { + def apply[F[_]: Sync, A]: F[Logger[F, A]] = + Ref.of(Nil: List[A]).map { ref => + new Logger[F, A] { + def log(a: A): F[Unit] = ref.update(acc => a :: acc) + def get: F[List[A]] = ref.get.map(_.reverse) + } + } + } + + sealed trait Event + case class Acquired(tag: String) extends Event + case class Released(tag: String) extends Event + case class Info(message: String) extends Event + + def mkEventLogger: IO[Logger[IO, Event]] = Logger[IO, Event] + + def logLifecycle(logger: Logger[IO, Event], tag: String): Resource[IO, Unit] = + Resource.make(logger.log(Acquired(tag)))(_ => logger.log(Released(tag))) +} From 345979b4c016989c08891b353daa43617ea0f529 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 4 Nov 2019 08:50:35 -0500 Subject: [PATCH 17/19] Added clear method to Hotswap --- core/shared/src/main/scala/fs2/Hotswap.scala | 30 ++++++++++++++----- .../src/test/scala/fs2/HotswapSpec.scala | 23 ++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Hotswap.scala b/core/shared/src/main/scala/fs2/Hotswap.scala index 2766da8584..6a80a45c9e 100644 --- a/core/shared/src/main/scala/fs2/Hotswap.scala +++ b/core/shared/src/main/scala/fs2/Hotswap.scala @@ -65,6 +65,16 @@ sealed trait Hotswap[F[_], R] { * closed, and never leak any. */ def swap(next: Resource[F, R]): F[R] + + /** + * Runs the finalizer of the current resource, if any, and restores + * this `Hotswap` to its initial state. + * + * Like `swap`, you need to ensure that no code is using the old `R` when + * `clear is called`. Similarly, calling `clear` after the lifetime of this + * `Hotswap` results in an error. + */ + def clear: F[Unit] } object Hotswap { @@ -102,16 +112,20 @@ object Hotswap { .continual { r => // this whole block is inside continual and cannot be canceled Sync[F].fromEither(r).flatMap { case (newValue, newFinalizer) => - state.modify { - case Some(oldFinalizer) => - newFinalizer.some -> oldFinalizer.as(newValue) - case None => - None -> (newFinalizer *> raise[R]( - "Cannot swap after proxy has been finalized" - )) - }.flatten + swapFinalizer(newFinalizer).as(newValue) } } + + override def clear: F[Unit] = + swapFinalizer(().pure[F]) + + private def swapFinalizer(newFinalizer: F[Unit]): F[Unit] = + state.modify { + case Some(oldFinalizer) => + newFinalizer.some -> oldFinalizer + case None => + None -> (newFinalizer *> raise[Unit]("Cannot swap after proxy has been finalized")) + }.flatten } } } diff --git a/core/shared/src/test/scala/fs2/HotswapSpec.scala b/core/shared/src/test/scala/fs2/HotswapSpec.scala index 1a040cfb48..c4c71452ae 100644 --- a/core/shared/src/test/scala/fs2/HotswapSpec.scala +++ b/core/shared/src/test/scala/fs2/HotswapSpec.scala @@ -26,6 +26,7 @@ class HotswapSpec extends Fs2Spec { ) } } + "swap acquires new resource and then finalizes old resource" in { mkEventLogger.flatMap { logger => Stream @@ -54,6 +55,28 @@ class HotswapSpec extends Fs2Spec { ) } } + + "clear finalizes old resource" in { + mkEventLogger.flatMap { logger => + Stream + .resource(Hotswap(logLifecycle(logger, "a"))) + .flatMap { + case (hotswap, _) => + Stream.eval_(logger.log(Info("using a"))) ++ + Stream.eval_(hotswap.clear) ++ + Stream.eval_(logger.log(Info("after clear"))) + } + .compile + .drain *> logger.get.asserting( + _ shouldBe List( + Acquired("a"), + Info("using a"), + Released("a"), + Info("after clear") + ) + ) + } + } } trait Logger[F[_], A] { From c8b884409f773b0383f68b7a5dddd0f9c1d7f4e9 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 4 Nov 2019 09:17:50 -0500 Subject: [PATCH 18/19] Updated ScalaDoc and deprecated fs2.io.file.pulls --- .../main/scala/fs2/io/file/ReadCursor.scala | 15 ++++++++--- .../main/scala/fs2/io/file/WriteCursor.scala | 14 ++++++++-- io/src/main/scala/fs2/io/file/file.scala | 14 +++++----- io/src/main/scala/fs2/io/file/pulls.scala | 27 +++++++------------ 4 files changed, 40 insertions(+), 30 deletions(-) diff --git a/io/src/main/scala/fs2/io/file/ReadCursor.scala b/io/src/main/scala/fs2/io/file/ReadCursor.scala index 19e6d4e760..e58e6776e4 100644 --- a/io/src/main/scala/fs2/io/file/ReadCursor.scala +++ b/io/src/main/scala/fs2/io/file/ReadCursor.scala @@ -15,10 +15,9 @@ import java.nio.file._ * Associates a `FileHandle` with an offset in to the file. * * This encapsulates the pattern of incrementally reading bytes in from a file, - * a chunk at a time. Additionally, convenience methods are provided for - * working with pulls. + * a chunk at a time. Convenience methods are provided for working with pulls. */ -final case class ReadCursor[F[_]](val file: FileHandle[F], val offset: Long) { +final case class ReadCursor[F[_]](file: FileHandle[F], offset: Long) { /** * Reads a single chunk from the underlying file handle, returning the @@ -71,6 +70,13 @@ final case class ReadCursor[F[_]](val file: FileHandle[F], val offset: Long) { /** Returns a new cursor with the offset adjusted to the specified position. */ def seek(position: Long): ReadCursor[F] = ReadCursor(file, position) + /** + * Returns an infinite stream that reads until the end of the file and then starts + * polling the file for additional writes. Similar to the `tail` command line utility. + * + * @param pollDelay amount of time to wait upon reaching the end of the file before + * polling for updates + */ def tail(chunkSize: Int, pollDelay: FiniteDuration)( implicit timer: Timer[F] ): Pull[F, Byte, ReadCursor[F]] = @@ -82,6 +88,9 @@ final case class ReadCursor[F[_]](val file: FileHandle[F], val offset: Long) { object ReadCursor { + /** + * Returns a `ReadCursor` for the specified path. The `READ` option is added to the supplied flags. + */ def fromPath[F[_]: Sync: ContextShift]( path: Path, blocker: Blocker, diff --git a/io/src/main/scala/fs2/io/file/WriteCursor.scala b/io/src/main/scala/fs2/io/file/WriteCursor.scala index 28a4e71d3b..045503a3e0 100644 --- a/io/src/main/scala/fs2/io/file/WriteCursor.scala +++ b/io/src/main/scala/fs2/io/file/WriteCursor.scala @@ -13,8 +13,7 @@ import cats.arrow.FunctionK * Associates a `FileHandle` with an offset in to the file. * * This encapsulates the pattern of incrementally writing bytes in to a file, - * a chunk at a time. Additionally, convenience methods are provided for - * working with pulls. + * a chunk at a time. Convenience methods are provided for working with pulls. */ final case class WriteCursor[F[_]](file: FileHandle[F], offset: Long) { @@ -54,6 +53,12 @@ final case class WriteCursor[F[_]](file: FileHandle[F], offset: Long) { object WriteCursor { + /** + * Returns a `WriteCursor` for the specified path. + * + * The `WRITE` option is added to the supplied flags. If the `APPEND` option is present in `flags`, + * the offset is initialized to the current size of the file. + */ def fromPath[F[_]: Sync: ContextShift]( path: Path, blocker: Blocker, @@ -66,6 +71,11 @@ object WriteCursor { Resource.liftF(cursor) } + /** + * Returns a `WriteCursor` for the specified file handle. + * + * If `append` is true, the offset is initialized to the current size of the file. + */ def fromFileHandle[F[_]: Sync: ContextShift]( file: FileHandle[F], append: Boolean diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index b195f99c3f..e2eb6910b8 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -79,13 +79,13 @@ package object file { .flatMap(_.writeAll(in).void.stream) /** - * Writes all data to a sequence of files, each limited in size to `limit`. - * - * The `computePath` operation is used to compute the path of the first file - * and every subsequent file. Typically, the next file should be determined - * by analyzing the current state of the filesystem -- e.g., by looking at all - * files in a directory and generating a unique name. - */ + * Writes all data to a sequence of files, each limited in size to `limit`. + * + * The `computePath` operation is used to compute the path of the first file + * and every subsequent file. Typically, the next file should be determined + * by analyzing the current state of the filesystem -- e.g., by looking at all + * files in a directory and generating a unique name. + */ def writeRotate[F[_]: Concurrent: ContextShift]( computePath: F[Path], limit: Long, diff --git a/io/src/main/scala/fs2/io/file/pulls.scala b/io/src/main/scala/fs2/io/file/pulls.scala index 6b4999cb34..edc1aa5f5c 100644 --- a/io/src/main/scala/fs2/io/file/pulls.scala +++ b/io/src/main/scala/fs2/io/file/pulls.scala @@ -6,47 +6,38 @@ import cats.effect.Timer import scala.concurrent.duration.FiniteDuration -/** Provides various `Pull`s for working with files. */ +/**Provides various `Pull`s for working with files. */ +@deprecated("Use ReadCursor/WriteCursor instead", "2.1.0") object pulls { /** * Given a `FileHandle[F]`, creates a `Pull` which reads all data from the associated file. */ + @deprecated("Use ReadCursor(h, 0L).readAll(chunkSize).void", "2.1.0") def readAllFromFileHandle[F[_]](chunkSize: Int)(h: FileHandle[F]): Pull[F, Byte, Unit] = - _readAllFromFileHandle0(chunkSize, 0)(h) + ReadCursor(h, 0L).readAll(chunkSize).void + @deprecated("Use ReadCursor(h, start).readUntil(chunkSize, end).void", "2.1.0") def readRangeFromFileHandle[F[_]](chunkSize: Int, start: Long, end: Long)( h: FileHandle[F] ): Pull[F, Byte, Unit] = - _readRangeFromFileHandle0(chunkSize, start, end)(h) + ReadCursor(h, start).readUntil(chunkSize, end).void + @deprecated("Use ReadCursor(h, offset).tail(chunkSize, delay).void", "2.1.0") def tailFromFileHandle[F[_]: Timer](chunkSize: Int, offset: Long, delay: FiniteDuration)( h: FileHandle[F] ): Pull[F, Byte, Unit] = - _tailFromFileHandle(chunkSize, offset, delay)(h) - - private def _tailFromFileHandle[F[_]](chunkSize: Int, offset: Long, delay: FiniteDuration)( - h: FileHandle[F] - )(implicit timer: Timer[F]): Pull[F, Byte, Unit] = ReadCursor(h, offset).tail(chunkSize, delay).void - private def _readRangeFromFileHandle0[F[_]](chunkSize: Int, offset: Long, end: Long)( - h: FileHandle[F] - ): Pull[F, Byte, Unit] = - ReadCursor(h, offset).readUntil(chunkSize, end).void - - private def _readAllFromFileHandle0[F[_]](chunkSize: Int, offset: Long)( - h: FileHandle[F] - ): Pull[F, Byte, Unit] = - ReadCursor(h, offset).readAll(chunkSize).void - /** * Given a `Stream[F, Byte]` and `FileHandle[F]`, writes all data from the stream to the file. */ + @deprecated("Use WriteCursor(out, 0).writeAll(in).void", "2.1.0") def writeAllToFileHandle[F[_]](in: Stream[F, Byte], out: FileHandle[F]): Pull[F, Nothing, Unit] = writeAllToFileHandleAtOffset(in, out, 0) /** Like `writeAllToFileHandle` but takes an offset in to the file indicating where write should start. */ + @deprecated("Use WriteCursor(out, offset).writeAll(in).void", "2.1.0") def writeAllToFileHandleAtOffset[F[_]]( in: Stream[F, Byte], out: FileHandle[F], From 58fb64432b733f24425b5c5b88b5e83381c407c2 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 4 Nov 2019 09:19:25 -0500 Subject: [PATCH 19/19] Added uncancellable to Hotswap.clear --- core/shared/src/main/scala/fs2/Hotswap.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Hotswap.scala b/core/shared/src/main/scala/fs2/Hotswap.scala index 6a80a45c9e..c32d4e8fcb 100644 --- a/core/shared/src/main/scala/fs2/Hotswap.scala +++ b/core/shared/src/main/scala/fs2/Hotswap.scala @@ -117,7 +117,7 @@ object Hotswap { } override def clear: F[Unit] = - swapFinalizer(().pure[F]) + swapFinalizer(().pure[F]).uncancelable private def swapFinalizer(newFinalizer: F[Unit]): F[Unit] = state.modify {