1
1
import * as cluster from 'cluster' ;
2
+ import * as inspector from 'inspector' ;
2
3
import { Server } from 'net' ;
3
- import { BehaviorSubject , forkJoin , Observable , of } from 'rxjs' ;
4
- import { distinctUntilChanged , switchMap } from 'rxjs/operators' ;
4
+ import { BehaviorSubject , forkJoin , Observable , of , race , timer } from 'rxjs' ;
5
+ import { distinctUntilChanged , switchMap , take , tap } from 'rxjs/operators' ;
5
6
import { LoggingService } from './logger' ;
6
7
7
8
const EXIT_SIGNALS : string [ ] = [
@@ -17,8 +18,6 @@ export class HealthService {
17
18
18
19
private _servers : Server [ ] = [ ] ;
19
20
private _services = [ ] ;
20
-
21
- private _timeouts = { } ;
22
21
private _listeners = [ ] ;
23
22
24
23
constructor (
@@ -33,6 +32,30 @@ export class HealthService {
33
32
} )
34
33
) ;
35
34
} ) ;
35
+
36
+ if ( cluster . isWorker ) {
37
+ cluster . worker . on ( 'message' , ( msg ) => {
38
+ if ( EXIT_SIGNALS . indexOf ( msg ) >= 0 ) {
39
+ if ( ! this . _isHealthy . value ) {
40
+ this . _logger . log ( `${ msg } ignored. Process is already terminating` ) ;
41
+ return ;
42
+ }
43
+ this . _isHealthy . next ( false ) ;
44
+ this . _logger . log ( `shutting down...` ) ;
45
+ this . _cleanup ( msg )
46
+ . subscribe (
47
+ _ => {
48
+ this . _logger . log ( 'Goodbye.' ) ;
49
+ process . exit ( 0 ) ;
50
+ } ,
51
+ err => {
52
+ this . _logger . logError ( err ) ;
53
+ this . _logger . log ( 'Goodbye.' ) ;
54
+ process . exit ( 1 ) ;
55
+ } ) ;
56
+ }
57
+ } ) ;
58
+ }
36
59
}
37
60
38
61
setHealthy ( isHealthy : boolean ) {
@@ -71,18 +94,28 @@ export class HealthService {
71
94
return alive ;
72
95
}
73
96
74
- private _cleanup ( signal : string ) {
97
+ private _cleanup ( signal ) {
98
+ if ( inspector && inspector . url ( ) ) {
99
+ this . _logger . log ( 'closing inspector' ) ;
100
+ inspector . close ( ) ;
101
+ }
75
102
if ( cluster . isMaster ) {
76
103
return forkJoin ( [
77
104
of ( true ) ,
78
105
...( this . _servers . map ( s => this . _closeServer ( s ) ) )
79
106
] ) . pipe (
80
107
switchMap ( _ => {
81
- const workers = this . getLivingWorkers ( ) || [ ] ;
82
- return forkJoin ( [
83
- of ( true ) ,
84
- ...( workers . map ( w => this . _cleanupWorker ( w , signal ) ) )
85
- ] ) ;
108
+ return race (
109
+ this . _cleanupWorkers ( signal ) ,
110
+ timer ( 10 * 1000 )
111
+ . pipe (
112
+ take ( 1 ) ,
113
+ tap ( _ => {
114
+ this . _logger . log ( '[ master ]: Workers took too long to shut down' ) ;
115
+ this . getLivingWorkers ( ) . forEach ( w => w . kill ( 'SIGKILL' ) ) ;
116
+ } )
117
+ )
118
+ ) ;
86
119
} ) ,
87
120
switchMap ( _ => {
88
121
return forkJoin ( [
@@ -108,24 +141,25 @@ export class HealthService {
108
141
}
109
142
110
143
private _handleExitSignal ( signal : string ) {
111
- if ( ! this . _isHealthy . value ) {
112
- this . _logger . log ( `${ signal } ignored. Process is already terminating` ) ;
144
+ if ( cluster . isMaster ) {
145
+ if ( ! this . _isHealthy . value ) {
146
+ this . _logger . log ( `${ signal } ignored. Process is already terminating` ) ;
147
+ return ;
148
+ }
149
+ this . _isHealthy . next ( false ) ;
150
+ this . _logger . log ( `[ master ]: caught ${ signal } . shutting down...` ) ;
151
+ this . _cleanup ( signal )
152
+ . subscribe ( _ => {
153
+ this . _logger . log ( `[ master ]: Goodbye.` ) ;
154
+ process . exit ( 0 )
155
+ } , err => {
156
+ this . _logger . logError ( err ) ;
157
+ process . exit ( 1 ) ;
158
+ } ) ;
159
+ } else {
160
+ // workers wait to die
113
161
return ;
114
162
}
115
- this . _isHealthy . next ( false ) ;
116
- this . _logger . log ( `${ cluster . isMaster ? '[ master ]: ' : '' } caught ${ signal } . shutting down...` ) ;
117
- this . _cleanup ( signal )
118
- . subscribe ( _ => {
119
- if ( cluster . isMaster ) {
120
- this . _logger . log ( `[ master ]: Goodbye.` ) ;
121
- } else {
122
- this . _logger . log ( `Goodbye.` ) ;
123
- }
124
- process . exit ( 0 )
125
- } , err => {
126
- this . _logger . logError ( err ) ;
127
- process . exit ( 1 ) ;
128
- } ) ;
129
163
}
130
164
131
165
private _closeServer ( server : Server ) : Observable < any > {
@@ -137,24 +171,39 @@ export class HealthService {
137
171
} ) ;
138
172
}
139
173
140
- private _cleanupWorker ( worker : cluster . Worker , signal : string ) : Observable < any > {
174
+ private _cleanupWorkers ( signal ) : Observable < any > {
175
+ if ( ! cluster . isMaster ) {
176
+ return of ( true ) ;
177
+ }
178
+ this . getLivingWorkers ( ) . forEach ( w => {
179
+ w . send ( signal ) ;
180
+ } ) ;
141
181
return new Observable ( obs => {
142
- worker . on ( 'exit' , ( code , signal ) => {
143
- if ( this . _timeouts [ worker . id ] ) {
144
- clearTimeout ( this . _timeouts [ worker . id ] ) ;
145
- }
146
- obs . next ( `${ worker . id } exited` ) ;
182
+ cluster . disconnect ( ( ) => {
183
+ obs . next ( true ) ;
147
184
obs . complete ( ) ;
148
185
} ) ;
149
-
150
- worker . kill ( signal ) ;
151
- this . _timeouts [ worker . id ] = setTimeout ( ( ) => {
152
- this . _logger . log ( 'worker shutdown time expired. forcefully killing worker' , worker . id ) ;
153
- worker . process . kill ( 'SIGKILL' ) ;
154
- } , 6000 ) ; // based on maximum `keep-alive` timeout
155
186
} ) ;
156
187
}
157
188
189
+ // private _cleanupWorker(worker: cluster.Worker, signal: string): Observable<any> {
190
+ // return new Observable(obs => {
191
+ // worker.on('exit', (code, signal) => {
192
+ // if (this._timeouts[worker.id]) {
193
+ // clearTimeout(this._timeouts[worker.id]);
194
+ // }
195
+ // obs.next(`${worker.id} exited`);
196
+ // obs.complete();
197
+ // });
198
+
199
+ // worker.kill(signal);
200
+ // this._timeouts[worker.id] = setTimeout(() => {
201
+ // this._logger.log('worker shutdown time expired. forcefully killing worker', worker.id);
202
+ // worker.process.kill('SIGKILL');
203
+ // }, 6000); // based on maximum `keep-alive` timeout
204
+ // });
205
+ // }
206
+
158
207
private _cleanupService ( service ) : Observable < any > {
159
208
if ( service && service . cleanup ) {
160
209
return service . cleanup ( ) ;
0 commit comments