Skip to content

Commit

Permalink
fix(stor): pause connection to avoid memory build up
Browse files Browse the repository at this point in the history
  • Loading branch information
trs committed Jan 10, 2018
1 parent 286c106 commit 65b1fd2
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions src/commands/registration/stor.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,33 @@ module.exports = {
.tap(() => this.commandSocket.pause())
.then(() => when.try(this.fs.write.bind(this.fs), fileName, {append, start: this.restByteCount}))
.then(stream => {
this.restByteCount = 0;
const destroyConnection = (connection, reject) => err => connection && connection.destroy(err) && reject(err);

const streamPromise = when.promise((resolve, reject) => {
stream.once('error', err => reject(err));
stream.once('error', destroyConnection(this.connector.socket, reject));
stream.once('finish', () => resolve());
});

const socketPromise = when.promise((resolve, reject) => {
this.connector.socket.on('data', data => stream.write(data, this.transferType));
this.connector.socket.on('data', data => {
if (this.connector.socket) this.connector.socket.pause();
if (stream) {
stream.write(data, this.transferType, () => this.connector.socket && this.connector.socket.resume());
}
});
this.connector.socket.once('end', () => {
if (stream.listenerCount('close')) stream.emit('close');
else stream.end();
resolve();
});
this.connector.socket.once('error', err => reject(err));
this.connector.socket.once('error', destroyConnection(stream, reject));
});

this.restByteCount = 0;

return this.reply(150).then(() => this.connector.socket.resume())
.then(() => when.join(streamPromise, socketPromise))
.finally(() => stream.destroy ? stream.destroy() : null);
.finally(() => stream.destroy && stream.destroy());
})
.then(() => this.reply(226, fileName))
.catch(when.TimeoutError, err => {
Expand Down

0 comments on commit 65b1fd2

Please sign in to comment.