@@ -3,7 +3,7 @@ import { Injectable, InternalServerErrorException, Logger } from '@nestjs/common
3
3
import { Database } from 'src/modules/database/models/database' ;
4
4
import apiConfig from 'src/utils/config' ;
5
5
import { ConnectionOptions } from 'tls' ;
6
- import { isEmpty , isNumber } from 'lodash' ;
6
+ import { isEmpty , isNumber , set } from 'lodash' ;
7
7
import { cloneClassInstance , generateRedisConnectionName } from 'src/utils' ;
8
8
import { ConnectionType } from 'src/modules/database/entities/database.entity' ;
9
9
import { ClientMetadata } from 'src/common/models' ;
@@ -164,16 +164,21 @@ export class RedisConnectionFactory {
164
164
options : IRedisConnectionOptions ,
165
165
) : Promise < Redis > {
166
166
let tnl ;
167
+ let connection : Redis ;
167
168
168
169
try {
169
170
const config = await this . getRedisOptions ( clientMetadata , database , options ) ;
171
+ // cover cases when we are connecting to sentinel using standalone client to discover master groups
172
+ const dbIndex = config . db > 0 && ! database . sentinelMaster ? config . db : 0 ;
170
173
171
174
if ( database . ssh ) {
172
175
tnl = await this . sshTunnelProvider . createTunnel ( database ) ;
173
176
}
174
177
175
178
return await new Promise ( ( resolve , reject ) => {
176
179
try {
180
+ let lastError : Error ;
181
+
177
182
if ( tnl ) {
178
183
tnl . on ( 'error' , ( error ) => {
179
184
reject ( error ) ;
@@ -187,31 +192,45 @@ export class RedisConnectionFactory {
187
192
config . port = tnl . serverAddress . port ;
188
193
}
189
194
190
- const connection = new Redis ( {
195
+ connection = new Redis ( {
191
196
...config ,
192
- // cover cases when we are connecting to sentinel as to standalone to discover master groups
193
- db : config . db > 0 && ! database . sentinelMaster ? config . db : 0 ,
197
+ db : 0 ,
194
198
} ) ;
195
199
connection . on ( 'error' , ( e ) : void => {
196
200
this . logger . error ( 'Failed connection to the redis database.' , e ) ;
197
- reject ( e ) ;
201
+ lastError = e ;
198
202
} ) ;
199
203
connection . on ( 'end' , ( ) : void => {
200
- this . logger . error ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ;
201
- reject ( new InternalServerErrorException ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ) ;
204
+ this . logger . error ( ERROR_MESSAGES . UNABLE_TO_ESTABLISH_CONNECTION , lastError ) ;
205
+ reject ( lastError || new InternalServerErrorException ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ) ;
202
206
} ) ;
203
207
connection . on ( 'ready' , ( ) : void => {
208
+ lastError = null ;
204
209
this . logger . log ( 'Successfully connected to the redis database' ) ;
205
- resolve ( connection ) ;
210
+
211
+ // manual switch to particular logical db
212
+ // since ioredis doesn't handle "select" command error during connection
213
+ if ( dbIndex > 0 ) {
214
+ connection . select ( dbIndex )
215
+ . then ( ( ) => {
216
+ set ( connection , [ 'options' , 'db' ] , dbIndex ) ;
217
+ resolve ( connection ) ;
218
+ } )
219
+ . catch ( reject ) ;
220
+ } else {
221
+ resolve ( connection ) ;
222
+ }
206
223
} ) ;
207
224
connection . on ( 'reconnecting' , ( ) : void => {
208
- this . logger . log ( 'Reconnecting to the redis database' ) ;
225
+ lastError = null ;
226
+ this . logger . log ( ERROR_MESSAGES . RECONNECTING_TO_DATABASE ) ;
209
227
} ) ;
210
228
} catch ( e ) {
211
229
reject ( e ) ;
212
230
}
213
231
} ) as Redis ;
214
232
} catch ( e ) {
233
+ connection ?. disconnect ?.( ) ;
215
234
tnl ?. close ?.( ) ;
216
235
throw e ;
217
236
}
@@ -228,36 +247,66 @@ export class RedisConnectionFactory {
228
247
database : Database ,
229
248
options : IRedisConnectionOptions ,
230
249
) : Promise < Cluster > {
231
- const config = await this . getRedisClusterOptions ( clientMetadata , database , options ) ;
250
+ let connection : Cluster ;
232
251
233
- if ( database . ssh ) {
234
- throw new Error ( 'SSH is unsupported for cluster databases.' ) ;
235
- }
252
+ try {
253
+ const config = await this . getRedisClusterOptions ( clientMetadata , database , options ) ;
236
254
237
- return new Promise ( ( resolve , reject ) => {
238
- try {
239
- const cluster = new Cluster ( [ {
240
- host : database . host ,
241
- port : database . port ,
242
- } ] . concat ( database . nodes ) , {
243
- ...config ,
244
- } ) ;
245
- cluster . on ( 'error' , ( e ) : void => {
246
- this . logger . error ( 'Failed connection to the redis oss cluster' , e ) ;
247
- reject ( ! isEmpty ( e . lastNodeError ) ? e . lastNodeError : e ) ;
248
- } ) ;
249
- cluster . on ( 'end' , ( ) : void => {
250
- this . logger . error ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ;
251
- reject ( new InternalServerErrorException ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ) ;
252
- } ) ;
253
- cluster . on ( 'ready' , ( ) : void => {
254
- this . logger . log ( 'Successfully connected to the redis oss cluster.' ) ;
255
- resolve ( cluster ) ;
256
- } ) ;
257
- } catch ( e ) {
258
- reject ( e ) ;
255
+ if ( database . ssh ) {
256
+ throw new Error ( 'SSH is unsupported for cluster databases.' ) ;
259
257
}
260
- } ) ;
258
+
259
+ return await ( new Promise ( ( resolve , reject ) => {
260
+ try {
261
+ let lastError : Error ;
262
+
263
+ connection = new Cluster ( [ {
264
+ host : database . host ,
265
+ port : database . port ,
266
+ } ] . concat ( database . nodes ) , {
267
+ ...config ,
268
+ redisOptions : {
269
+ ...config . redisOptions ,
270
+ db : 0 ,
271
+ } ,
272
+ } ) ;
273
+ connection . on ( 'error' , ( e ) : void => {
274
+ this . logger . error ( 'Failed connection to the redis oss cluster' , e ) ;
275
+ lastError = ! isEmpty ( e . lastNodeError ) ? e . lastNodeError : e ;
276
+ } ) ;
277
+ connection . on ( 'end' , ( ) : void => {
278
+ this . logger . error ( ERROR_MESSAGES . UNABLE_TO_ESTABLISH_CONNECTION , lastError ) ;
279
+ reject ( lastError || new InternalServerErrorException ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ) ;
280
+ } ) ;
281
+ connection . on ( 'ready' , ( ) : void => {
282
+ lastError = null ;
283
+ this . logger . log ( 'Successfully connected to the redis oss cluster.' ) ;
284
+
285
+ // manual switch to particular logical db
286
+ // since ioredis doesn't handle "select" command error during connection
287
+ if ( config . redisOptions . db > 0 ) {
288
+ connection . select ( config . redisOptions . db )
289
+ . then ( ( ) => {
290
+ set ( connection , [ 'options' , 'db' ] , config . redisOptions . db ) ;
291
+ resolve ( connection ) ;
292
+ } )
293
+ . catch ( reject ) ;
294
+ } else {
295
+ resolve ( connection ) ;
296
+ }
297
+ } ) ;
298
+ connection . on ( 'reconnecting' , ( ) : void => {
299
+ lastError = null ;
300
+ this . logger . log ( ERROR_MESSAGES . RECONNECTING_TO_DATABASE ) ;
301
+ } ) ;
302
+ } catch ( e ) {
303
+ reject ( e ) ;
304
+ }
305
+ } ) ) ;
306
+ } catch ( e ) {
307
+ connection ?. disconnect ?.( ) ;
308
+ throw e ;
309
+ }
261
310
}
262
311
263
312
/**
@@ -271,27 +320,56 @@ export class RedisConnectionFactory {
271
320
database : Database ,
272
321
options : IRedisConnectionOptions ,
273
322
) : Promise < Redis > {
274
- const config = await this . getRedisSentinelOptions ( clientMetadata , database , options ) ;
323
+ let connection : Redis ;
275
324
276
- return new Promise ( ( resolve , reject ) => {
277
- try {
278
- const client = new Redis ( config ) ;
279
- client . on ( 'error' , ( e ) : void => {
280
- this . logger . error ( 'Failed connection to the redis oss sentinel' , e ) ;
325
+ try {
326
+ const config = await this . getRedisSentinelOptions ( clientMetadata , database , options ) ;
327
+
328
+ return await ( new Promise ( ( resolve , reject ) => {
329
+ try {
330
+ let lastError : Error ;
331
+
332
+ connection = new Redis ( {
333
+ ...config ,
334
+ db : 0 ,
335
+ } ) ;
336
+ connection . on ( 'error' , ( e ) : void => {
337
+ this . logger . error ( 'Failed connection to the redis oss sentinel' , e ) ;
338
+ lastError = e ;
339
+ } ) ;
340
+ connection . on ( 'end' , ( ) : void => {
341
+ this . logger . error ( ERROR_MESSAGES . UNABLE_TO_ESTABLISH_CONNECTION , lastError ) ;
342
+ reject ( lastError || new InternalServerErrorException ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ) ;
343
+ } ) ;
344
+ connection . on ( 'ready' , ( ) : void => {
345
+ lastError = null ;
346
+ this . logger . log ( 'Successfully connected to the redis oss sentinel.' ) ;
347
+
348
+ // manual switch to particular logical db
349
+ // since ioredis doesn't handle "select" command error during connection
350
+ if ( config . db > 0 ) {
351
+ connection . select ( config . db )
352
+ . then ( ( ) => {
353
+ set ( connection , [ 'options' , 'db' ] , config . db ) ;
354
+ resolve ( connection ) ;
355
+ } )
356
+ . catch ( reject ) ;
357
+ } else {
358
+ resolve ( connection ) ;
359
+ }
360
+ } ) ;
361
+ connection . on ( 'reconnecting' , ( ) : void => {
362
+ lastError = null ;
363
+ this . logger . log ( ERROR_MESSAGES . RECONNECTING_TO_DATABASE ) ;
364
+ } ) ;
365
+ } catch ( e ) {
281
366
reject ( e ) ;
282
- } ) ;
283
- client . on ( 'end' , ( ) : void => {
284
- this . logger . error ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ;
285
- reject ( new InternalServerErrorException ( ERROR_MESSAGES . SERVER_CLOSED_CONNECTION ) ) ;
286
- } ) ;
287
- client . on ( 'ready' , ( ) : void => {
288
- this . logger . log ( 'Successfully connected to the redis oss sentinel.' ) ;
289
- resolve ( client ) ;
290
- } ) ;
291
- } catch ( e ) {
292
- reject ( e ) ;
293
- }
294
- } ) ;
367
+ }
368
+ } ) ) ;
369
+ } catch ( e ) {
370
+ connection ?. disconnect ?.( ) ;
371
+ throw e ;
372
+ }
295
373
}
296
374
297
375
/**
0 commit comments