From b61146fa437c71942ca46148d12650280f7277a7 Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Thu, 12 Sep 2024 16:47:52 +0100 Subject: [PATCH] review comments --- Sources/NIOFileSystem/DirectoryEntries.swift | 39 ++++++++++++------- Sources/NIOFileSystem/FileChunks.swift | 33 ++++++++++------ .../Internal/BufferedOrAnyStream.swift | 6 ++- 3 files changed, 49 insertions(+), 29 deletions(-) diff --git a/Sources/NIOFileSystem/DirectoryEntries.swift b/Sources/NIOFileSystem/DirectoryEntries.swift index bb17834a32..105b2b7b85 100644 --- a/Sources/NIOFileSystem/DirectoryEntries.swift +++ b/Sources/NIOFileSystem/DirectoryEntries.swift @@ -119,7 +119,9 @@ extension DirectoryEntries { public struct BatchedIterator: AsyncIteratorProtocol { private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator - fileprivate init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator) { + fileprivate init( + wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator + ) { self.iterator = iterator } @@ -136,15 +138,23 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {} // MARK: - Internal @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOThrowingAsyncSequenceProducer where Element == [DirectoryEntry], Failure == Error, - Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate == DirectoryEntryProducer { +extension NIOThrowingAsyncSequenceProducer +where + Element == [DirectoryEntry], + Failure == (any Error), + Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + Delegate == DirectoryEntryProducer +{ fileprivate static func makeBatchedDirectoryEntryStream( handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int, lowWatermark: Int, highWatermark: Int - ) -> NIOThrowingAsyncSequenceProducer<[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer> { + ) -> NIOThrowingAsyncSequenceProducer< + [DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + DirectoryEntryProducer + > { let producer = DirectoryEntryProducer( handle: handle, recursive: recursive, @@ -168,10 +178,12 @@ extension NIOThrowingAsyncSequenceProducer where Element == [DirectoryEntry], Fa } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -fileprivate typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<[DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer> +private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer< + [DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer +> @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -fileprivate final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { +private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate { let state: NIOLockedValueBox let entriesPerBatch: Int @@ -262,7 +274,7 @@ fileprivate final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate // error. self.close() let source = self.state.withLockedValue { state in - return state.sequenceProducerSource + state.sequenceProducerSource } source?.finish(error) self.clearSource() @@ -270,9 +282,8 @@ fileprivate final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate } private func onNextBatch(_ entries: [DirectoryEntry]) { - let source = self.state.withLockedValue { state in - return state.sequenceProducerSource + state.sequenceProducerSource } guard let source else { @@ -412,7 +423,6 @@ private struct DirectoryEnumerator: Sendable { case let .open(threadPool, _, _): return threadPool case .openPausedProducing(let threadPool, let source, let array): - self.state = .modifying self.state = .open(threadPool, source, array) return threadPool case .done: @@ -424,13 +434,12 @@ private struct DirectoryEnumerator: Sendable { internal mutating func pauseProducing() { switch self.state { - case .open(let nIOThreadPool, let source, let array): - self.state = .modifying - self.state = .openPausedProducing(nIOThreadPool, source, array) + case .open(let threadPool, let source, let array): + self.state = .openPausedProducing(threadPool, source, array) case .idle: - () // we won't apply back pressure until something has been read + () // we won't apply back pressure until something has been read case .openPausedProducing, .done: - () // no-op + () // no-op case .modifying: fatalError() } diff --git a/Sources/NIOFileSystem/FileChunks.swift b/Sources/NIOFileSystem/FileChunks.swift index 70f54dd656..13739b7091 100644 --- a/Sources/NIOFileSystem/FileChunks.swift +++ b/Sources/NIOFileSystem/FileChunks.swift @@ -85,11 +85,18 @@ extension FileChunks.FileChunkIterator: Sendable {} // MARK: - Internal @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -fileprivate typealias FileChunkSequenceProducer = NIOThrowingAsyncSequenceProducer +private typealias FileChunkSequenceProducer = NIOThrowingAsyncSequenceProducer< + ByteBuffer, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, FileChunkProducer +> @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOThrowingAsyncSequenceProducer where Element == ByteBuffer, Failure == Error, - Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate == FileChunkProducer { +extension NIOThrowingAsyncSequenceProducer +where + Element == ByteBuffer, + Failure == Error, + Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + Delegate == FileChunkProducer +{ static func makeFileChunksStream( of: Element.Type = Element.self, handle: SystemFileHandle, @@ -129,12 +136,12 @@ private final class FileChunkProducer: NIOAsyncSequenceProducerDelegate, Sendabl let length: Int64 init(range: FileChunks.ChunkRange, handle: SystemFileHandle, length: Int64) { - - let state: ProducerState = switch range { + let state: ProducerState + switch range { case .entireFile: - .init(handle: handle, range: nil) + state = .init(handle: handle, range: nil) case .partial(let partialRange): - .init(handle: handle, range: partialRange) + state = .init(handle: handle, range: partialRange) } self.state = NIOLockedValueBox(state) @@ -272,7 +279,7 @@ private final class FileChunkProducer: NIOAsyncSequenceProducerDelegate, Sendabl case .produceMore: self.produceMore() case .stopProducing: - self.state.withLockedValue { state in state.pauseProducing()} + self.state.withLockedValue { state in state.pauseProducing() } case .dropped: // The source is finished; mark ourselves as done. self.state.withLockedValue { state in state.done() } @@ -357,13 +364,13 @@ private struct ProducerState: Sendable { mutating func didReadBytes(_ count: Int) { switch self.state { case var .producing(state): - if state.updateRange(count: count) { + if state.didReadBytes(count) { self.state = .done(emptyRange: false) } else { self.state = .producing(state) } case var .pausedProducing(state): - if state.updateRange(count: count) { + if state.didReadBytes(count) { self.state = .done(emptyRange: false) } else { self.state = .pausedProducing(state) @@ -396,10 +403,12 @@ private struct ProducerState: Sendable { } } - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension ProducerState.Producing { - mutating func updateRange(count: Int) -> Bool { + /// Updates the range (the offsets to read from and up to) to reflect the number of bytes which have been read. + /// - Parameter count: The number of bytes which have been read. + /// - Returns: Returns `True` if there are no remaining bytes to read, `False` otherwise. + mutating func didReadBytes(_ count: Int) -> Bool { guard let currentRange = self.range else { // if we read 0 bytes we are done return count == 0 diff --git a/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift b/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift index 38805b31e3..2c88405c02 100644 --- a/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift +++ b/Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift @@ -18,7 +18,9 @@ import NIOCore /// Wraps a ``NIOThrowingAsyncSequenceProducer`` or ``AnyAsyncSequence``. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) internal enum BufferedOrAnyStream { - typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer + typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer< + Element, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate + > case nioThrowingAsyncSequenceProducer(AsyncSequenceProducer) case anyAsyncSequence(AnyAsyncSequence) @@ -47,7 +49,7 @@ internal enum BufferedOrAnyStream Element? { let element: Element? switch self { - case var .bufferedStream(iterator): + case let .bufferedStream(iterator): defer { self = .bufferedStream(iterator) } element = try await iterator.next() case var .anyAsyncSequence(iterator):