Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ class CommandProcessor extends Duplex {
if(this[kReadStream] === null || this[kSource] === null) return;

let chunk;
while((chunk = this[kReadStream].read()) !== null) {
const rs = this[kReadStream];
while((chunk = rs.read()) !== null) {
this._sendFileQueueChunkReads++;
this._sendFileQueueReadBytes += chunk.length;
if(!this.push(chunk, 'ascii')) break;
Expand Down
44 changes: 28 additions & 16 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ class CacheServer {
this.errCallback = errCallback;

this._server = net.createServer(socket => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} connected.`);
const remoteAddress = socket.remoteAddress;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, this makes the template literals easier to read.

const remotePort = socket.remotePort;

helpers.log(consts.LOG_INFO, `${remoteAddress}:${remotePort} connected.`);

const cmdProc = new CommandProcessor(this.cache);
const streamProc = new ClientStreamProcessor({clientAddress: `${socket.remoteAddress}:${socket.remotePort}`});
const streamProc = new ClientStreamProcessor({clientAddress: `${remoteAddress}:${remotePort}`});

const mirrors = this._mirrors;
if(mirrors.length > 0) {
Expand All @@ -119,23 +122,32 @@ class CacheServer {
});
}

const unpipeStreams = () => {
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
};

cmdProc.on('finish', () => {
helpers.log(consts.LOG_DBG, `${remoteAddress}:${remotePort} CommandProcessor finished.`);
process.nextTick(unpipeStreams);
});

socket.on('close', () => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} closed connection.`);
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
}).on('error', err => {
helpers.log(consts.LOG_ERR, err.message);
});
helpers.log(consts.LOG_INFO, `${remoteAddress}:${remotePort} Closed connection.`);
}).on('error', err => {
helpers.log(consts.LOG_ERR, `${remoteAddress}:${remotePort} Connection ERROR: ${err.message}`);
unpipeStreams();
});

if(this.isRecordingClient) {
const sessionId = `${socket.remoteAddress}_${socket.remotePort}_${Date.now()}`;
socket.pipe(new ClientStreamRecorder({sessionId})); // Record the incoming byte stream to disk
}
if(this.isRecordingClient) {
const sessionId = `${remoteAddress}_${remotePort}_${Date.now()}`;
socket.pipe(new ClientStreamRecorder({sessionId})); // Record the incoming byte stream to disk
}

socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
}).on('error', err => {
if (err.code === 'EADDRINUSE') {
helpers.log(consts.LOG_ERR, `Port ${this.port} is already in use...`);
Expand Down
Loading