Skip to content

Commit 72b1b7f

Browse files
authored
Merge pull request #278 from Canner/chore/performance-analysis
Feature: improve duckdbDataSource performance
2 parents 232f449 + 58db508 commit 72b1b7f

File tree

9 files changed

+350
-45
lines changed

9 files changed

+350
-45
lines changed
+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import * as fs from 'fs';
2+
import * as path from 'path';
3+
import { isEmpty } from 'lodash';
4+
5+
interface ConcurrentPerformanceRecord {
6+
[key: string]: [{ group?: string; diff?: number }];
7+
}
8+
let is_analysis = false;
9+
let performanceRecord: ConcurrentPerformanceRecord = {};
10+
let keyStatistics: Record<
11+
string,
12+
{
13+
min?: number;
14+
max?: number;
15+
avg?: number;
16+
median?: number;
17+
p90?: number;
18+
}
19+
> = {};
20+
/**
21+
* This is a performance analysis tool for concurrent tasks
22+
* You can use it to collect the start and end time of a task, the collected data with the same key will be summarized
23+
* the summarzied report contains the min, max, avg, median, p90 of the task
24+
* When the code snippet is executed, the performance analysis tool will automatically collect the start and end time of the task
25+
*
26+
* example:
27+
* const start = Date.now();
28+
* await fn_to_measure()
29+
* const end = Date.now();
30+
* PerformanceAnalysis.collect('fn_to_measure', start, end)
31+
*
32+
* You can choose when to summarize the performance data
33+
* for example, you can summarize the performance data before server closed
34+
*
35+
* public async close() {
36+
if (this.servers) {
37+
... close server
38+
}
39+
PerformanceAnalysis.count();
40+
}
41+
*
42+
* Note: If you want to view the performance by each API call, you can use k6 or you can specify the group name when collecting the performance data
43+
* and implement another count & writePerformanceReport funtion to summarize the performance data by group name
44+
*
45+
*/
46+
export class PerformanceAnalysis {
47+
public static collect(
48+
key: string,
49+
start: number,
50+
end: number,
51+
group?: string
52+
) {
53+
if (!start || !end) {
54+
throw new Error(
55+
`should provide start and end time when doing performance analysis task "${key}"`
56+
);
57+
}
58+
if (!performanceRecord[key]) {
59+
performanceRecord[key] = [] as any;
60+
}
61+
const diff = end - start;
62+
performanceRecord[key].push({ group, diff });
63+
if (process.env['PRINT_COLLECTION']) {
64+
console.log(
65+
`${key}: collected, start: ${start}, end: ${end}, diff: ${diff}`
66+
);
67+
}
68+
}
69+
70+
public static count(): boolean {
71+
// sort by time diff
72+
if (isEmpty(performanceRecord)) {
73+
console.log('performanceRecord is empty');
74+
return false;
75+
}
76+
Object.values(performanceRecord).map((records) => {
77+
records.sort((a, b) => {
78+
return <number>a.diff - <number>b.diff;
79+
});
80+
});
81+
// count statistics
82+
Object.entries(performanceRecord).map(([key, records]) => {
83+
const count = records.length;
84+
const min = records[0].diff;
85+
const max = records[count - 1].diff;
86+
const avg =
87+
records.reduce((acc, cur) => {
88+
return acc + <number>cur.diff;
89+
}, 0) / count;
90+
const median = records[Math.floor(count / 2)].diff;
91+
const p90 = records[Math.floor(count * 0.9)].diff;
92+
keyStatistics[key] = { min, max, avg, median, p90 };
93+
});
94+
return true;
95+
}
96+
97+
public static getStatistic(key: string): any {
98+
return keyStatistics[key];
99+
}
100+
101+
public static clean = () => {
102+
performanceRecord = {};
103+
keyStatistics = {};
104+
};
105+
106+
// write to txt file
107+
public static writePerformanceReport() {
108+
const filePath = path.join('./performanceRecord.txt');
109+
// print current date, time as human readable format
110+
fs.appendFileSync(filePath, `------${new Date().toLocaleString()}\n`);
111+
for (const key of Object.keys(keyStatistics)) {
112+
fs.appendFileSync(filePath, `${key}\n`);
113+
let staticLine = '';
114+
if (keyStatistics[key]) {
115+
const statics = keyStatistics[key];
116+
Object.entries(statics).map(([k, v]) => {
117+
staticLine += `${k}: ${v}, `;
118+
});
119+
fs.appendFileSync(filePath, `${staticLine}\n`);
120+
}
121+
}
122+
fs.appendFileSync(filePath, `------\n`);
123+
}
124+
}
125+
126+
export function getAnalysis() {
127+
const counted = PerformanceAnalysis.count();
128+
if (counted && !is_analysis) {
129+
PerformanceAnalysis.writePerformanceReport();
130+
console.log(
131+
'performance analysis finished, check the performanceRecord.txt file for details'
132+
);
133+
is_analysis = true;
134+
}
135+
}

