Skip to content
This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

Use more async/await #70

Merged
merged 2 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions lib/src/async_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import 'dart:async';
import 'dart:collection';

typedef ItemProcessor<T> = Future Function(T item);
typedef ItemProcessor<T> = Future<void> Function(T item);

/// A queue of items that are sequentially, asynchronously processed.
///
Expand Down Expand Up @@ -57,15 +57,14 @@ class AsyncQueue<T> {
///
/// When complete, recursively calls itself to continue processing unless
/// the process was cancelled.
Future _processNextItem() {
Future<void> _processNextItem() async {
var item = _items.removeFirst();
return _processor(item).then((_) async {
if (_items.isNotEmpty) return await _processNextItem();
await _processor(item);
if (_items.isNotEmpty) return _processNextItem();

// We have drained the queue, stop processing and wait until something
// has been enqueued.
_isProcessing = false;
return null;
});
// We have drained the queue, stop processing and wait until something
// has been enqueued.
_isProcessing = false;
return null;
}
}
54 changes: 28 additions & 26 deletions lib/src/directory_watcher/polling.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class _PollingDirectoryWatcher

bool get isReady => _ready.isCompleted;

Future get ready => _ready.future;
final _ready = Completer();
Future<void> get ready => _ready.future;
final _ready = Completer<void>();

/// The amount of time the watcher pauses between successive polls of the
/// directory contents.
Expand Down Expand Up @@ -129,38 +129,41 @@ class _PollingDirectoryWatcher

/// Processes [file] to determine if it has been modified since the last
/// time it was scanned.
Future _processFile(String file) {
Future<void> _processFile(String file) async {
// `null` is the sentinel which means the directory listing is complete.
if (file == null) return _completePoll();

return getModificationTime(file).then((modified) {
if (_events.isClosed) return null;
if (file == null) {
await _completePoll();
return;
}

var lastModified = _lastModifieds[file];
final modified = await modificationTime(file);

// If its modification time hasn't changed, assume the file is unchanged.
if (lastModified != null && lastModified == modified) {
// The file is still here.
_polledFiles.add(file);
return null;
}
if (_events.isClosed) return;

if (_events.isClosed) return null;
var lastModified = _lastModifieds[file];

_lastModifieds[file] = modified;
// If its modification time hasn't changed, assume the file is unchanged.
if (lastModified != null && lastModified == modified) {
// The file is still here.
_polledFiles.add(file);
return;
}

// Only notify if we're ready to emit events.
if (!isReady) return null;
if (_events.isClosed) return;

var type = lastModified == null ? ChangeType.ADD : ChangeType.MODIFY;
_events.add(WatchEvent(type, file));
});
_lastModifieds[file] = modified;
_polledFiles.add(file);

// Only notify if we're ready to emit events.
if (!isReady) return;

var type = lastModified == null ? ChangeType.ADD : ChangeType.MODIFY;
_events.add(WatchEvent(type, file));
}

/// After the directory listing is complete, this determines which files were
/// removed and then restarts the next poll.
Future _completePoll() {
Future<void> _completePoll() async {
// Any files that were not seen in the last poll but that we have a
// status for must have been removed.
var removedFiles = _lastModifieds.keys.toSet().difference(_polledFiles);
Expand All @@ -172,9 +175,8 @@ class _PollingDirectoryWatcher
if (!isReady) _ready.complete();

// Wait and then poll again.
return Future.delayed(_pollingDelay).then((_) {
if (_events.isClosed) return;
_poll();
});
await Future.delayed(_pollingDelay);
if (_events.isClosed) return;
_poll();
}
}
4 changes: 2 additions & 2 deletions lib/src/directory_watcher/windows.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class _WindowsDirectoryWatcher

bool get isReady => _readyCompleter.isCompleted;

Future get ready => _readyCompleter.future;
Future<void> get ready => _readyCompleter.future;
final _readyCompleter = Completer();

final Map<String, _EventBatcher> _eventBatchers =
Expand Down Expand Up @@ -380,7 +380,7 @@ class _WindowsDirectoryWatcher

/// Starts or restarts listing the watched directory to get an initial picture
/// of its state.
Future _listDir() {
Future<void> _listDir() {
assert(!isReady);
if (_initialListSubscription != null) _initialListSubscription.cancel();

Expand Down
2 changes: 1 addition & 1 deletion lib/src/file_watcher/polling.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class _PollingFileWatcher implements FileWatcher, ManuallyClosedWatcher {

DateTime modified;
try {
modified = await getModificationTime(path);
modified = await modificationTime(path);
} on FileSystemException catch (error, stackTrace) {
if (!_eventsController.isClosed) {
_eventsController.addError(error, stackTrace);
Expand Down
15 changes: 8 additions & 7 deletions lib/src/resubscribable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ abstract class ResubscribableWatcher implements Watcher {

bool get isReady => _readyCompleter.isCompleted;

Future get ready => _readyCompleter.future;
var _readyCompleter = Completer();
Future<void> get ready => _readyCompleter.future;
var _readyCompleter = Completer<void>();

/// Creates a new [ResubscribableWatcher] wrapping the watchers
/// emitted by [_factory].
Expand All @@ -41,16 +41,17 @@ abstract class ResubscribableWatcher implements Watcher {
StreamSubscription subscription;

_eventsController = StreamController<WatchEvent>.broadcast(
onListen: () {
onListen: () async {
watcher = _factory();
subscription = watcher.events.listen(_eventsController.add,
onError: _eventsController.addError,
onDone: _eventsController.close);

// It's important that we complete the value of [_readyCompleter] at the
// time [onListen] is called, as opposed to the value when [watcher.ready]
// fires. A new completer may be created by that time.
watcher.ready.then(_readyCompleter.complete);
// It's important that we complete the value of [_readyCompleter] at
// the time [onListen] is called, as opposed to the value when
// [watcher.ready] fires. A new completer may be created by that time.
await watcher.ready;
_readyCompleter.complete();
},
onCancel: () {
// Cancel the subscription before closing the watcher so that the
Expand Down
7 changes: 4 additions & 3 deletions lib/src/stat.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ void mockGetModificationTime(MockTimeCallback callback) {
}

/// Gets the modification time for the file at [path].
Future<DateTime> getModificationTime(String path) {
Future<DateTime> modificationTime(String path) async {
if (_mockTimeCallback != null) {
return Future.value(_mockTimeCallback(path));
return _mockTimeCallback(path);
}

return FileStat.stat(path).then((stat) => stat.modified);
final stat = await FileStat.stat(path);
return stat.modified;
}
5 changes: 3 additions & 2 deletions test/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ Future<Null> startWatcher({String path}) async {
/// at the end of a test. Otherwise, if they don't occur, the test will wait
/// indefinitely because they might in the future and because the watcher is
/// normally only closed after the test completes.
void startClosingEventStream() {
pumpEventQueue().then((_) => _watcherEvents.cancel(immediate: true));
void startClosingEventStream() async {
await pumpEventQueue();
await _watcherEvents.cancel(immediate: true);
}

/// A list of [StreamMatcher]s that have been collected using
Expand Down