Skip to content

Commit

Permalink
Fixed an issue where premature client disconnections led to all follo…
Browse files Browse the repository at this point in the history
…wing requests failing with a 500 error.

The underlying file, where the data is saved was closed if any error occurred while reading from the client connection. Subsequently, the file was attempted to be used again, but it is closed.
  • Loading branch information
BenLewis-Seequent committed Jul 22, 2024
1 parent 2945f6b commit a006918
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
General:

- Bump mysql2 to resolve to 3.10.1 for security patches
- Fixed an issue where premature client disconnections led to all following requests failing with a 500 error. (issue #1346)

Blob:

Expand Down
18 changes: 16 additions & 2 deletions src/common/persistence/FSExtentStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
createReadStream,
createWriteStream,
fdatasync,
ftruncate,
mkdir,
open,
stat,
Expand Down Expand Up @@ -34,6 +35,7 @@ import OperationQueue from "./OperationQueue";
const statAsync = promisify(stat);
const mkdirAsync = promisify(mkdir);
const unlinkAsync = promisify(unlink);
const ftruncateAsync = promisify(ftruncate);

// The max size of an extent.
const MAX_EXTENT_SIZE = DEFAULT_MAX_EXTENT_SIZE;
Expand Down Expand Up @@ -293,6 +295,17 @@ export default class FSExtentStore implements IExtentStore {
count
};
} catch (err) {
// Reset cursor position to the current offset.
try {
await ftruncateAsync(fd, appendExtent.offset);
} catch (truncate_err) {
this.logger.error(
`FSExtentStore:appendExtent() Truncate fd:${fd} len: ${appendExtent.offset} error:${JSON.stringify(
truncate_err
)}.`,
contextId
);
}
appendExtent.appendStatus = AppendStatusCode.Idle;
throw err;
}
Expand Down Expand Up @@ -541,10 +554,11 @@ export default class FSExtentStore implements IExtentStore {
this.logger.debug(
`FSExtentStore:streamPipe() Readable stream triggers error event, error:${JSON.stringify(
err
)}, after ${count} bytes piped. Invoke write stream destroy() method.`,
)}, after ${count} bytes piped. Reject streamPipe().`,
contextId
);
ws.destroy(err);

reject(err);
});

ws.on("drain", () => {
Expand Down
38 changes: 38 additions & 0 deletions tests/blob/fsStore.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import assert from "assert";
import { Readable } from "stream";
import FSExtentStore from "../../src/common/persistence/FSExtentStore";
import IExtentMetadataStore from "../../src/common/persistence/IExtentMetadataStore";
import { DEFAULT_BLOB_PERSISTENCE_ARRAY } from "../../src/blob/utils/constants";
import logger from "../../src/common/Logger";

import { mock } from "ts-mockito";

describe("FSExtentStore", () => {

const metadataStore: IExtentMetadataStore = mock<IExtentMetadataStore>();
metadataStore.getExtentLocationId = () => Promise.resolve("Default");

it("should handle input stream error gracefully during appendExtent @loki", async () => {
const store = new FSExtentStore(metadataStore, DEFAULT_BLOB_PERSISTENCE_ARRAY, logger);
await store.init();

// A null value within the Readable.from array causes the stream to emit an error.
const stream1 = Readable.from(["deadbeef", null], { objectMode: false });
await assert.rejects(store.appendExtent(stream1));

// Write a valid stream to the store.
const stream2 = Readable.from("Test", { objectMode: false });
const extent = await store.appendExtent(stream2);
assert.strictEqual(extent.offset, 0);
assert.strictEqual(extent.count, 4);

// Check that the extent is readable.
let readable = await store.readExtent(extent);
const chunks: Buffer[] = [];
for await (const chunk of readable) {
chunks.push(chunk as Buffer);
}
const data = Buffer.concat(chunks);
assert.strictEqual(data.toString(), "Test");
});
});

0 comments on commit a006918

Please sign in to comment.