packages/core/src/lib/utils/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export * from './module';
44
export * from './streams';
55
export * from './errors';
66
export * from './flattenElements';
7+
export * from './analyzer';

packages/core/test/analyzer.spec.ts

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { PerformanceAnalysis, getAnalysis } from '../src/lib/utils/analyzer';
2+
import * as fs from 'fs';
3+
4+
async function waitOneSec(): Promise<void> {
5+
return new Promise((resolve) => {
6+
setTimeout(() => {
7+
resolve();
8+
}, 1000);
9+
});
10+
}
11+
12+
async function collect(key: string): Promise<void> {
13+
const t1 = new Date().getTime();
14+
await waitOneSec();
15+
const t2 = new Date().getTime();
16+
PerformanceAnalysis.collect(key, t1, t2);
17+
}
18+
19+
// Integration tests for PerformanceAnalysis / getAnalysis.
// Each test starts from a clean slate: collected samples are cleared and any
// previously written report file is removed, both before and after each test.
describe('Performance Analysis', () => {
  beforeEach(() => {
    PerformanceAnalysis.clean();
    if (fs.existsSync('performanceRecord.txt')) {
      fs.unlinkSync('performanceRecord.txt');
    }
  });
  afterEach(() => {
    PerformanceAnalysis.clean();
    if (fs.existsSync('performanceRecord.txt')) {
      fs.unlinkSync('performanceRecord.txt');
    }
  });
  it('should collect performance data', async () => {
    await collect('waitOneSec');
    // count() returns true once at least one sample has been collected.
    expect(PerformanceAnalysis.count()).toBeTruthy();
  });
  it('should write performance data to file', async () => {
    await collect('waitOneSec');
    await collect('waitAnotherSec');
    PerformanceAnalysis.count();
    getAnalysis();
    expect(fs.existsSync('performanceRecord.txt')).toBeTruthy();
    // expect file have two lines
    // NOTE(review): the report layout for two keys is 6 non-empty lines —
    // header, (key line + stats line) x 2, trailing separator.
    const data = fs.readFileSync('performanceRecord.txt', 'utf8');
    const lines = data.split('\n').filter((line) => line !== '');
    expect(lines.length).toBe(6);
  });
});

