
Commit 6913f4b

fix(serve, ext-driver-pg): destroy the source stream to release connections
1 parent 9848eaf

4 files changed: +110 −23 lines

packages/extension-driver-pg/src/lib/pgDataSource.ts (+24 −21)
@@ -58,10 +58,12 @@ export class PGDataSource extends DataSource<any, PGOptions> {
     const cursor = client.query(
       new Cursor(sql, Array.from(bindParams.values()))
     );
-    cursor.once('done', () => {
+    cursor.once('done', async () => {
       this.logger.debug(
         `Data fetched, release connection from ${profileName}`
       );
+      // It is important to close the cursor before releasing the connection, or the connection might not be able to handle the next request.
+      await cursor.close();
       client.release();
     });
     // All promises MUST be fulfilled in this function or we are not able to release the connection when an error occurs
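Why the ordering matters: from the pool's point of view, a client with an open cursor is still mid-query, so handing it back before the cursor is closed can leave it unable to serve the next checkout. A minimal sketch of the intended lifecycle against pg and pg-cursor directly (illustrative only, not the driver's actual code path):

    import { Pool } from 'pg';
    import Cursor from 'pg-cursor';

    async function streamQuery(pool: Pool, sql: string, values: unknown[]) {
      const client = await pool.connect();
      const cursor = client.query(new Cursor(sql, values));
      try {
        let rows = await cursor.read(100);
        while (rows.length > 0) {
          // consume rows here...
          rows = await cursor.read(100);
        }
      } finally {
        // Close the cursor first so the connection returns to an idle state;
        // releasing a client with an open cursor can poison the pool's next query.
        await cursor.close();
        client.release();
      }
    }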
@@ -92,34 +94,35 @@ export class PGDataSource extends DataSource<any, PGOptions> {
     const { chunkSize = 100 } = options;
     const cursorRead = this.cursorRead.bind(this);
     const firstChunk = await cursorRead(cursor, chunkSize);
-
+    // save the first chunk in a buffer for incoming requests
+    let bufferedRows = [...firstChunk.rows];
+    let bufferReadIndex = 0;
+    const fetchNext = async () => {
+      if (bufferReadIndex >= bufferedRows.length) {
+        bufferedRows = (await cursorRead(cursor, chunkSize)).rows;
+        bufferReadIndex = 0;
+      }
+      return bufferedRows[bufferReadIndex++] || null;
+    };
     const stream = new Readable({
       objectMode: true,
       read() {
-        cursorRead(cursor, chunkSize)
-          .then(({ rows }) => {
-            if (rows.length === 0) {
-              this.push(null);
-              // Send done event to notify upstream to release the connection.
-              cursor.emit('done');
-            }
-            for (const row of rows) {
-              this.push(row);
-            }
+        fetchNext()
+          .then((row) => {
+            this.push(row);
           })
           .catch((error) => {
-            this.emit('error', error);
-            // Send done event to notify upstream to release the connection.
-            cursor.emit('done');
+            this.destroy(error);
           });
       },
+      destroy(error: Error | null, cb: (error: Error | null) => void) {
+        // Send the done event to notify upstream to release the connection.
+        cursor.emit('done');
+        cb(error);
+      },
+      // automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16
+      autoDestroy: true,
     });
-
-    // Push the first chunk data.
-    for (const row of firstChunk.rows) {
-      stream.push(row);
-    }
-
     return {
       getColumns: () =>
         firstChunk.result.fields.map((field) => ({
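The shape of the change above: every way the stream can terminate (natural end, read error, or the consumer calling destroy()) now funnels through the stream's destroy hook, so the connection is released exactly once. A minimal standalone sketch of that pattern, where fetchNext() and releaseConnection() are hypothetical stand-ins for the driver's cursor handling:

    import { Readable } from 'stream';

    // fetchNext() resolves the next row, or null at the end of the result set;
    // releaseConnection() stands in for cursor.close() + client.release().
    function makeRowStream(
      fetchNext: () => Promise<unknown | null>,
      releaseConnection: () => void
    ): Readable {
      return new Readable({
        objectMode: true,
        read() {
          fetchNext()
            .then((row) => this.push(row)) // push(null) ends the stream
            .catch((err) => this.destroy(err));
        },
        destroy(error, cb) {
          // Runs exactly once, whether the stream ended naturally (via autoDestroy),
          // failed with an error, or was destroyed by the consumer.
          releaseConnection();
          cb(error);
        },
        autoDestroy: true,
      });
    }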

packages/extension-driver-pg/test/pgDataSource.spec.ts (+51 −2)
@@ -1,6 +1,7 @@
 import { PGServer } from './pgServer';
 import { PGDataSource, PGOptions } from '../src';
 import { streamToArray } from '@vulcan-sql/core';
+import { Writable } from 'stream';
 
 const pg = new PGServer();
 let dataSource: PGDataSource;
@@ -295,10 +296,58 @@ it('Data source should return correct column types', async () => {
     operations: {} as any,
   });
   const column = getColumns();
-  // We need to consume the data stream or the driver waits for us
-  await streamToArray(getData());
+  // We need to destroy the data stream, or the driver keeps waiting for us
+  const data = getData();
+  data.destroy();
+
   // Assert
   expect(column[0]).toEqual({ name: 'id', type: 'number' });
   expect(column[1]).toEqual({ name: 'name', type: 'string' });
   expect(column[2]).toEqual({ name: 'enabled', type: 'boolean' });
 }, 30000);
+
+it('Data source should release connection when readable stream is destroyed', async () => {
+  // Arrange
+  dataSource = new PGDataSource({}, '', [
+    {
+      name: 'profile1',
+      type: 'pg',
+      connection: {
+        host: pg.host,
+        user: pg.user,
+        password: pg.password,
+        database: pg.database,
+        port: pg.port,
+      } as PGOptions,
+      allow: '*',
+    },
+  ]);
+  await dataSource.activate();
+  // Act
+  const { getData } = await dataSource.execute({
+    statement: 'select * from users limit 100',
+    bindParams: new Map(),
+    profileName: 'profile1',
+    operations: {} as any,
+  });
+  const readStream = getData();
+  const rows: any[] = [];
+  let resolve: any;
+  const waitForStream = () => new Promise((res) => (resolve = res));
+  const writeStream = new Writable({
+    write(chunk, _, cb) {
+      rows.push(chunk);
+      // After reading 5 records, destroy the upstream
+      if (rows.length === 5) {
+        readStream.destroy();
+        resolve();
+      } else cb();
+    },
+    objectMode: true,
+  });
+  readStream.pipe(writeStream);
+  await waitForStream();
+  // Assert
+  expect(rows.length).toBe(5);
+  // The afterEach hook will time out if any leak occurred.
+}, 30000);
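A side note on the consumption pattern: the same partial read can be phrased with async iteration, since breaking out of a for await...of loop destroys the Readable and triggers the same cleanup path. A sketch (takeRows is a hypothetical helper, not part of this suite):

    import { Readable } from 'stream';

    async function takeRows(stream: Readable, limit: number): Promise<any[]> {
      const rows: any[] = [];
      for await (const row of stream) {
        rows.push(row);
        // Breaking out early destroys the stream, which triggers the driver's
        // cleanup path just like an explicit stream.destroy() call.
        if (rows.length >= limit) break;
      }
      return rows;
    }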

packages/serve/src/models/extensions/responseFormatter.ts (+4)
@@ -37,6 +37,10 @@ export abstract class BaseResponseFormatter
     // if the response has data and columns.
     const { data, columns } = ctx.response.body as BodyResponse;
     const formatted = this.format(data, columns);
+    // Koa destroys the stream when the connection closes; we need to destroy our upstream too to notify it to release its resources.
+    formatted.on('close', () => {
+      data.destroy();
+    });
     // set the formatted stream as the response in the context
     this.toResponse(formatted, ctx);
     return;
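For comparison, Node's stream.pipeline wires up this propagation automatically: when any stream in the chain errors or closes early, the others are destroyed too. A hedged sketch of that alternative, assuming this.format returns a Transform (the formatter's real return type may differ):

    import { pipeline, Readable, Transform } from 'stream';

    function formatWithCleanup(data: Readable, formatter: Transform): Transform {
      return pipeline(data, formatter, (err) => {
        // When the downstream is closed early (e.g. the client disconnects),
        // err is ERR_STREAM_PREMATURE_CLOSE and pipeline has already destroyed
        // the upstream data source for us.
        if (err) console.error('formatting pipeline ended with error:', err.message);
      });
    }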

packages/serve/test/response-formatter/json.spec.ts (+31)
@@ -25,6 +25,37 @@ describe('Test to respond to json', () => {
     expect(ctx.response.body).toEqual(expected);
   });
 
+  it('Test to destroy the source when downstream is destroyed', (done) => {
+    // Arrange
+    const stubResponse = sinon.stubInterface<Response>();
+    const source = new Stream.Readable({
+      read() {
+        this.push('some-data');
+        // we don't push null, to simulate an endless stream
+      },
+      destroy(err, cb) {
+        done();
+        cb(err);
+      },
+    });
+    stubResponse.body = {
+      data: source,
+      columns: {},
+    };
+    const ctx = {
+      ...sinon.stubInterface<KoaContext>(),
+      url: faker.internet.url(),
+      response: stubResponse,
+    };
+    // Act
+    const formatter = new JsonFormatter({}, '');
+    formatter.formatToResponse(ctx);
+    (ctx.response.body as Stream.Readable).destroy();
+    // Assert
+    expect(true).toEqual(true);
+    // the done callback should be called after the stream is destroyed.
+  }, 1000);
+
   it.each([
     {
       input: {
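Note that the done callback inside the stubbed destroy hook carries the real assertion here; the expect(true) line is a placeholder. The same check can be phrased with promises, sketched below with events.once (not how this suite is written, and it assumes the stream emits 'close' on destroy, which is the default for streams built with the Readable constructor in current Node):

    import { once } from 'events';
    import { Readable } from 'stream';

    // Resolves only after `source` has been fully destroyed, since a Readable
    // emits 'close' once its _destroy hook completes (with emitClose enabled).
    async function expectDestroyPropagates(source: Readable, trigger: () => void) {
      const closed = once(source, 'close');
      trigger(); // e.g. destroy the formatted downstream stream
      await closed;
    }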
