Skip to content

Commit

Permalink
Factor out fake network reading code into FakeByteStream
Browse files Browse the repository at this point in the history
  • Loading branch information
tyoshino committed Feb 20, 2015
1 parent 02e1a6c commit b4d5431
Showing 1 changed file with 114 additions and 113 deletions.
227 changes: 114 additions & 113 deletions reference-implementation/test/experimental/operation-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ test('Asynchronous write, read and completion of the operation', t => {
t.end();
});

test.only('Pipe', t => {
test('Pipe', t => {
const pair0 = createOperationStream(new AdjustableStringStrategy());
const wos0 = pair0.writable;
const ros0 = pair0.readable;
Expand Down Expand Up @@ -227,146 +227,147 @@ test.only('Pipe', t => {
});
});

test('Sample implementation of network API with a buffer pool', t => {
const pair = createOperationStream(new AdjustableStrategy());
const wos = pair.writable;
const ros = pair.readable;

var bytesRead = 0;
var readableClosed = false;
function pump() {
for (;;) {
if (ros.state === 'readable') {
const op = ros.read();
if (op.type === 'data') {
const view = op.argument;
for (var i = 0; i < view.byteLength; ++i) {
if (view[i] === 1) {
++bytesRead;
}
}
} else {
readableClosed = true;
}
op.complete();
} else if (ros.state === 'waiting') {
ros.ready.then(pump);
return;
}
}
}
pump();
class FakeByteSource {
constructor() {
this._streams = createOperationStream(new AdjustableStrategy());

ros.window = 64;
this._bytesToWrite = 1024;

new Promise((resolve, reject) => {
const bufferPool = [];
this._bufferPool = [];
for (var i = 0; i < 10; ++i) {
bufferPool.push(new ArrayBuffer(10));
this._bufferPool.push(new ArrayBuffer(10));
}

var networkReadPromise = undefined;
this._buffersInUse = [];

const buffersInUse = [];
this._networkReadPromise = undefined;

var bytesToWrite = 1024;
this._loop();
}

function fakeReadFromNetworkLoop() {
for (;;) {
if (wos.state === 'cancelled') {
reject();
return;
}
get stream() {
return this._streams.readable;
}

var hasProgress = false;
_loop() {
const wos = this._streams.writable;

if (buffersInUse.length > 0) {
const entry = buffersInUse[0];
const status = entry.status;
if (status.state === 'completed') {
buffersInUse.shift();
for (;;) {
if (wos.state === 'cancelled') {
return;
}

if (entry.buffer === undefined) {
resolve();
return;
}
var hasProgress = false;

bufferPool.push(entry.buffer);
if (this._buffersInUse.length > 0) {
const entry = this._buffersInUse[0];
const status = entry.status;
if (status.state === 'completed') {
this._buffersInUse.shift();

hasProgress = true;
} else if (status.state === 'errored') {
reject();
if (entry.buffer === undefined) {
return;
}

this._bufferPool.push(entry.buffer);

hasProgress = true;
} else if (status.state === 'errored') {
return;
}
}

if (networkReadPromise === undefined && bufferPool.length > 0 && wos.state === 'writable') {
const buffer = bufferPool.shift();
const view = new Uint8Array(buffer);
for (var i = 0; i < view.byteLength; ++i) {
view[0] = 0;
}
if (this._networkReadPromise === undefined &&
this._bufferPool.length > 0 &&
wos.state === 'writable') {
const buffer = this._bufferPool.shift();
const view = new Uint8Array(buffer);
for (var i = 0; i < view.byteLength; ++i) {
view[0] = 0;
}

// Fake async network read operation.
networkReadPromise = new Promise((resolve, reject) => {
setTimeout(() => {
const bytesToWriteThisTime = Math.min(bytesToWrite, buffer.byteLength);
const view = new Uint8Array(buffer, 0, bytesToWriteThisTime);
for (var i = 0; i < view.byteLength; ++i) {
view[i] = 1;
}
bytesToWrite -= bytesToWriteThisTime;
if (bytesToWrite === 0) {
resolve({close: true, view});
} else {
resolve({close: false, view});
}
}, 0);
}).then(result => {
networkReadPromise = undefined;
if (result.close) {
buffersInUse.push({buffer, status: wos.write(result.view)});
buffersInUse.push({buffer: undefined, status: wos.close()});
// Fake async network read operation.
this._networkReadPromise = new Promise((resolve, reject) => {
setTimeout(() => {
const bytesToWriteThisTime = Math.min(this._bytesToWrite, buffer.byteLength);
const view = new Uint8Array(buffer, 0, bytesToWriteThisTime);
for (var i = 0; i < view.byteLength; ++i) {
view[i] = 1;
}
this._bytesToWrite -= bytesToWriteThisTime;
if (this._bytesToWrite === 0) {
resolve({close: true, view});
} else {
buffersInUse.push({buffer, status: wos.write(result.view)});
resolve({close: false, view});
}
});
}, 0);
}).then(result => {
this._networkReadPromise = undefined;
if (result.close) {
this._buffersInUse.push({buffer, status: wos.write(result.view)});
this._buffersInUse.push({buffer: undefined, status: wos.close()});
} else {
this._buffersInUse.push({buffer, status: wos.write(result.view)});
}
});

hasProgress = true;
}
hasProgress = true;
}

if (hasProgress) {
continue;
}
if (hasProgress) {
continue;
}

const promisesToRace = [];
const promisesToRace = [];

if (wos.state === 'writable') {
promisesToRace.push(wos.cancelled);
} else if (wos.state === 'waiting') {
promisesToRace.push(wos.ready);
}
if (wos.state === 'writable') {
promisesToRace.push(wos.cancelled);
} else if (wos.state === 'waiting') {
promisesToRace.push(wos.ready);
}

if (networkReadPromise !== undefined) {
promisesToRace.push(networkReadPromise);
}
if (this._networkReadPromise !== undefined) {
promisesToRace.push(this._networkReadPromise);
}

if (buffersInUse.length > 0) {
promisesToRace.push(buffersInUse[0].status.ready);
}
if (this._buffersInUse.length > 0) {
promisesToRace.push(this._buffersInUse[0].status.ready);
}

Promise.race(promisesToRace).then(this._loop.bind(this));
return;
}
}
}

test.only('Sample implementation of network API with a buffer pool', t => {
const bs = new FakeByteSource();
const ros = bs.stream;
ros.window = 64;

Promise.race(promisesToRace).then(fakeReadFromNetworkLoop);
var bytesRead = 0;
function pump() {
for (;;) {
if (ros.state === 'readable') {
const op = ros.read();
if (op.type === 'data') {
const view = op.argument;
for (var i = 0; i < view.byteLength; ++i) {
if (view[i] === 1) {
++bytesRead;
}
}
} else {
t.equals(bytesRead, 1024);

t.end()
}
op.complete();
} else if (ros.state === 'waiting') {
ros.ready.then(pump);
return;
}
}
fakeReadFromNetworkLoop();
}).then(
() => {
t.equals(bytesRead, 1024);
t.equals(readableClosed, true);
t.end()
}, e => {
t.fail(e);
t.end();
});
}
pump();
});

0 comments on commit b4d5431

Please sign in to comment.