Skip to content

Commit

Permalink
Adopt NIOThrowingAsyncSequenceProducer (#2879)
Browse files Browse the repository at this point in the history
### Motivation:

Adopt `NIOThrowingAsyncSequenceProducer` in NIOFileSystem to reduce code
duplication.

### Modifications:

Adopt `NIOThrowingAsyncSequenceProducer` in the NIOFileSystem types
`DirectoryEntryProducer` and `FileChunkProducer`.

### Result:

No functional changes. Internal changes reduce code duplication.
  • Loading branch information
rnro authored Sep 13, 2024
1 parent 530aa8d commit 282f593
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 165 deletions.
212 changes: 147 additions & 65 deletions Sources/NIOFileSystem/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import CNIODarwin
import CNIOLinux
import NIOConcurrencyHelpers
import NIOCore
import NIOPosix
@preconcurrency import SystemPackage

Expand Down Expand Up @@ -89,17 +90,17 @@ extension DirectoryEntries {
public typealias AsyncIterator = BatchedIterator
public typealias Element = [DirectoryEntry]

private let stream: BufferedOrAnyStream<[DirectoryEntry]>
private let stream: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>

/// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence`
/// of directory entry batches.
public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
self.stream = BufferedOrAnyStream(wrapping: sequence)
self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence)
}

