22
22
import sys
23
23
import traceback
24
24
import warnings
25
- from typing import TYPE_CHECKING , Awaitable , Callable , Iterable
25
+ from typing import (
26
+ TYPE_CHECKING ,
27
+ Any ,
28
+ Awaitable ,
29
+ Callable ,
30
+ Collection ,
31
+ Dict ,
32
+ Iterable ,
33
+ List ,
34
+ NoReturn ,
35
+ Tuple ,
36
+ cast ,
37
+ )
26
38
27
39
from cryptography .utils import CryptographyDeprecationWarning
28
- from typing_extensions import NoReturn
29
40
30
41
import twisted
31
- from twisted .internet import defer , error , reactor
42
+ from twisted .internet import defer , error , reactor as _reactor
43
+ from twisted .internet .interfaces import IOpenSSLContextFactory , IReactorSSL , IReactorTCP
44
+ from twisted .internet .protocol import ServerFactory
45
+ from twisted .internet .tcp import Port
32
46
from twisted .logger import LoggingFile , LogLevel
33
47
from twisted .protocols .tls import TLSMemoryBIOFactory
34
48
from twisted .python .threadpool import ThreadPool
48
62
from synapse .metrics import register_threadpool
49
63
from synapse .metrics .background_process_metrics import wrap_as_background_process
50
64
from synapse .metrics .jemalloc import setup_jemalloc_stats
65
+ from synapse .types import ISynapseReactor
51
66
from synapse .util .caches .lrucache import setup_expire_lru_cache_entries
52
67
from synapse .util .daemonize import daemonize_process
53
68
from synapse .util .gai_resolver import GAIResolver
57
72
if TYPE_CHECKING :
58
73
from synapse .server import HomeServer
59
74
75
+ # Twisted injects the global reactor to make it easier to import, this confuses
76
+ # mypy which thinks it is a module. Tell it that it a more proper type.
77
+ reactor = cast (ISynapseReactor , _reactor )
78
+
79
+
60
80
logger = logging .getLogger (__name__ )
61
81
62
82
# list of tuples of function, args list, kwargs dict
63
- _sighup_callbacks = []
83
+ _sighup_callbacks : List [
84
+ Tuple [Callable [..., None ], Tuple [Any , ...], Dict [str , Any ]]
85
+ ] = []
64
86
65
87
66
- def register_sighup (func , * args , ** kwargs ) :
88
+ def register_sighup (func : Callable [..., None ], * args : Any , ** kwargs : Any ) -> None :
67
89
"""
68
90
Register a function to be called when a SIGHUP occurs.
69
91
70
92
Args:
71
- func (function) : Function to be called when sent a SIGHUP signal.
93
+ func: Function to be called when sent a SIGHUP signal.
72
94
*args, **kwargs: args and kwargs to be passed to the target function.
73
95
"""
74
96
_sighup_callbacks .append ((func , args , kwargs ))
75
97
76
98
77
- def start_worker_reactor (appname , config , run_command = reactor .run ):
99
+ def start_worker_reactor (
100
+ appname : str ,
101
+ config : HomeServerConfig ,
102
+ run_command : Callable [[], None ] = reactor .run ,
103
+ ) -> None :
78
104
"""Run the reactor in the main process
79
105
80
106
Daemonizes if necessary, and then configures some resources, before starting
81
107
the reactor. Pulls configuration from the 'worker' settings in 'config'.
82
108
83
109
Args:
84
- appname (str) : application name which will be sent to syslog
85
- config (synapse.config.Config) : config object
86
- run_command (Callable[]) : callable that actually runs the reactor
110
+ appname: application name which will be sent to syslog
111
+ config: config object
112
+ run_command: callable that actually runs the reactor
87
113
"""
88
114
89
115
logger = logging .getLogger (config .worker .worker_app )
@@ -101,32 +127,32 @@ def start_worker_reactor(appname, config, run_command=reactor.run):
101
127
102
128
103
129
def start_reactor (
104
- appname ,
105
- soft_file_limit ,
106
- gc_thresholds ,
107
- pid_file ,
108
- daemonize ,
109
- print_pidfile ,
110
- logger ,
111
- run_command = reactor .run ,
112
- ):
130
+ appname : str ,
131
+ soft_file_limit : int ,
132
+ gc_thresholds : Tuple [ int , int , int ] ,
133
+ pid_file : str ,
134
+ daemonize : bool ,
135
+ print_pidfile : bool ,
136
+ logger : logging . Logger ,
137
+ run_command : Callable [[], None ] = reactor .run ,
138
+ ) -> None :
113
139
"""Run the reactor in the main process
114
140
115
141
Daemonizes if necessary, and then configures some resources, before starting
116
142
the reactor
117
143
118
144
Args:
119
- appname (str) : application name which will be sent to syslog
120
- soft_file_limit (int) :
145
+ appname: application name which will be sent to syslog
146
+ soft_file_limit:
121
147
gc_thresholds:
122
- pid_file (str) : name of pid file to write to if daemonize is True
123
- daemonize (bool) : true to run the reactor in a background process
124
- print_pidfile (bool) : whether to print the pid file, if daemonize is True
125
- logger (logging.Logger) : logger instance to pass to Daemonize
126
- run_command (Callable[]) : callable that actually runs the reactor
148
+ pid_file: name of pid file to write to if daemonize is True
149
+ daemonize: true to run the reactor in a background process
150
+ print_pidfile: whether to print the pid file, if daemonize is True
151
+ logger: logger instance to pass to Daemonize
152
+ run_command: callable that actually runs the reactor
127
153
"""
128
154
129
- def run ():
155
+ def run () -> None :
130
156
logger .info ("Running" )
131
157
setup_jemalloc_stats ()
132
158
change_resource_limit (soft_file_limit )
@@ -185,7 +211,7 @@ def redirect_stdio_to_logs() -> None:
185
211
print ("Redirected stdout/stderr to logs" )
186
212
187
213
188
- def register_start (cb : Callable [..., Awaitable ], * args , ** kwargs ) -> None :
214
+ def register_start (cb : Callable [..., Awaitable ], * args : Any , ** kwargs : Any ) -> None :
189
215
"""Register a callback with the reactor, to be called once it is running
190
216
191
217
This can be used to initialise parts of the system which require an asynchronous
@@ -195,7 +221,7 @@ def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
195
221
will exit.
196
222
"""
197
223
198
- async def wrapper ():
224
+ async def wrapper () -> None :
199
225
try :
200
226
await cb (* args , ** kwargs )
201
227
except Exception :
@@ -224,7 +250,7 @@ async def wrapper():
224
250
reactor .callWhenRunning (lambda : defer .ensureDeferred (wrapper ()))
225
251
226
252
227
- def listen_metrics (bind_addresses , port ) :
253
+ def listen_metrics (bind_addresses : Iterable [ str ] , port : int ) -> None :
228
254
"""
229
255
Start Prometheus metrics server.
230
256
"""
@@ -236,11 +262,11 @@ def listen_metrics(bind_addresses, port):
236
262
237
263
238
264
def listen_manhole (
239
- bind_addresses : Iterable [str ],
265
+ bind_addresses : Collection [str ],
240
266
port : int ,
241
267
manhole_settings : ManholeConfig ,
242
268
manhole_globals : dict ,
243
- ):
269
+ ) -> None :
244
270
# twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
245
271
# warning. It's fixed by https://github.com/twisted/twisted/pull/1522), so
246
272
# suppress the warning for now.
@@ -259,12 +285,18 @@ def listen_manhole(
259
285
)
260
286
261
287
262
- def listen_tcp (bind_addresses , port , factory , reactor = reactor , backlog = 50 ):
288
+ def listen_tcp (
289
+ bind_addresses : Collection [str ],
290
+ port : int ,
291
+ factory : ServerFactory ,
292
+ reactor : IReactorTCP = reactor ,
293
+ backlog : int = 50 ,
294
+ ) -> List [Port ]:
263
295
"""
264
296
Create a TCP socket for a port and several addresses
265
297
266
298
Returns:
267
- list[ twisted.internet.tcp.Port]: listening for TCP connections
299
+ list of twisted.internet.tcp.Port listening for TCP connections
268
300
"""
269
301
r = []
270
302
for address in bind_addresses :
@@ -273,12 +305,19 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
273
305
except error .CannotListenError as e :
274
306
check_bind_error (e , address , bind_addresses )
275
307
276
- return r
308
+ # IReactorTCP returns an object implementing IListeningPort from listenTCP,
309
+ # but we know it will be a Port instance.
310
+ return r # type: ignore[return-value]
277
311
278
312
279
313
def listen_ssl (
280
- bind_addresses , port , factory , context_factory , reactor = reactor , backlog = 50
281
- ):
314
+ bind_addresses : Collection [str ],
315
+ port : int ,
316
+ factory : ServerFactory ,
317
+ context_factory : IOpenSSLContextFactory ,
318
+ reactor : IReactorSSL = reactor ,
319
+ backlog : int = 50 ,
320
+ ) -> List [Port ]:
282
321
"""
283
322
Create an TLS-over-TCP socket for a port and several addresses
284
323
@@ -294,10 +333,13 @@ def listen_ssl(
294
333
except error .CannotListenError as e :
295
334
check_bind_error (e , address , bind_addresses )
296
335
297
- return r
336
+ # IReactorSSL incorrectly declares that an int is returned from listenSSL,
337
+ # it actually returns an object implementing IListeningPort, but we know it
338
+ # will be a Port instance.
339
+ return r # type: ignore[return-value]
298
340
299
341
300
- def refresh_certificate (hs : "HomeServer" ):
342
+ def refresh_certificate (hs : "HomeServer" ) -> None :
301
343
"""
302
344
Refresh the TLS certificates that Synapse is using by re-reading them from
303
345
disk and updating the TLS context factories to use them.
@@ -329,7 +371,7 @@ def refresh_certificate(hs: "HomeServer"):
329
371
logger .info ("Context factories updated." )
330
372
331
373
332
- async def start (hs : "HomeServer" ):
374
+ async def start (hs : "HomeServer" ) -> None :
333
375
"""
334
376
Start a Synapse server or worker.
335
377
@@ -360,7 +402,7 @@ async def start(hs: "HomeServer"):
360
402
if hasattr (signal , "SIGHUP" ):
361
403
362
404
@wrap_as_background_process ("sighup" )
363
- def handle_sighup (* args , ** kwargs ) :
405
+ def handle_sighup (* args : Any , ** kwargs : Any ) -> None :
364
406
# Tell systemd our state, if we're using it. This will silently fail if
365
407
# we're not using systemd.
366
408
sdnotify (b"RELOADING=1" )
@@ -373,7 +415,7 @@ def handle_sighup(*args, **kwargs):
373
415
# We defer running the sighup handlers until next reactor tick. This
374
416
# is so that we're in a sane state, e.g. flushing the logs may fail
375
417
# if the sighup happens in the middle of writing a log entry.
376
- def run_sighup (* args , ** kwargs ) :
418
+ def run_sighup (* args : Any , ** kwargs : Any ) -> None :
377
419
# `callFromThread` should be "signal safe" as well as thread
378
420
# safe.
379
421
reactor .callFromThread (handle_sighup , * args , ** kwargs )
@@ -436,12 +478,8 @@ def run_sighup(*args, **kwargs):
436
478
atexit .register (gc .freeze )
437
479
438
480
439
- def setup_sentry (hs : "HomeServer" ):
440
- """Enable sentry integration, if enabled in configuration
441
-
442
- Args:
443
- hs
444
- """
481
+ def setup_sentry (hs : "HomeServer" ) -> None :
482
+ """Enable sentry integration, if enabled in configuration"""
445
483
446
484
if not hs .config .metrics .sentry_enabled :
447
485
return
@@ -466,7 +504,7 @@ def setup_sentry(hs: "HomeServer"):
466
504
scope .set_tag ("worker_name" , name )
467
505
468
506
469
- def setup_sdnotify (hs : "HomeServer" ):
507
+ def setup_sdnotify (hs : "HomeServer" ) -> None :
470
508
"""Adds process state hooks to tell systemd what we are up to."""
471
509
472
510
# Tell systemd our state, if we're using it. This will silently fail if
@@ -481,7 +519,7 @@ def setup_sdnotify(hs: "HomeServer"):
481
519
sdnotify_sockaddr = os .getenv ("NOTIFY_SOCKET" )
482
520
483
521
484
- def sdnotify (state ) :
522
+ def sdnotify (state : bytes ) -> None :
485
523
"""
486
524
Send a notification to systemd, if the NOTIFY_SOCKET env var is set.
487
525
@@ -490,7 +528,7 @@ def sdnotify(state):
490
528
package which many OSes don't include as a matter of principle.
491
529
492
530
Args:
493
- state (bytes) : notification to send
531
+ state: notification to send
494
532
"""
495
533
if not isinstance (state , bytes ):
496
534
raise TypeError ("sdnotify should be called with a bytes" )
0 commit comments