packages/extension-driver-duckdb/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@
4141
url_style?: string
4242
use_ssl?: boolean
4343
```
44+
45+
4. Environment Variables
46+
- DUCKDB_EXECUTE_CHUNK_SIZE: Optional, default 2000. The data chunk size: we acquire this many rows at once using conn.all() and fetch the rest of the data using conn.stream to prevent OOM. This parameter affects both **API performance** and **server memory usage**.
47+
- DUCKDB_THREADS: Optional; if not set, the duckdb default thread count is used.

packages/extension-driver-duckdb/src/lib/duckdbDataSource.ts

+100-35
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
VulcanExtensionId,
1515
} from '@vulcan-sql/core';
1616
import * as path from 'path';
17-
import { buildSQL } from './sqlBuilder';
17+
import { buildSQL, chunkSize } from './sqlBuilder';
1818
import { DuckDBExtensionLoader } from './duckdbExtensionLoader';
1919

2020
const getType = (value: any) => {
@@ -99,49 +99,31 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
9999
}
100100
const { db, configurationParameters, ...options } =
101101
this.dbMapping.get(profileName)!;
102-
const builtSQL = buildSQL(sql, operations);
102+
const [firstDataSQL, restDataSQL] = buildSQL(sql, operations);
103+
103104
// create new connection for each query
105+
const parameters = Array.from(bindParams.values());
106+
this.logRequest(firstDataSQL, parameters, options);
104107
const connection = db.connect();
105108
await this.loadExtensions(connection, configurationParameters);
106-
const statement = connection.prepare(builtSQL);
107-
const parameters = Array.from(bindParams.values());
108-
this.logRequest(builtSQL, parameters, options);
109-
110-
const result = await statement.stream(...parameters);
111-
const firstChunk = await result.nextChunk();
109+
if (restDataSQL) this.logRequest(restDataSQL, parameters, options);
110+
const [firstData, restDataStream] = await this.acquireData(
111+
firstDataSQL,
112+
restDataSQL,
113+
parameters,
114+
db
115+
);
116+
const readable = this.createReadableStream(firstData, restDataStream);
112117
return {
113118
getColumns: () => {
114-
if (!firstChunk || firstChunk.length === 0) return [];
115-
return Object.keys(firstChunk[0]).map((columnName) => ({
119+
if (!firstData || firstData.length === 0) return [];
120+
return Object.keys(firstData[0]).map((columnName) => ({
116121
name: columnName,
117-
type: getType(firstChunk[0][columnName as any]),
122+
type: getType(firstData[0][columnName as any]),
118123
}));
119124
},
120125
getData: () => {
121-
const stream = new Readable({
122-
objectMode: true,
123-
read() {
124-
result.nextChunk().then((chunk) => {
125-
if (!chunk) {
126-
this.push(null);
127-
return;
128-
}
129-
for (const row of chunk) {
130-
this.push(row);
131-
}
132-
});
133-
},
134-
});
135-
// Send the first chunk
136-
if (firstChunk) {
137-
for (const row of firstChunk) {
138-
stream.push(row);
139-
}
140-
} else {
141-
// If there is no data, close the stream.
142-
stream.push(null);
143-
}
144-
return stream;
126+
return readable;
145127
},
146128
};
147129
}
@@ -150,6 +132,73 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
150132
return `$${parameterIndex}`;
151133
}
152134

135+
private createReadableStream(
136+
firstData: duckdb.TableData,
137+
restDataStream: duckdb.QueryResult | undefined
138+
) {
139+
const readable = new Readable({
140+
objectMode: true,
141+
read: function () {
142+
for (const row of firstData) {
143+
this.push(row);
144+
}
145+
this.push(null);
146+
},
147+
});
148+
if (firstData.length >= chunkSize) {
149+
readable._read = async function () {
150+
if (restDataStream) {
151+
for await (const row of restDataStream) {
152+
this.push(row);
153+
}
154+
this.push(null);
155+
}
156+
};
157+
if (firstData) {
158+
for (const row of firstData) {
159+
readable.push(row);
160+
}
161+
}
162+
}
163+
return readable;
164+
}
165+
166+
private async acquireData(
167+
firstDataSql: string,
168+
restDataSql: string | undefined,
169+
parameters: any[],
170+
db: duckdb.Database
171+
) {
172+
// conn.all() is faster then stream.checkChunk().
173+
// For the small size data we use conn.all() to get the data at once
174+
// To limit memory use and prevent server crashes, we will use conn.all() to acquire the initial chunk of data, then conn.stream() to receive the remainder of the data.
175+
return await Promise.all([
176+
new Promise<duckdb.TableData>((resolve, reject) => {
177+
const c = db.connect();
178+
c.all(
179+
firstDataSql,
180+
...parameters,
181+
(err: duckdb.DuckDbError | null, res: duckdb.TableData) => {
182+
if (err) {
183+
reject(err);
184+
}
185+
resolve(res);
186+
}
187+
);
188+
}),
189+
new Promise<duckdb.QueryResult | undefined>((resolve, reject) => {
190+
if (!restDataSql) resolve(undefined);
191+
try {
192+
const c = db.connect();
193+
const result = c.stream(restDataSql, ...parameters);
194+
resolve(result);
195+
} catch (err: any) {
196+
reject(err);
197+
}
198+
}),
199+
]);
200+
}
201+
153202
private logRequest(
154203
sql: string,
155204
parameters: string[],
@@ -246,9 +295,25 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
246295
});
247296
}
248297

298+
// The dafault duckdb thread is 16
299+
// Setting thread below your CPU core number may result in enhanced performance, according to our observations.
300+
private async setThread(db: duckdb.Database) {
301+
const thread = process.env['DUCKDB_THREADS'];
302+
303+
if (!thread) return;
304+
await new Promise((resolve, reject) => {
305+
db.run(`SET threads=${Number(thread)}`, (err: any) => {
306+
if (err) reject(err);
307+
this.logger.debug(`Set thread to ${thread}`);
308+
resolve(true);
309+
});
310+
});
311+
}
312+
249313
  // Open (or create) the duckdb database at dbPath, apply the optional
  // DUCKDB_THREADS setting, and install the configured extensions on a fresh
  // connection before returning the Database handle.
  private async initDatabase(dbPath: string) {
    const db = new duckdb.Database(dbPath);
    const conn = db.connect();
    await this.setThread(db);
    await this.installExtensions(conn);
    return db;
  }

0 commit comments

Comments
 (0)