Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix init failures #46

Merged
merged 5 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@

- Upgrade dependencies.
- Prevent Node.js max listeners exceeded warnings if many `fs-capacitor` `ReadStream` instances are created at the same time, fixing [#30](https://github.com/mike-marcacci/fs-capacitor/issues/30) via [#42](https://github.com/mike-marcacci/fs-capacitor/pull/42).
- Ensure initialization failures are reported, fixing [#45](https://github.com/mike-marcacci/fs-capacitor/issues/45) via [#46](https://github.com/mike-marcacci/fs-capacitor/pull/46/files).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR link accidentally ended in /files.

- **BREAKING:** Drop support for node 13.
- **BREAKING:** Drop support for node 10.
- **BREAKING:** Change module type to ES module.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ Uses node's default of `16384` (16kb). Optional buffer size at which the writabl

Uses node's default of `utf8`. Optional default encoding to use when no encoding is specified as an argument to `stream.write()`. See [node's docs for `stream.Writable`](https://nodejs.org/api/stream.html#stream_constructor_new_stream_writable_options). Possible values depend on the version of node, and are [defined in node's buffer implementation](https://github.com/nodejs/node/blob/master/lib/buffer.js);

#### `.tmpdir`

Used node's [`os.tmpdir`](https://nodejs.org/api/os.html#os_os_tmpdir) by default. This function returns the directory used by fs-capacitor to store file buffers, and is intended primarily for testing and debugging.

### ReadStream

`ReadStream` extends [`stream.Readable`](https://nodejs.org/api/stream.html#stream_new_stream_readable_options);
Expand Down
23 changes: 23 additions & 0 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,29 @@ test("Data from a complete stream.", async (t) => {
);
});

test("Error while initializing.", async (t) => {
// Create a new capacitor
const capacitor1 = new WriteStream({ tmpdir: () => "/tmp/does-not-exist" });

let resolve: () => void, reject: (error: Error) => void;
const promise = new Promise<void>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

// Synchronously attach an error listener.
capacitor1.on("error", (error) => {
try {
t.is((error as any).code, "ENOENT");
resolve();
} catch (error) {
reject(error);
}
});

await promise;
});

test("Allows specification of encoding in createReadStream.", async (t) => {
const data = Buffer.from("1".repeat(10), "utf8");
const source = new Readable({
Expand Down
68 changes: 47 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class ReadStream extends Readable {
export interface WriteStreamOptions {
highWaterMark?: WritableOptions["highWaterMark"];
defaultEncoding?: WritableOptions["defaultEncoding"];
tmpdir?: () => string;
}

export class WriteStream extends Writable {
Expand All @@ -106,7 +107,10 @@ export class WriteStream extends Writable {
return;
}

this._path = join(tmpdir(), `capacitor-${buffer.toString("hex")}.tmp`);
this._path = join(
(options?.tmpdir ?? tmpdir)(),
`capacitor-${buffer.toString("hex")}.tmp`
);

// Create a file in the OS's temporary files directory.
open(this._path, "wx+", 0o600, (error, fd) => {
Expand All @@ -124,6 +128,32 @@ export class WriteStream extends Writable {
});
}

_cleanup = (callback: (error: null | Error) => void): void => {
const fd = this._fd;
const path = this._path;

if (typeof fd !== "number" || typeof path !== "string") {
callback(null);
return;
}

// Close the file descriptor.
close(fd, (closeError) => {
// An error here probably means the fd was already closed, but we can
// still try to unlink the file.
unlink(path, (unlinkError) => {
// If we are unable to unlink the file, the operating system will
// clean up on next restart, since we use store thes in `os.tmpdir()`
this._fd = null;

// We avoid removing this until now in case an exit occurs while
// asyncronously cleaning up.
processExitProxy.off("exit", this._cleanupSync);
callback(unlinkError ?? closeError);
});
});
};

_cleanupSync = (): void => {
processExitProxy.off("exit", this._cleanupSync);

Expand Down Expand Up @@ -191,32 +221,28 @@ export class WriteStream extends Writable {
error: undefined | null | Error,
callback: (error?: null | Error) => any
): void {
const fd = this._fd;
const path = this._path;
if (typeof fd !== "number" || typeof path !== "string") {
this.once("ready", () => this._destroy(error, callback));
return;
// Destroy all attached read streams.
for (const readStream of this._readStreams) {
readStream.destroy(error || undefined);
}

// Close the file descriptor.
close(fd, (closeError) => {
// An error here probably means the fd was already closed, but we can
// still try to unlink the file.
unlink(path, (unlinkError) => {
// If we are unable to unlink the file, the operating system will
// clean up on next restart, since we use store thes in `os.tmpdir()`
this._fd = null;
// This capacitor is fully initialized.
if (typeof this._fd === "number" && typeof this._path === "string") {
this._cleanup((cleanupError) => callback(cleanupError ?? error));
return;
}

// We avoid removing this until now in case an exit occurs while
// asyncronously cleaning up.
processExitProxy.off("exit", this._cleanupSync);
callback(unlinkError || closeError || error);
// This capacitor has not yet finished initialization; if initialization
// does complete, immediately clean up after.
this.once("ready", () => {
this._cleanup((cleanupError) => {
if (cleanupError) {
this.emit("error", cleanupError);
}
});
});

// Destroy all attached read streams.
for (const readStream of this._readStreams)
readStream.destroy(error || undefined);
callback(error);
}

createReadStream(options?: ReadStreamOptions): ReadStream {
Expand Down