Skip to content

Commit a9a3b64

Browse files
committed
refactor(ext-driver-pg): release connection when stream destroyed
1 parent 9848eaf commit a9a3b64

File tree

3 files changed

+79
-21
lines changed

3 files changed

+79
-21
lines changed

packages/extension-driver-pg/src/lib/pgDataSource.ts

+20-19
Original file line numberDiff line numberDiff line change
@@ -92,34 +92,35 @@ export class PGDataSource extends DataSource<any, PGOptions> {
9292
const { chunkSize = 100 } = options;
9393
const cursorRead = this.cursorRead.bind(this);
9494
const firstChunk = await cursorRead(cursor, chunkSize);
95-
95+
// save first chunk in buffer for incoming requests
96+
let bufferedRows = [...firstChunk.rows];
97+
let bufferReadIndex = 0;
98+
const fetchNext = async () => {
99+
if (bufferReadIndex >= bufferedRows.length) {
100+
bufferedRows = (await cursorRead(cursor, chunkSize)).rows;
101+
bufferReadIndex = 0;
102+
}
103+
return bufferedRows[bufferReadIndex++] || null;
104+
};
96105
const stream = new Readable({
97106
objectMode: true,
98107
read() {
99-
cursorRead(cursor, chunkSize)
100-
.then(({ rows }) => {
101-
if (rows.length === 0) {
102-
this.push(null);
103-
// Send done event to notify upstream to release the connection.
104-
cursor.emit('done');
105-
}
106-
for (const row of rows) {
107-
this.push(row);
108-
}
108+
fetchNext()
109+
.then((row) => {
110+
this.push(row);
109111
})
110112
.catch((error) => {
111-
this.emit('error', error);
112-
// Send done event to notify upstream to release the connection.
113-
cursor.emit('done');
113+
this.destroy(error);
114114
});
115115
},
116+
destroy() {
117+
// Send done event to notify upstream to release the connection.
118+
cursor.emit('done');
119+
},
120+
// automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16
121+
autoDestroy: true,
116122
});
117123

118-
// Push the first chunk data.
119-
for (const row of firstChunk.rows) {
120-
stream.push(row);
121-
}
122-
123124
return {
124125
getColumns: () =>
125126
firstChunk.result.fields.map((field) => ({

packages/extension-driver-pg/test/pgDataSource.spec.ts

+51-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { PGServer } from './pgServer';
22
import { PGDataSource, PGOptions } from '../src';
33
import { streamToArray } from '@vulcan-sql/core';
4+
import { Writable } from 'stream';
45

56
const pg = new PGServer();
67
let dataSource: PGDataSource;
@@ -295,10 +296,58 @@ it('Data source should return correct column types', async () => {
295296
operations: {} as any,
296297
});
297298
const column = getColumns();
298-
// We need to consume the data stream or the driver waits for us
299-
await streamToArray(getData());
299+
// We need to destroy the data stream or the driver waits for us
300+
const data = getData();
301+
data.destroy();
302+
300303
// Assert
301304
expect(column[0]).toEqual({ name: 'id', type: 'number' });
302305
expect(column[1]).toEqual({ name: 'name', type: 'string' });
303306
expect(column[2]).toEqual({ name: 'enabled', type: 'boolean' });
304307
}, 30000);
308+
309+
it('Data source should release connection when readable stream is destroyed', async () => {
310+
// Arrange
311+
dataSource = new PGDataSource({}, '', [
312+
{
313+
name: 'profile1',
314+
type: 'pg',
315+
connection: {
316+
host: pg.host,
317+
user: pg.user,
318+
password: pg.password,
319+
database: pg.database,
320+
port: pg.port,
321+
} as PGOptions,
322+
allow: '*',
323+
},
324+
]);
325+
await dataSource.activate();
326+
// Act
327+
const { getData } = await dataSource.execute({
328+
statement: 'select * from users limit 100',
329+
bindParams: new Map(),
330+
profileName: 'profile1',
331+
operations: {} as any,
332+
});
333+
const readStream = getData();
334+
const rows: any[] = [];
335+
let resolve: any;
336+
const waitForStream = () => new Promise((res) => (resolve = res));
337+
const writeStream = new Writable({
338+
write(chunk, _, cb) {
339+
rows.push(chunk);
340+
// After read 5 records, destroy the upstream
341+
if (rows.length === 5) {
342+
readStream.destroy();
343+
resolve();
344+
} else cb();
345+
},
346+
objectMode: true,
347+
});
348+
readStream.pipe(writeStream);
349+
await waitForStream();
350+
// Assert
351+
expect(rows.length).toBe(5);
352+
// afterEach hook will timeout if any leak occurred.
353+
}, 30000);

packages/serve/src/lib/response-formatter/jsonFormatter.ts

+8
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ export class JsonFormatter extends BaseResponseFormatter {
7171
})
7272
.on('end', () => {
7373
logger.debug('convert to json format stream > done.');
74+
})
75+
.on('close', () => {
76+
console.log('cccc');
77+
data.destroy();
7478
});
7579

7680
return jsonStream;
@@ -83,5 +87,9 @@ export class JsonFormatter extends BaseResponseFormatter {
8387
// set json stream to response in context ( data is json stream, no need to convert. )
8488
ctx.response.body = stream;
8589
ctx.response.set('Content-type', 'application/json');
90+
ctx.req.on('close', () => {
91+
console.log(88);
92+
stream.destroy();
93+
});
8694
}
8795
}

0 commit comments

Comments (0)