Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rnro committed Sep 13, 2024
1 parent 97590ac commit b61146f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 29 deletions.
39 changes: 24 additions & 15 deletions Sources/NIOFileSystem/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ extension DirectoryEntries {
public struct BatchedIterator: AsyncIteratorProtocol {
private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator

fileprivate init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator) {
fileprivate init(
wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
) {
self.iterator = iterator
}

Expand All @@ -136,15 +138,23 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {}
// MARK: - Internal

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOThrowingAsyncSequenceProducer where Element == [DirectoryEntry], Failure == Error,
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate == DirectoryEntryProducer {
extension NIOThrowingAsyncSequenceProducer
where
Element == [DirectoryEntry],
Failure == (any Error),
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
Delegate == DirectoryEntryProducer
{
fileprivate static func makeBatchedDirectoryEntryStream(
handle: SystemFileHandle,
recursive: Bool,
entriesPerBatch: Int,
lowWatermark: Int,
highWatermark: Int
) -> NIOThrowingAsyncSequenceProducer<[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer> {
) -> NIOThrowingAsyncSequenceProducer<
[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
DirectoryEntryProducer
> {
let producer = DirectoryEntryProducer(
handle: handle,
recursive: recursive,
Expand All @@ -168,10 +178,12 @@ extension NIOThrowingAsyncSequenceProducer where Element == [DirectoryEntry], Fa
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
fileprivate typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<[DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer>
private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<
[DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer
>

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
fileprivate final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
let state: NIOLockedValueBox<DirectoryEnumerator>
let entriesPerBatch: Int

Expand Down Expand Up @@ -262,17 +274,16 @@ fileprivate final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate
// error.
self.close()
let source = self.state.withLockedValue { state in
return state.sequenceProducerSource
state.sequenceProducerSource
}
source?.finish(error)
self.clearSource()
}
}

private func onNextBatch(_ entries: [DirectoryEntry]) {

let source = self.state.withLockedValue { state in
return state.sequenceProducerSource
state.sequenceProducerSource
}

guard let source else {
Expand Down Expand Up @@ -412,7 +423,6 @@ private struct DirectoryEnumerator: Sendable {
case let .open(threadPool, _, _):
return threadPool
case .openPausedProducing(let threadPool, let source, let array):
self.state = .modifying
self.state = .open(threadPool, source, array)
return threadPool
case .done:
Expand All @@ -424,13 +434,12 @@ private struct DirectoryEnumerator: Sendable {

internal mutating func pauseProducing() {
switch self.state {
case .open(let nIOThreadPool, let source, let array):
self.state = .modifying
self.state = .openPausedProducing(nIOThreadPool, source, array)
case .open(let threadPool, let source, let array):
self.state = .openPausedProducing(threadPool, source, array)
case .idle:
() // we won't apply back pressure until something has been read
() // we won't apply back pressure until something has been read
case .openPausedProducing, .done:
() // no-op
() // no-op
case .modifying:
fatalError()
}
Expand Down
33 changes: 21 additions & 12 deletions Sources/NIOFileSystem/FileChunks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,18 @@ extension FileChunks.FileChunkIterator: Sendable {}
// MARK: - Internal

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
fileprivate typealias FileChunkSequenceProducer = NIOThrowingAsyncSequenceProducer<ByteBuffer, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, FileChunkProducer>
private typealias FileChunkSequenceProducer = NIOThrowingAsyncSequenceProducer<
ByteBuffer, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, FileChunkProducer
>

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOThrowingAsyncSequenceProducer where Element == ByteBuffer, Failure == Error,
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate == FileChunkProducer {
extension NIOThrowingAsyncSequenceProducer
where
Element == ByteBuffer,
Failure == Error,
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
Delegate == FileChunkProducer
{
static func makeFileChunksStream(
of: Element.Type = Element.self,
handle: SystemFileHandle,
Expand Down Expand Up @@ -129,12 +136,12 @@ private final class FileChunkProducer: NIOAsyncSequenceProducerDelegate, Sendabl
let length: Int64

init(range: FileChunks.ChunkRange, handle: SystemFileHandle, length: Int64) {

let state: ProducerState = switch range {
let state: ProducerState
switch range {
case .entireFile:
.init(handle: handle, range: nil)
state = .init(handle: handle, range: nil)
case .partial(let partialRange):
.init(handle: handle, range: partialRange)
state = .init(handle: handle, range: partialRange)
}

self.state = NIOLockedValueBox(state)
Expand Down Expand Up @@ -272,7 +279,7 @@ private final class FileChunkProducer: NIOAsyncSequenceProducerDelegate, Sendabl
case .produceMore:
self.produceMore()
case .stopProducing:
self.state.withLockedValue { state in state.pauseProducing()}
self.state.withLockedValue { state in state.pauseProducing() }
case .dropped:
// The source is finished; mark ourselves as done.
self.state.withLockedValue { state in state.done() }
Expand Down Expand Up @@ -357,13 +364,13 @@ private struct ProducerState: Sendable {
mutating func didReadBytes(_ count: Int) {
switch self.state {
case var .producing(state):
if state.updateRange(count: count) {
if state.didReadBytes(count) {
self.state = .done(emptyRange: false)
} else {
self.state = .producing(state)
}
case var .pausedProducing(state):
if state.updateRange(count: count) {
if state.didReadBytes(count) {
self.state = .done(emptyRange: false)
} else {
self.state = .pausedProducing(state)
Expand Down Expand Up @@ -396,10 +403,12 @@ private struct ProducerState: Sendable {
}
}


@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension ProducerState.Producing {
mutating func updateRange(count: Int) -> Bool {
/// Updates the range (the offsets to read from and up to) to reflect the number of bytes which have been read.
/// - Parameter count: The number of bytes which have been read.
/// - Returns: Returns `True` if there are no remaining bytes to read, `False` otherwise.
mutating func didReadBytes(_ count: Int) -> Bool {
guard let currentRange = self.range else {
// if we read 0 bytes we are done
return count == 0
Expand Down
6 changes: 4 additions & 2 deletions Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import NIOCore
/// Wraps a ``NIOThrowingAsyncSequenceProducer<Element>`` or ``AnyAsyncSequence<Element>``.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal enum BufferedOrAnyStream<Element, Delegate: NIOAsyncSequenceProducerDelegate> {
typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<Element, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate>
typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
Element, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate
>

case nioThrowingAsyncSequenceProducer(AsyncSequenceProducer)
case anyAsyncSequence(AnyAsyncSequence<Element>)
Expand Down Expand Up @@ -47,7 +49,7 @@ internal enum BufferedOrAnyStream<Element, Delegate: NIOAsyncSequenceProducerDel
internal mutating func next() async throws -> Element? {
let element: Element?
switch self {
case var .bufferedStream(iterator):
case let .bufferedStream(iterator):
defer { self = .bufferedStream(iterator) }
element = try await iterator.next()
case var .anyAsyncSequence(iterator):
Expand Down

0 comments on commit b61146f

Please sign in to comment.