Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: make sure to write entire buffer #42434

Closed
wants to merge 13 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 58 additions & 14 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,22 +386,71 @@ WriteStream.prototype.open = openWriteFs;

WriteStream.prototype._construct = _construct;

function writeAll(data, size, pos, cb, retries = 0) {
this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
if (er?.code === 'EAGAIN') {
er = null;
bytesWritten = 0;
}

if (this.destroyed || er) {
return cb(er);
ronag marked this conversation as resolved.
Show resolved Hide resolved
}
benjamingr marked this conversation as resolved.
Show resolved Hide resolved

ronag marked this conversation as resolved.
Show resolved Hide resolved
this.bytesWritten += bytesWritten;

retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;

if (retries > 5) {
cb(new Error('writev failed'));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be a proper error with code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestions?

} else if (size) {
writeAll(buffer.slice(bytesWritten), size, pos, cb, retries);
} else {
cb();
}
});
}

function writevAll(chunks, size, pos, cb, retries = 0) {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
if (er?.code === 'EAGAIN') {
er = null;
bytesWritten = 0;
}

if (this.destroyed || er) {
return cb(er);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will er here be something if this.destroyed is truthy? Otherwise, this could signal to the caller that the call succeeded?

}

this.bytesWritten += bytesWritten;

retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;

if (retries > 5) {
cb(new Error('writev failed'));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a little help here... not sure what error to use

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lemme check

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked the existing codes, no idea either 😅

} else if (size) {
writevAll([Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
Copy link
Contributor

@mscdex mscdex Mar 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to be concatenating, we may as well just do it once and call writeAll() with it instead. Otherwise like @mcollina said, we should handle this more efficiently.

Ideally it would be great if we didn't have to even keep recreating arrays on retry. That might be a nice addition to the fs.writev() API to make it more like fs.write() with its offset parameter, so we could just pass a starting index or something. With something like that all we'd have to do is possibly slice() a single buffer (or simply increasing the starting index) before retrying.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a cold path though? It should almost never happen. Is it worth optimizing for?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should almost never happen

It think it would be useful to get an idea of how often it happens (I personally have no idea). How cold is it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if it does happen often then we have a lot of seriously broken software out there... I guess we would have quite a bit of reports....

Copy link
Contributor

@mscdex mscdex Mar 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or people have learned to handle retries on their own?

Copy link
Member Author

@ronag ronag Mar 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think you can actually detect that in this interface… there is nothing a user can do… it will just be corrupt w/o any way to detect or recover… so no

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if it does happen often then we have a lot of seriously broken software out there... I guess we would have quite a bit of reports....

makes sense, yeah sounds like a cold path to me.

} else {
cb();
}
});
}

WriteStream.prototype._write = function(data, encoding, cb) {
this[kIsPerformingIO] = true;
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
writeAll.call(this, data.length, data, this.pos, (er) => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
return cb(er);
}

this.bytesWritten += bytes;
cb();
cb(er);
});

if (this.pos !== undefined)
Expand All @@ -421,20 +470,15 @@ WriteStream.prototype._writev = function(data, cb) {
}

this[kIsPerformingIO] = true;
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
writevAll.call(this, size, chunks, this.pos, (er) => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
return cb(er);
}

this.bytesWritten += bytes;
cb();
cb(er);
});

if (this.pos !== undefined)
Expand Down