fileprivate init(handle: SystemFileHandle, recursive: Bool) {
// Expanding the batches yields watermarks of 256 and 512 directory entries.
let stream = BufferedStream.makeBatchedDirectoryEntryStream(
let stream = NIOThrowingAsyncSequenceProducer.makeBatchedDirectoryEntryStream(
handle: handle,
recursive: recursive,
entriesPerBatch: 64,
Expand All @@ -116,9 +117,11 @@ extension DirectoryEntries {

/// An `AsyncIteratorProtocol` of `Array<DirectoryEntry>`.
public struct BatchedIterator: AsyncIteratorProtocol {
private var iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator
private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator

init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator) {
fileprivate init(
wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
) {
self.iterator = iterator
}

Expand All @@ -135,52 +138,95 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {}
// MARK: - Internal

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension BufferedStream where Element == [DirectoryEntry] {
extension NIOThrowingAsyncSequenceProducer
where
Element == [DirectoryEntry],
Failure == (any Error),
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
Delegate == DirectoryEntryProducer
{
fileprivate static func makeBatchedDirectoryEntryStream(
handle: SystemFileHandle,
recursive: Bool,
entriesPerBatch: Int,
lowWatermark: Int,
highWatermark: Int
) -> BufferedStream<[DirectoryEntry]> {
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
let protectedState = NIOLockedValueBox(state)

var (stream, source) = BufferedStream.makeStream(
of: [DirectoryEntry].self,
backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark)
)

source.onTermination = {
guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

threadPool.submit { _ in // always run, even if cancelled
protectedState.withLockedValue { state in
state.closeIfNecessary()
}
}
}

) -> NIOThrowingAsyncSequenceProducer<
[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
DirectoryEntryProducer
> {
let producer = DirectoryEntryProducer(
state: protectedState,
source: source,
handle: handle,
recursive: recursive,
entriesPerBatch: entriesPerBatch
)
// Start producing immediately.
producer.produceMore()

return stream
let nioThrowingAsyncSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
elementType: [DirectoryEntry].self,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
lowWatermark: lowWatermark,
highWatermark: highWatermark
),
finishOnDeinit: false,
delegate: producer
)

producer.setSequenceProducerSource(nioThrowingAsyncSequence.source)

return nioThrowingAsyncSequence.sequence
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private struct DirectoryEntryProducer {
/// The `NIOThrowingAsyncSequenceProducer` specialization used to deliver batches of directory
/// entries: elements are `[DirectoryEntry]`, failures are `any Error`, back pressure uses the
/// high/low watermark strategy and `DirectoryEntryProducer` is the delegate.
private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<
    [DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
    DirectoryEntryProducer
>

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
let state: NIOLockedValueBox<DirectoryEnumerator>
let source: BufferedStream<[DirectoryEntry]>.Source
let entriesPerBatch: Int

/// Creates a producer backed by a new `DirectoryEnumerator` held in a lock-protected box.
///
/// - Parameters:
///   - handle: The handle passed to the `DirectoryEnumerator`.
///   - recursive: Passed through to the `DirectoryEnumerator`.
///   - entriesPerBatch: The count handed to the enumerator when asking for the next batch.
init(handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int) {
    self.entriesPerBatch = entriesPerBatch
    self.state = NIOLockedValueBox(
        DirectoryEnumerator(handle: handle, recursive: recursive)
    )
}

/// Delegate callback invoked on termination of the sequence.
///
/// Asks the enumerator for a thread pool to close on; if there is none, nothing needs to be
/// done. Otherwise the close is submitted to that pool.
func didTerminate() {
    let closingPool = self.state.withLockedValue { enumerator in
        enumerator.threadPoolForClosing()
    }

    guard let closingPool else { return }

    closingPool.submit { _ in  // always run, even if cancelled
        self.state.withLockedValue { $0.closeIfNecessary() }
    }
}

/// Sets the sequence producer source within the lock-protected enumerator state.
///
/// The outcome depends on the enumerator's current state:
/// - `idle`: the source is stored so that batches can later be yielded to it.
/// - `done`: the source is finished immediately rather than stored.
/// - `open`, `openPausedProducing`, `modifying`: treated as a programmer error; traps.
///
/// - Parameter sequenceProducerSource: The source end of the `DirectoryEntrySequenceProducer`.
func setSequenceProducerSource(_ sequenceProducerSource: DirectoryEntrySequenceProducer.Source) {
    self.state.withLockedValue { state in
        switch state.state {
        case .idle:
            state.sequenceProducerSource = sequenceProducerSource
        case .done:
            sequenceProducerSource.finish()
        case .open, .openPausedProducing:
            fatalError("Cannot set the sequence producer source in the 'open' or 'openPausedProducing' state")
        case .modifying:
            fatalError("Cannot set the sequence producer source in the 'modifying' state")
        }
    }
}

/// Drops the stored sequence producer source by setting it to `nil` under the lock.
func clearSource() {
    self.state.withLockedValue { $0.sequenceProducerSource = nil }
}

/// The 'entry point' for producing elements.
///
/// Calling this function will start producing directory entries asynchronously by dispatching
Expand All @@ -207,6 +253,12 @@ private struct DirectoryEntryProducer {
}
}

/// Forwards a pause request to the lock-protected enumerator state.
func pauseProducing() {
    self.state.withLockedValue { enumerator in
        enumerator.pauseProducing()
    }
}

private func nextBatch() throws -> [DirectoryEntry] {
try self.state.withLockedValue { state in
try state.next(self.entriesPerBatch)
Expand All @@ -221,45 +273,51 @@ private struct DirectoryEntryProducer {
// Failed to read more entries: close and notify the stream so consumers receive the
// error.
self.close()
self.source.finish(throwing: error)
let source = self.state.withLockedValue { state in
state.sequenceProducerSource
}
source?.finish(error)
self.clearSource()
}
}

private func onNextBatch(_ entries: [DirectoryEntry]) {
let source = self.state.withLockedValue { state in
state.sequenceProducerSource
}

guard let source else {
assertionFailure("unexpectedly missing source")
return
}

// No entries were read: this must be the end (as the batch size must be greater than zero).
if entries.isEmpty {
self.source.finish(throwing: nil)
source.finish()
self.clearSource()
return
}

// Reading short means reading EOF. The enumerator closes itself in that case.
let readEOF = entries.count < self.entriesPerBatch

// Entries were produced: yield them and maybe produce more.
do {
let writeResult = try self.source.write(contentsOf: CollectionOfOne(entries))
// Exit early if EOF was read; no use in trying to produce more.
if readEOF {
self.source.finish(throwing: nil)
return
}
let writeResult = source.yield(contentsOf: CollectionOfOne(entries))

switch writeResult {
case .produceMore:
self.produceMore()
case let .enqueueCallback(token):
self.source.enqueueCallback(callbackToken: token) {
switch $0 {
case .success:
self.produceMore()
case .failure:
self.close()
}
}
}
} catch {
// Failure to write means the source is already done, that's okay we just need to
// update our state and stop producing.
// Exit early if EOF was read; no use in trying to produce more.
if readEOF {
source.finish()
self.clearSource()
return
}

switch writeResult {
case .produceMore:
self.produceMore()
case .stopProducing:
self.pauseProducing()
case .dropped:
// The source is finished; mark ourselves as done.
self.close()
}
}
Expand All @@ -282,25 +340,30 @@ private struct DirectoryEntryProducer {
/// Note that this is not a `Sequence` because we allow for errors to be thrown on `next()`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private struct DirectoryEnumerator: Sendable {
private enum State: @unchecked Sendable {
internal enum State: @unchecked Sendable {
case modifying
case idle(SystemFileHandle.SendableView, recursive: Bool)
case open(NIOThreadPool, Source, [DirectoryEntry])
case openPausedProducing(NIOThreadPool, Source, [DirectoryEntry])
case done
}

/// The source of directory entries.
private enum Source {
internal enum Source {
case readdir(CInterop.DirPointer)
case fts(CInterop.FTSPointer)
}

/// The current state of enumeration.
private var state: State
internal var state: State

/// The path to the directory being enumerated.
private let path: FilePath

/// The source used to yield directory entry batches: the producing end of the
/// `DirectoryEntrySequenceProducer`.
internal var sequenceProducerSource: DirectoryEntrySequenceProducer.Source?

/// Information about an entry returned by FTS. See 'fts(3)'.
private enum FTSInfo: Hashable, Sendable {
case directoryPreOrder
Expand Down Expand Up @@ -353,22 +416,38 @@ private struct DirectoryEnumerator: Sendable {
self.path = handle.path
}

internal func produceMore() -> NIOThreadPool? {
internal mutating func produceMore() -> NIOThreadPool? {
switch self.state {
case let .idle(handle, _):
return handle.threadPool
case let .open(threadPool, _, _):
return threadPool
case .openPausedProducing(let threadPool, let source, let array):
self.state = .open(threadPool, source, array)
return threadPool
case .done:
return nil
case .modifying:
fatalError()
}
}

/// Moves the enumerator from `open` to `openPausedProducing`, keeping the same thread pool,
/// source and pending entries.
///
/// Has no effect in the `idle`, `openPausedProducing` and `done` states. Traps if called
/// while in the `modifying` state.
internal mutating func pauseProducing() {
    switch self.state {
    case .open(let threadPool, let source, let array):
        self.state = .openPausedProducing(threadPool, source, array)
    case .idle:
        ()  // we won't apply back pressure until something has been read
    case .openPausedProducing, .done:
        ()  // no-op
    case .modifying:
        fatalError("Cannot pause producing in the 'modifying' state")
    }
}

internal func threadPoolForClosing() -> NIOThreadPool? {
switch self.state {
case let .open(threadPool, _, _):
case .open(let threadPool, _, _), .openPausedProducing(let threadPool, _, _):
return threadPool
case .idle, .done:
// Don't need to close in the idle state: we don't own the handle.
Expand Down Expand Up @@ -397,7 +476,7 @@ private struct DirectoryEnumerator: Sendable {
// We don't own the handle so don't close it.
self.state = .done

case let .open(_, mode, _):
case .open(_, let mode, _), .openPausedProducing(_, let mode, _):
self.state = .done
switch mode {
case .readdir(let dir):
Expand Down Expand Up @@ -631,6 +710,9 @@ private struct DirectoryEnumerator: Sendable {
return result
}

case .openPausedProducing:
return .yield(.success([]))

case .done:
return .yield(.success([]))

Expand Down
Loading

0 comments on commit 282f593

Please sign in to comment.