diff --git a/changelog.d/10407.feature b/changelog.d/10407.feature new file mode 100644 index 000000000000..db277d9ecde0 --- /dev/null +++ b/changelog.d/10407.feature @@ -0,0 +1 @@ +Add a buffered logging handler which periodically flushes itself. diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml index 669e60008113..b088c834057d 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml @@ -28,7 +28,7 @@ handlers: # will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR # logs will still be flushed immediately. buffer: - class: logging.handlers.MemoryHandler + class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler target: file # The capacity is the number of log lines that are buffered before # being written to disk. Increasing this will lead to better @@ -36,6 +36,9 @@ handlers: # be written to disk. capacity: 10 flushLevel: 30 # Flush for WARNING logs as well + # The period of time, in seconds, between forced flushes. + # Messages will not be delayed for longer than this time. + period: 5 # A handler that writes logs to stderr. Unused by default, but can be used # instead of "buffer" and "file" in the logger handlers. diff --git a/synapse/config/logger.py b/synapse/config/logger.py index ad4e6e61c3bf..dcd3ed1dac12 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -71,7 +71,7 @@ # will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR # logs will still be flushed immediately. buffer: - class: logging.handlers.MemoryHandler + class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler target: file # The capacity is the number of log lines that are buffered before # being written to disk. Increasing this will lead to better @@ -79,6 +79,9 @@ # be written to disk. capacity: 10 flushLevel: 30 # Flush for WARNING logs as well + # The period of time, in seconds, between forced flushes. + # Messages will not be delayed for longer than this time. + period: 5 # A handler that writes logs to stderr. Unused by default, but can be used # instead of "buffer" and "file" in the logger handlers. diff --git a/synapse/logging/handlers.py b/synapse/logging/handlers.py new file mode 100644 index 000000000000..a6c212f300fd --- /dev/null +++ b/synapse/logging/handlers.py @@ -0,0 +1,88 @@ +import logging +import time +from logging import Handler, LogRecord +from logging.handlers import MemoryHandler +from threading import Thread +from typing import Optional + +from twisted.internet.interfaces import IReactorCore + + +class PeriodicallyFlushingMemoryHandler(MemoryHandler): + """ + This is a subclass of MemoryHandler that additionally spawns a background + thread to periodically flush the buffer. + + This prevents messages from being buffered for too long. + + Additionally, all messages will be immediately flushed if the reactor has + not yet been started. + """ + + def __init__( + self, + capacity: int, + flushLevel: int = logging.ERROR, + target: Optional[Handler] = None, + flushOnClose: bool = True, + period: float = 5.0, + reactor: Optional[IReactorCore] = None, + ) -> None: + """ + period: the period between automatic flushes + + reactor: if specified, a custom reactor to use. If not specifies, + defaults to the globally-installed reactor. + Log entries will be flushed immediately until this reactor has + started. + """ + super().__init__(capacity, flushLevel, target, flushOnClose) + + self._flush_period: float = period + self._active: bool = True + self._reactor_started = False + + self._flushing_thread: Thread = Thread( + name="PeriodicallyFlushingMemoryHandler flushing thread", + target=self._flush_periodically, + ) + self._flushing_thread.start() + + def on_reactor_running(): + self._reactor_started = True + + reactor_to_use: IReactorCore + if reactor is None: + from twisted.internet import reactor as global_reactor + + reactor_to_use = global_reactor # type: ignore[assignment] + else: + reactor_to_use = reactor + + # call our hook when the reactor start up + reactor_to_use.callWhenRunning(on_reactor_running) + + def shouldFlush(self, record: LogRecord) -> bool: + """ + Before reactor start-up, log everything immediately. + Otherwise, fall back to original behaviour of waiting for the buffer to fill. + """ + + if self._reactor_started: + return super().shouldFlush(record) + else: + return True + + def _flush_periodically(self): + """ + Whilst this handler is active, flush the handler periodically. + """ + + while self._active: + # flush is thread-safe; it acquires and releases the lock internally + self.flush() + time.sleep(self._flush_period) + + def close(self) -> None: + self._active = False + super().close()