Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Add peek and lookAhead to StreamQueue.
Browse files Browse the repository at this point in the history
These allow users to look at events (similar to `next` and `take`)
without consuming them. For simple cases, they can be used instead of
`startTransaction` to decide what to do before doing it.

R=nweiz@google.com

Review-Url: https://codereview.chromium.org//2649033006 .
  • Loading branch information
lrhn committed Jan 31, 2017
1 parent 4a55a88 commit 420d084
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
* Add an `AsyncCache` class that caches asynchronous operations for a period of
time.

* Add `StreamQueue.peek` and `StreamQueue.lookAheead`.
These allow users to look at events without consuming them.

* Add `StreamQueue.startTransaction()` and `StreamQueue.withTransaction()`.
These allow users to conditionally consume events based on their values.

Expand Down
104 changes: 95 additions & 9 deletions lib/src/stream_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,22 @@ abstract class StreamQueue<T> {
throw _failClosed();
}


/// Look at the next [count] data events without consuming them.
///
/// Works like [take] except that the events are left in the queue.
/// If one of the next [count] events is an error, the returned future
/// completes with this error, and the error is still left in the queue.
Future<List<T>> lookAhead(int count) {
if (count < 0) throw new RangeError.range(count, 0, null, "count");
if (!_isClosed) {
var request = new _LookAheadRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
}

/// Requests the next (yet unrequested) event from the stream.
///
/// When the requested event arrives, the returned future is completed with
Expand All @@ -154,6 +170,19 @@ abstract class StreamQueue<T> {
throw _failClosed();
}

/// Looks at the next (yet unrequested) event from the stream.
///
/// Like [next] except that the event is not consumed.
/// If the next event is an error event, it stays in the queue.
Future<T> get peek {
if (!_isClosed) {
var nextRequest = new _PeekRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}
throw _failClosed();
}

/// Returns a stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
Expand Down Expand Up @@ -353,8 +382,8 @@ abstract class StreamQueue<T> {
/// `cancel`.
///
/// After calling `cancel`, no further events can be requested.
/// None of [next], [rest], [skip], [take] or [cancel] may be
/// called again.
/// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
/// may be called again.
Future cancel({bool immediate: false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
Expand Down Expand Up @@ -692,15 +721,42 @@ class _NextRequest<T> implements _EventRequest<T> {
return true;
}
if (isDone) {
var errorFuture =
new Future.sync(() => throw new StateError("No elements"));
_completer.complete(errorFuture);
_completer.completeError(new StateError("No elements"),
StackTrace.current);
return true;
}
return false;
}
}


/// Request for a [StreamQueue.peek] call.
///
/// Completes the returned future when receiving the first event,
/// and is then complete, but doesn't consume the event.
class _PeekRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by [StreamQueue.next].
final _completer = new Completer<T>();

_PeekRequest();

Future<T> get future => _completer.future;

bool update(QueueList<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.first.complete(_completer);
return true;
}
if (isDone) {
_completer.completeError(new StateError("No elements"),
StackTrace.current);
return true;
}
return false;
}
}


/// Request for a [StreamQueue.skip] call.
class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
Expand Down Expand Up @@ -738,8 +794,8 @@ class _SkipRequest<T> implements _EventRequest<T> {
}
}

/// Request for a [StreamQueue.take] call.
class _TakeRequest<T> implements _EventRequest<T> {
/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
abstract class _ListRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
final _completer = new Completer<List<T>>();

Expand All @@ -752,10 +808,16 @@ class _TakeRequest<T> implements _EventRequest<T> {
/// this value.
final int _eventsToTake;

_TakeRequest(this._eventsToTake);
_ListRequest(this._eventsToTake);

/// The future completed when the correct number of events have been captured.
Future<List<T>> get future => _completer.future;
}


/// Request for a [StreamQueue.take] call.
class _TakeRequest<T> extends _ListRequest<T> {
_TakeRequest(int eventsToTake) : super(eventsToTake);

bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
Expand All @@ -766,7 +828,7 @@ class _TakeRequest<T> implements _EventRequest<T> {

var event = events.removeFirst();
if (event.isError) {
_completer.completeError(event.asError.error, event.asError.stackTrace);
event.asError.complete(_completer);
return true;
}
_list.add(event.asValue.value);
Expand All @@ -776,6 +838,30 @@ class _TakeRequest<T> implements _EventRequest<T> {
}
}


/// Request for a [StreamQueue.lookAhead] call.
class _LookAheadRequest<T> extends _ListRequest<T> {
_LookAheadRequest(int eventsToTake) : super(eventsToTake);

bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.length == _list.length) {
if (isDone) break;
return false;
}
var event = events.elementAt(_list.length);
if (event.isError) {
event.asError.complete(_completer);
return true;
}
_list.add(event.asValue.value);
}
_completer.complete(_list);
return true;
}
}


