Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports: convert to Duplex streams [WIP] #92

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const timers = require('timers');
const util = require('util');

const common = require('./common');
const jsrs = require('./record-serialization');
const errors = require('./errors');
const RemoteProxy = require('./remote-proxy');

Expand Down Expand Up @@ -58,7 +57,7 @@ function Connection(transport, server, client) {

this._heartbeatCallbackInstance = null;

transport.on('packet', this._processPacket.bind(this));
transport.on('data', this._processPacket.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
}
Expand Down Expand Up @@ -207,7 +206,7 @@ Connection.prototype.startHeartbeat = function(interval) {
// Internal function used by startHeartbeat
//
Connection.prototype._heartbeatCallback = function(interval) {
this.transport.send('{}');
this.transport.write({});
this.setTimeout(interval, this._heartbeatCallbackInstance);
};

Expand Down Expand Up @@ -279,22 +278,15 @@ Connection.prototype._onTimeout = function() {
// packet - a packet to send
//
Connection.prototype._send = function(packet) {
const data = jsrs.stringify(packet);
this.transport.send(data);
this.transport.write(packet);
};

// Close the connection, optionally sending a final packet
// packet - a packet to send (optional)
//
Connection.prototype._end = function(packet) {
this.stopHeartbeat();

if (packet) {
const data = jsrs.stringify(packet);
this.transport.end(data);
} else {
this.transport.end();
}
this.transport.end(packet);
};

// Closed socket event handler
Expand Down
106 changes: 57 additions & 49 deletions lib/transport.socket.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
'use strict';

const events = require('events');
const fs = require('fs');
const net = require('net');
const stream = require('stream');
const tls = require('tls');
const util = require('util');
const events = require('events');

const jsrs = require('./record-serialization');
const common = require('./common');
const Server = require('./server');
const Client = require('./client');
const jsrs = require('./record-serialization');
const transportCommon = require('./transport.common');
const Client = require('./client');
const Server = require('./server');

const SEPARATOR = Buffer.alloc(1);
const MAX_PACKET_SIZE = 8 * 1024 * 1024;
Expand Down Expand Up @@ -167,71 +168,78 @@ SocketClient.prototype._onSocketClose = function() {
// socket - socket instance
//
function SocketTransport(socket) {
events.EventEmitter.call(this);

this.socket = socket;
this._buffer = '';
this._uncorkSocket = this.socket.uncork.bind(this.socket);
stream.Duplex.call(this, {
readableObjectMode: true,
writableObjectMode: true,
allowHalfOpen: false
});

this.remoteAddress = socket.remoteAddress;

this.socket.setEncoding('utf8');
this.socket.on('data', this._onSocketData.bind(this));

common.forwardMultipleEvents(this.socket, this, [
'error',
'close'
]);
this._socket = socket;
this._buffer = '';
this._readable = false;

socket.setEncoding('utf8');
socket.on('error', (err) => this.emit('error', err));
socket.on('end', () => this.push(null));
socket.on('readable', () => {
this._readable = true;
this._read();
});
}

util.inherits(SocketTransport, events.EventEmitter);
util.inherits(SocketTransport, stream.Duplex);
sock.SocketTransport = SocketTransport;

// Send data over the connection
// data - Buffer or string
//
SocketTransport.prototype.send = function(data) {
this.socket.cork();
this.socket.write(data);
this.socket.write(SEPARATOR);
process.nextTick(this._uncorkSocket);
};
SocketTransport.prototype._read = function() {
if (!this._readable) return;

// End the connection optionally sending the last chunk of data
// data - Buffer or string (optional)
//
SocketTransport.prototype.end = function(data) {
if (data) {
this.socket.cork();
this.socket.write(data);
this.socket.end(SEPARATOR);
} else {
this.socket.end();
}
const chunk = this._socket.read();
if (chunk === null) return;
this._buffer += chunk;

this.socket.destroy();
};
this._readable = false;

// Socket data handler
// data - data received
//
SocketTransport.prototype._onSocketData = function(chunk) {
const packets = [];
this._buffer += chunk;

try {
this._buffer = jsrs.parseNetworkPackets(this._buffer, packets);
} catch (error) {
this.emit('error', error);
return;
this._socket.destroy(error);
}

const packetsCount = packets.length;
for (let i = 0; i < packetsCount; i++) {
this.emit('packet', packets[i]);
const packet = packets[i];
if (packet !== null) this.push(packet);
}

if (this._buffer.length > MAX_PACKET_SIZE) {
this.emit('error', new Error('Maximal packet size exceeded'));
this._socket.destroy(new Error('Maximal packet size exceeded'));
}
};

SocketTransport.prototype._write = function(packet, encoding, callback) {
this._socket.cork();
writeOne(this._socket, packet);
process.nextTick(uncork, this._socket, callback);
};

SocketTransport.prototype._writev = function(packets, callback) {
this._socket.cork();
const count = packets.length;
for (let i = 0; i < count; i++) {
writeOne(this._socket, packets[i].chunk);
}
process.nextTick(uncork, this._socket, callback);
};

function writeOne(socket, packet) {
socket.write(jsrs.stringify(packet));
socket.write(SEPARATOR);
}

function uncork(socket, callback) {
socket.uncork();
callback();
}
57 changes: 22 additions & 35 deletions lib/transport.ws.browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
'use strict';

const events = require('events');
const stream = require('stream');
const util = require('util');

const jsrs = require('./record-serialization');
const Client = require('./client');
const common = require('./common');
const constants = require('./internal-constants');
const jsrs = require('./record-serialization');
const transportCommon = require('./transport.common');
const Client = require('./client');

const ws = {};
module.exports = ws;
Expand Down Expand Up @@ -131,49 +132,34 @@ W3CWebSocketClient.prototype._onMessage = function(message) {
// socketEventEmitter - an EventEmitter that proxies socket events
//
function W3CWebSocketTransport(socket, socketEventEmitter) {
events.EventEmitter.call(this);

this.socket = socket;
this.socketEventEmitter = socketEventEmitter;
stream.Duplex.call(this, {
readableObjectMode: true,
writableObjectMode: true,
allowHalfOpen: false
});

this.remoteAddress = socket.url;
this._socket = socket;
this._socketEventEmitter = socketEventEmitter;

common.forwardMultipleEvents(socketEventEmitter, this, [
'close',
'error'
]);

this.socketEventEmitter.on('message', this._onMessage.bind(this));
socketEventEmitter.on('message', this._onMessage.bind(this));
socketEventEmitter.on('close', () => this.push(null));
common.forwardEvent(socketEventEmitter, this, 'error');
}

util.inherits(W3CWebSocketTransport, events.EventEmitter);
util.inherits(W3CWebSocketTransport, stream.Duplex);
ws.W3CWebSocketTransport = W3CWebSocketTransport;

// Send data over the connection
// data - Buffer or string
//
W3CWebSocketTransport.prototype.send = function(data) {
if (Buffer.isBuffer(data)) {
data = data.toString();
}

this.socket.send(data);
W3CWebSocketTransport.prototype._read = function() {
// no-op
};

// End the connection optionally sending the last chunk of data
// data - Buffer or string (optional)
//
W3CWebSocketTransport.prototype.end = function(data) {
if (data) {
this.send(data);
}

this.socket.close();
W3CWebSocketTransport.prototype._write = function(packet, enc, callback) {
const data = jsrs.stringify(packet);
this._socket.send(data);
callback();
};

// WebSocket message handler
// message - WebSocket message
//
W3CWebSocketTransport.prototype._onMessage = function(message) {
const data = (
typeof(message.data) === 'string' ?
Expand All @@ -186,8 +172,9 @@ W3CWebSocketTransport.prototype._onMessage = function(message) {
packet = jsrs.parse(data);
} catch (error) {
this.emit('error', error);
this.socket.close();
return;
}

this.emit('packet', packet);
if (packet !== null) this.push(packet);
};
56 changes: 22 additions & 34 deletions lib/transport.ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
const events = require('events');
const http = require('http');
const https = require('https');
const stream = require('stream');
const util = require('util');

const websocket = require('websocket');

const jsrs = require('./record-serialization');
const common = require('./common');
const Server = require('./server');
const Client = require('./client');
const constants = require('./internal-constants');
const jsrs = require('./record-serialization');
const transportCommon = require('./transport.common');
const Client = require('./client');
const Server = require('./server');

const WebSocketServer = websocket.server;
const WebSocketClient = websocket.client;
Expand Down Expand Up @@ -248,39 +249,31 @@ JstpWebSocketClient.prototype._onClose = function() {
// connection - WebSocket connection
//
function WebSocketTransport(connection) {
events.EventEmitter.call(this);
stream.Duplex.call(this, {
readableObjectMode: true,
writableObjectMode: true,
allowHalfOpen: false
});

this.connection = connection;
this.remoteAddress = connection.remoteAddress;
this._connection = connection;

this.connection.on('message', this._onMessage.bind(this));
this.connection.on('close', this._onClose.bind(this));
this.connection.on('error', this._onError.bind(this));
connection.on('message', this._onMessage.bind(this));
connection.on('close', () => this.push(null));
connection.on('error', this._onError.bind(this));
}

util.inherits(WebSocketTransport, events.EventEmitter);
util.inherits(WebSocketTransport, stream.Duplex);
ws.WebSocketTransport = WebSocketTransport;

// Send data over the connection
// data - Buffer or string
//
WebSocketTransport.prototype.send = function(data) {
if (Buffer.isBuffer(data)) {
data = data.toString();
}

this.connection.sendUTF(data);
WebSocketTransport.prototype._read = function() {
// no-op
};

// End the connection optionally sending the last chunk of data
// data - Buffer or string (optional)
//
WebSocketTransport.prototype.end = function(data) {
if (data) {
this.send(data);
}

this.connection.close();
WebSocketTransport.prototype._write = function(packet, enc, callback) {
const chunk = jsrs.stringify(packet);
this._connection.sendUTF(chunk);
process.nextTick(callback);
};

// WebSocket message handler
Expand All @@ -297,17 +290,12 @@ WebSocketTransport.prototype._onMessage = function(message) {
try {
packet = jsrs.parse(data);
} catch (error) {
this._connection.drop();
this.emit('error', error);
return;
}

this.emit('packet', packet);
};

// Connection close handler
//
WebSocketTransport.prototype._onClose = function() {
this.emit('close');
this.push(packet);
};

// Connection error handler
Expand Down