diff --git a/reference-implementation/test/experimental/operation-stream.js b/reference-implementation/test/experimental/operation-stream.js index c9d7fb4ff..3a0f53945 100644 --- a/reference-implementation/test/experimental/operation-stream.js +++ b/reference-implementation/test/experimental/operation-stream.js @@ -177,7 +177,7 @@ test('Asynchronous write, read and completion of the operation', t => { t.end(); }); -test.only('Pipe', t => { +test('Pipe', t => { const pair0 = createOperationStream(new AdjustableStringStrategy()); const wos0 = pair0.writable; const ros0 = pair0.readable; @@ -227,146 +227,147 @@ test.only('Pipe', t => { }); }); -test('Sample implementation of network API with a buffer pool', t => { - const pair = createOperationStream(new AdjustableStrategy()); - const wos = pair.writable; - const ros = pair.readable; - - var bytesRead = 0; - var readableClosed = false; - function pump() { - for (;;) { - if (ros.state === 'readable') { - const op = ros.read(); - if (op.type === 'data') { - const view = op.argument; - for (var i = 0; i < view.byteLength; ++i) { - if (view[i] === 1) { - ++bytesRead; - } - } - } else { - readableClosed = true; - } - op.complete(); - } else if (ros.state === 'waiting') { - ros.ready.then(pump); - return; - } - } - } - pump(); +class FakeByteSource { + constructor() { + this._streams = createOperationStream(new AdjustableStrategy()); - ros.window = 64; + this._bytesToWrite = 1024; - new Promise((resolve, reject) => { - const bufferPool = []; + this._bufferPool = []; for (var i = 0; i < 10; ++i) { - bufferPool.push(new ArrayBuffer(10)); + this._bufferPool.push(new ArrayBuffer(10)); } - var networkReadPromise = undefined; + this._buffersInUse = []; - const buffersInUse = []; + this._networkReadPromise = undefined; - var bytesToWrite = 1024; + this._loop(); + } - function fakeReadFromNetworkLoop() { - for (;;) { - if (wos.state === 'cancelled') { - reject(); - return; - } + get stream() { + return this._streams.readable; + } - var hasProgress = false; + _loop() { + const wos = this._streams.writable; - if (buffersInUse.length > 0) { - const entry = buffersInUse[0]; - const status = entry.status; - if (status.state === 'completed') { - buffersInUse.shift(); + for (;;) { + if (wos.state === 'cancelled') { + return; + } - if (entry.buffer === undefined) { - resolve(); - return; - } + var hasProgress = false; - bufferPool.push(entry.buffer); + if (this._buffersInUse.length > 0) { + const entry = this._buffersInUse[0]; + const status = entry.status; + if (status.state === 'completed') { + this._buffersInUse.shift(); - hasProgress = true; - } else if (status.state === 'errored') { - reject(); + if (entry.buffer === undefined) { return; } + + this._bufferPool.push(entry.buffer); + + hasProgress = true; + } else if (status.state === 'errored') { + return; } + } - if (networkReadPromise === undefined && bufferPool.length > 0 && wos.state === 'writable') { - const buffer = bufferPool.shift(); - const view = new Uint8Array(buffer); - for (var i = 0; i < view.byteLength; ++i) { - view[0] = 0; - } + if (this._networkReadPromise === undefined && + this._bufferPool.length > 0 && + wos.state === 'writable') { + const buffer = this._bufferPool.shift(); + const view = new Uint8Array(buffer); + for (var i = 0; i < view.byteLength; ++i) { + view[0] = 0; + } - // Fake async network read operation. - networkReadPromise = new Promise((resolve, reject) => { - setTimeout(() => { - const bytesToWriteThisTime = Math.min(bytesToWrite, buffer.byteLength); - const view = new Uint8Array(buffer, 0, bytesToWriteThisTime); - for (var i = 0; i < view.byteLength; ++i) { - view[i] = 1; - } - bytesToWrite -= bytesToWriteThisTime; - if (bytesToWrite === 0) { - resolve({close: true, view}); - } else { - resolve({close: false, view}); - } - }, 0); - }).then(result => { - networkReadPromise = undefined; - if (result.close) { - buffersInUse.push({buffer, status: wos.write(result.view)}); - buffersInUse.push({buffer: undefined, status: wos.close()}); + // Fake async network read operation. + this._networkReadPromise = new Promise((resolve, reject) => { + setTimeout(() => { + const bytesToWriteThisTime = Math.min(this._bytesToWrite, buffer.byteLength); + const view = new Uint8Array(buffer, 0, bytesToWriteThisTime); + for (var i = 0; i < view.byteLength; ++i) { + view[i] = 1; + } + this._bytesToWrite -= bytesToWriteThisTime; + if (this._bytesToWrite === 0) { + resolve({close: true, view}); } else { - buffersInUse.push({buffer, status: wos.write(result.view)}); + resolve({close: false, view}); } - }); + }, 0); + }).then(result => { + this._networkReadPromise = undefined; + if (result.close) { + this._buffersInUse.push({buffer, status: wos.write(result.view)}); + this._buffersInUse.push({buffer: undefined, status: wos.close()}); + } else { + this._buffersInUse.push({buffer, status: wos.write(result.view)}); + } + }); - hasProgress = true; - } + hasProgress = true; + } - if (hasProgress) { - continue; - } + if (hasProgress) { + continue; + } - const promisesToRace = []; + const promisesToRace = []; - if (wos.state === 'writable') { - promisesToRace.push(wos.cancelled); - } else if (wos.state === 'waiting') { - promisesToRace.push(wos.ready); - } + if (wos.state === 'writable') { + promisesToRace.push(wos.cancelled); + } else if (wos.state === 'waiting') { + promisesToRace.push(wos.ready); + } - if (networkReadPromise !== undefined) { - promisesToRace.push(networkReadPromise); - } + if (this._networkReadPromise !== undefined) { + promisesToRace.push(this._networkReadPromise); + } - if (buffersInUse.length > 0) { - promisesToRace.push(buffersInUse[0].status.ready); - } + if (this._buffersInUse.length > 0) { + promisesToRace.push(this._buffersInUse[0].status.ready); + } + + Promise.race(promisesToRace).then(this._loop.bind(this)); + return; + } + } +} + +test.only('Sample implementation of network API with a buffer pool', t => { + const bs = new FakeByteSource(); + const ros = bs.stream; + ros.window = 64; - Promise.race(promisesToRace).then(fakeReadFromNetworkLoop); + var bytesRead = 0; + function pump() { + for (;;) { + if (ros.state === 'readable') { + const op = ros.read(); + if (op.type === 'data') { + const view = op.argument; + for (var i = 0; i < view.byteLength; ++i) { + if (view[i] === 1) { + ++bytesRead; + } + } + } else { + t.equals(bytesRead, 1024); + + t.end() + } + op.complete(); + } else if (ros.state === 'waiting') { + ros.ready.then(pump); return; } } - fakeReadFromNetworkLoop(); - }).then( - () => { - t.equals(bytesRead, 1024); - t.equals(readableClosed, true); - t.end() - }, e => { - t.fail(e); - t.end(); - }); + } + pump(); });