/// Request for a [StreamQueue.cancel] call.
///
/// The request needs no events, it just waits in the request queue
Expand Down
94 changes: 94 additions & 0 deletions test/stream_queue_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,65 @@ main() {
});
});

group("lookAhead operation", () {
test("as simple list of events", () async {
var events = new StreamQueue<int>(createStream());
expect(await events.lookAhead(4), [1, 2, 3, 4]);
expect(await events.next, 1);
expect(await events.lookAhead(2), [2, 3]);
expect(await events.take(2), [2, 3]);
expect(await events.next, 4);
await events.cancel();
});

test("of 0 events", () async {
var events = new StreamQueue<int>(createStream());
expect(events.lookAhead(0), completion([]));
expect(events.next, completion(1));
expect(events.lookAhead(0), completion([]));
expect(events.next, completion(2));
expect(events.lookAhead(0), completion([]));
expect(events.next, completion(3));
expect(events.lookAhead(0), completion([]));
expect(events.next, completion(4));
expect(events.lookAhead(0), completion([]));
expect(events.lookAhead(5), completion([]));
expect(events.next, throwsStateError);
await events.cancel();
});

test("with bad arguments throws", () async {
var events = new StreamQueue<int>(createStream());
expect(() => events.lookAhead(-1), throwsArgumentError);
expect(await events.next, 1); // Did not consume event.
expect(() => events.lookAhead(-1), throwsArgumentError);
expect(await events.next, 2); // Did not consume event.
await events.cancel();
});

test("of too many arguments", () async {
var events = new StreamQueue<int>(createStream());
expect(await events.lookAhead(6), [1, 2, 3, 4]);
await events.cancel();
});

test("too large later", () async {
var events = new StreamQueue<int>(createStream());
expect(await events.next, 1);
expect(await events.next, 2);
expect(await events.lookAhead(6), [3, 4]);
await events.cancel();
});

test("error", () async {
var events = new StreamQueue<int>(createErrorStream());
expect(events.lookAhead(4), throwsA("To err is divine!"));
expect(events.take(4), throwsA("To err is divine!"));
expect(await events.next, 4);
await events.cancel();
});
});

group("next operation", () {
test("simple sequence of requests", () async {
var events = new StreamQueue<int>(createStream());
Expand Down Expand Up @@ -374,11 +433,46 @@ main() {
});
});

group("peek operation", () {
test("peeks one event", () async {
var events = new StreamQueue<int>(createStream());
expect(await events.peek, 1);
expect(await events.next, 1);
expect(await events.peek, 2);
expect(await events.take(2), [2, 3]);
expect(await events.peek, 4);
expect(await events.next, 4);
// Throws at end.
expect(events.peek, throws);
await events.cancel();
});
test("multiple requests at the same time", () async {
var events = new StreamQueue<int>(createStream());
var result = await Future.wait(
[events.peek, events.peek, events.next, events.peek, events.peek]);
expect(result, [1, 1, 1, 2, 2]);
await events.cancel();
});
test("sequence of requests with error", () async {
var events = new StreamQueue<int>(createErrorStream());
expect(await events.next, 1);
expect(await events.next, 2);
expect(events.peek, throwsA("To err is divine!"));
// Error stays in queue.
expect(events.peek, throwsA("To err is divine!"));
expect(events.next, throwsA("To err is divine!"));
expect(await events.next, 4);
await events.cancel();
});
});

group("cancel operation", () {
test("closes the events, prevents any other operation", () async {
var events = new StreamQueue<int>(createStream());
await events.cancel();
expect(() => events.lookAhead(1), throwsStateError);
expect(() => events.next, throwsStateError);
expect(() => events.peek, throwsStateError);
expect(() => events.skip(1), throwsStateError);
expect(() => events.take(1), throwsStateError);
expect(() => events.rest, throwsStateError);
Expand Down

0 comments on commit 420d084

Please sign in to comment.