@@ -99,20 +99,84 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
99
99
}
100
100
const { db, configurationParameters, ...options } =
101
101
this . dbMapping . get ( profileName ) ! ;
102
- const [ builtSQL , streamSQL ] = buildSQL ( sql , operations ) ;
102
+ const [ firstDataSQL , restDataSQL ] = buildSQL ( sql , operations ) ;
103
103
104
104
// create new connection for each query
105
+ const parameters = Array . from ( bindParams . values ( ) ) ;
106
+ this . logRequest ( firstDataSQL , parameters , options ) ;
105
107
const connection = db . connect ( ) ;
106
108
await this . loadExtensions ( connection , configurationParameters ) ;
107
- const parameters = Array . from ( bindParams . values ( ) ) ;
108
- this . logRequest ( builtSQL , parameters , options ) ;
109
- if ( streamSQL ) this . logRequest ( streamSQL , parameters , options ) ;
109
+ if ( restDataSQL ) this . logRequest ( restDataSQL , parameters , options ) ;
110
+ const [ firstData , restDataStream ] = await this . acquireData (
111
+ firstDataSQL ,
112
+ restDataSQL ,
113
+ parameters ,
114
+ db
115
+ ) ;
116
+ const readable = this . createReadableStream ( firstData , restDataStream ) ;
117
+ return {
118
+ getColumns : ( ) => {
119
+ if ( ! firstData || firstData . length === 0 ) return [ ] ;
120
+ return Object . keys ( firstData [ 0 ] ) . map ( ( columnName ) => ( {
121
+ name : columnName ,
122
+ type : getType ( firstData [ 0 ] [ columnName as any ] ) ,
123
+ } ) ) ;
124
+ } ,
125
+ getData : ( ) => {
126
+ return readable ;
127
+ } ,
128
+ } ;
129
+ }
130
+
131
+ public async prepare ( { parameterIndex } : RequestParameter ) {
132
+ return `$${ parameterIndex } ` ;
133
+ }
134
+
135
+ private createReadableStream (
136
+ firstData : duckdb . TableData ,
137
+ restDataStream : duckdb . QueryResult | undefined
138
+ ) {
139
+ const readable = new Readable ( {
140
+ objectMode : true ,
141
+ read : function ( ) {
142
+ for ( const row of firstData ) {
143
+ this . push ( row ) ;
144
+ }
145
+ this . push ( null ) ;
146
+ } ,
147
+ } ) ;
148
+ if ( firstData . length >= chunkSize ) {
149
+ readable . _read = async function ( ) {
150
+ if ( restDataStream ) {
151
+ for await ( const row of restDataStream ) {
152
+ this . push ( row ) ;
153
+ }
154
+ this . push ( null ) ;
155
+ }
156
+ } ;
157
+ if ( firstData ) {
158
+ for ( const row of firstData ) {
159
+ readable . push ( row ) ;
160
+ }
161
+ }
162
+ }
163
+ return readable ;
164
+ }
110
165
111
- const [ result , asyncIterable ] = await Promise . all ( [
166
+ private async acquireData (
167
+ firstDataSql : string ,
168
+ restDataSql : string | undefined ,
169
+ parameters : any [ ] ,
170
+ db : duckdb . Database
171
+ ) {
172
+ // conn.all() is faster than stream.checkChunk().
173
+ // For the small size data we use conn.all() to get the data at once
174
+ // To limit memory use and prevent server crashes, we will use conn.all() to acquire the initial chunk of data, then conn.stream() to receive the remainder of the data.
175
+ return await Promise . all ( [
112
176
new Promise < duckdb . TableData > ( ( resolve , reject ) => {
113
177
const c = db . connect ( ) ;
114
178
c . all (
115
- builtSQL ,
179
+ firstDataSql ,
116
180
...parameters ,
117
181
( err : duckdb . DuckDbError | null , res : duckdb . TableData ) => {
118
182
if ( err ) {
@@ -123,56 +187,16 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
123
187
) ;
124
188
} ) ,
125
189
new Promise < duckdb . QueryResult | undefined > ( ( resolve , reject ) => {
126
- if ( ! streamSQL ) resolve ( undefined ) ;
190
+ if ( ! restDataSql ) resolve ( undefined ) ;
127
191
try {
128
192
const c = db . connect ( ) ;
129
- const result = c . stream ( streamSQL , ...parameters ) ;
193
+ const result = c . stream ( restDataSql , ...parameters ) ;
130
194
resolve ( result ) ;
131
195
} catch ( err : any ) {
132
196
reject ( err ) ;
133
197
}
134
198
} ) ,
135
199
] ) ;
136
- const asyncIterableStream = new Readable ( {
137
- objectMode : true ,
138
- read : function ( ) {
139
- for ( const row of result ) {
140
- this . push ( row ) ;
141
- }
142
- this . push ( null ) ;
143
- } ,
144
- } ) ;
145
- if ( result . length >= chunkSize ) {
146
- asyncIterableStream . _read = async function ( ) {
147
- if ( asyncIterable ) {
148
- for await ( const row of asyncIterable ) {
149
- this . push ( row ) ;
150
- }
151
- this . push ( null ) ;
152
- }
153
- } ;
154
- if ( result ) {
155
- for ( const row of result ) {
156
- asyncIterableStream . push ( row ) ;
157
- }
158
- }
159
- }
160
- return {
161
- getColumns : ( ) => {
162
- if ( ! result || result . length === 0 ) return [ ] ;
163
- return Object . keys ( result [ 0 ] ) . map ( ( columnName ) => ( {
164
- name : columnName ,
165
- type : getType ( result [ 0 ] [ columnName as any ] ) ,
166
- } ) ) ;
167
- } ,
168
- getData : ( ) => {
169
- return asyncIterableStream ;
170
- } ,
171
- } ;
172
- }
173
-
174
- public async prepare ( { parameterIndex } : RequestParameter ) {
175
- return `$${ parameterIndex } ` ;
176
200
}
177
201
178
202
private logRequest (
@@ -271,9 +295,10 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
271
295
} ) ;
272
296
}
273
297
274
- // set duckdb thread to number
298
+ // The default number of DuckDB threads is 16.
299
+ // Setting the thread count below the number of CPU cores may improve performance, according to our observations.
275
300
private async setThread ( db : duckdb . Database ) {
276
- const thread = process . env [ 'THREADS ' ] ;
301
+ const thread = process . env [ 'DUCKDB_THREADS ' ] ;
277
302
278
303
if ( ! thread ) return ;
279
304
await new Promise ( ( resolve , reject ) => {
0 commit comments