Skip to content

Commit

Permalink
Make ReadableStreamControler.enqueue() fail when the stream is alread…
Browse files Browse the repository at this point in the history
…y in the closed state

It was throwing the stored error but is also changed to throw a predefined
TypeError.

Issue: #424
  • Loading branch information
tyoshino committed Jan 20, 2016
1 parent 9b72462 commit a64f211
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 51 deletions.
20 changes: 8 additions & 12 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,15 +419,16 @@ class ReadableStreamController {
throw new TypeError('ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
}

const stream = this._controlledReadableStream;
if (stream._state === 'errored') {
throw stream._storedError;
}

if (this._closeRequested === true) {
throw new TypeError('stream is closed or draining');
}

const stream = this._controlledReadableStream;
if (stream._state !== 'readable') {
throw new TypeError(
`The stream (in ${stream._state} state) is not in the readable state and cannot be enqueued to`);
}

return EnqueueInReadableStreamController(this, chunk);
}

Expand Down Expand Up @@ -553,6 +554,7 @@ class ReadableByteStreamController {
if (this._closeRequested) {
throw new TypeError('stream is closed or draining');
}

if (this._controlledReadableStream._state !== 'readable') {
throw new TypeError('The stream is not in the readable state and cannot be enqueued to');
}
Expand Down Expand Up @@ -1113,13 +1115,7 @@ function EnqueueInReadableStreamController(controller, chunk) {
const stream = controller._controlledReadableStream;

assert(controller._closeRequested === false);
assert(stream._state !== 'errored');

if (stream._state === 'closed') {
// This will happen if the stream was closed without calling its controller's close() method, i.e. if it was closed
// via cancellation.
return undefined;
}
assert(stream._state === 'readable');

if (IsReadableStreamLocked(stream) === true && GetNumReadRequests(stream) > 0) {
RespondToReadRequest(stream, chunk);
Expand Down
68 changes: 52 additions & 16 deletions reference-implementation/test/pipe-to-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,34 @@ const test = require('tape-catch');
// Many other pipeTo-with-options tests have been templated.

test('Piping with no options and a destination error', t => {
t.plan(2);
t.plan(4);

class DelayedEnqueuingSource {
constructor() {
this._canceled = false;
}

const theError = new Error('destination error');
const rs = new ReadableStream({
start(c) {
c.enqueue('a');
setTimeout(() => c.enqueue('b'), 10);
setTimeout(() => {
t.doesNotThrow(() => c.enqueue('c'), 'enqueue after cancel should not throw');
t.notOk(this._canceled, 'the stream should not have been canceled yet');
c.enqueue('b');
}, 10);
setTimeout(() => {
t.ok(this._canceled, 'the stream should have been canceled');
t.throws(() => c.enqueue('c'), 'enqueue after cancel should throw');
}, 20);
},
}

cancel(r) {
t.equal(r, theError, 'reason passed to cancel equals the source error');

this._canceled = true;
}
});
}

const theError = new Error('destination error');
const rs = new ReadableStream(new DelayedEnqueuingSource());

const ws = new WritableStream({
write(chunk) {
Expand All @@ -31,21 +44,34 @@ test('Piping with no options and a destination error', t => {
});

test('Piping with { preventCancel: false } and a destination error', t => {
t.plan(2);
t.plan(4);

class DelayedEnqueuingSource {
constructor() {
this._canceled = false;
}

const theError = new Error('destination error');
const rs = new ReadableStream({
start(c) {
c.enqueue('a');
setTimeout(() => c.enqueue('b'), 10);
setTimeout(() => {
t.doesNotThrow(() => c.enqueue('c'), 'enqueue after cancel should not throw');
t.notOk(this._canceled, 'the stream should not have been canceled yet');
c.enqueue('b');
}, 10);
setTimeout(() => {
t.ok(this._canceled, 'the stream should have been canceled');
t.throws(() => c.enqueue('c'), 'enqueue after cancel should throw');
}, 20);
},
}

cancel(r) {
t.equal(r, theError, 'reason passed to cancel equals the source error');

this._canceled = true;
}
});
}

const theError = new Error('destination error');
const rs = new ReadableStream(new DelayedEnqueuingSource());

const ws = new WritableStream({
write(chunk) {
Expand All @@ -59,15 +85,23 @@ test('Piping with { preventCancel: false } and a destination error', t => {
});

test('Piping with { preventCancel: true } and a destination error', t => {
let resolveLastEnqueuePromise;
const lastEnqueuePromise = new Promise((r) => {
resolveLastEnqueuePromise = r;
});

const theError = new Error('destination error');
const rs = new ReadableStream({
start(c) {
c.enqueue('a');
setTimeout(() => c.enqueue('b'), 10);
setTimeout(() => c.enqueue('c'), 20);
setTimeout(() => c.enqueue('d'), 30);
setTimeout(() => {
c.enqueue('d');
resolveLastEnqueuePromise();
}, 30);
},
cancel(r) {
cancel() {
t.fail('unexpected call to cancel');
}
});
Expand All @@ -90,6 +124,8 @@ test('Piping with { preventCancel: true } and a destination error', t => {

return reader.read().then(result => {
t.deepEqual(result, { value: 'd', done: false }, 'should be able to read the remaining chunk from the reader');
return lastEnqueuePromise;
}).then(() => {
t.end();
});
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ promise_test(() => {
});

rs.cancel();
controller.enqueue('a'); // Calling enqueue after canceling should not throw anything.
assert_throws(new TypeError, () => controller.enqueue('a'), 'Calling enqueue after canceling should throw');

return rs.getReader().closed;

}, 'Underlying source: calling enqueue on an empty canceled stream should not throw');
}, 'Underlying source: calling enqueue on an empty canceled stream should throw');

promise_test(() => {

Expand All @@ -171,11 +171,11 @@ promise_test(() => {
});

rs.cancel();
controller.enqueue('c'); // Calling enqueue after canceling should not throw anything.
assert_throws(new TypeError, () => controller.enqueue('c'), 'Calling enqueue after canceling should throw');

return rs.getReader().closed;

}, 'Underlying source: calling enqueue on a non-empty canceled stream should not throw');
}, 'Underlying source: calling enqueue on a non-empty canceled stream should throw');

promise_test(() => {

Expand All @@ -194,7 +194,7 @@ promise_test(t => {
const closed = new ReadableStream({
start(c) {
c.error(theError);
assert_throws(theError, () => c.enqueue('a'), 'call to enqueue should throw the error');
assert_throws(new TypeError(), () => c.enqueue('a'), 'call to enqueue should throw the error');
}
}).getReader().closed;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,24 +673,6 @@ test(() => {

}, 'ReadableStream: enqueue should throw when the stream is closed');

test(() => {

let startCalled = false;
const expectedError = new Error('i am sad');

new ReadableStream({
start(c) {
c.error(expectedError);

assert_throws(expectedError, () => c.enqueue('a'), 'enqueue after error should throw that error');
startCalled = true;
}
});

assert_true(startCalled);

}, 'ReadableStream: enqueue should throw the stored error when the stream is errored');

promise_test(() => {

let startCalled = 0;
Expand Down

0 comments on commit a64f211

Please sign in to comment.