Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Gracefully handle a pending connection during shutdown. #8685

Merged
merged 3 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8607.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support generating structured logs via the standard logging configuration.
1 change: 0 additions & 1 deletion changelog.d/8607.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8685.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support generating structured logs via the standard logging configuration.
27 changes: 18 additions & 9 deletions synapse/logging/_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
from zope.interface import implementer

from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.defer import CancelledError, Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
TCP6ClientEndpoint,
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
from twisted.python.failure import Failure

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -131,9 +132,11 @@ def __init__(
factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=_reactor)
self._service.startService()
self._stopping = False
self._connect()

def close(self):
self._stopping = True
self._service.stopService()

def _connect(self) -> None:
Expand All @@ -146,17 +149,21 @@ def _connect(self) -> None:

self._connection_waiter = self._service.whenConnected(failAfterFailures=1)

@self._connection_waiter.addErrback
def fail(r):
r.printTraceback(file=sys.__stderr__)
def fail(failure: Failure) -> None:
# If the Deferred was cancelled (e.g. during shutdown) do not try to
# reconnect (this will cause an infinite loop of errors).
if failure.check(CancelledError) and self._stopping:
return

# For a different error, print the traceback and re-connect.
failure.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()

@self._connection_waiter.addCallback
def writer(r):
def writer(result: Protocol) -> None:
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
if self._producer and result.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
Expand All @@ -167,12 +174,14 @@ def writer(r):

# Make a new producer and start it.
self._producer = LogProducer(
buffer=self._buffer, transport=r.transport, format=self.format,
buffer=self._buffer, transport=result.transport, format=self.format,
)
r.transport.registerProducer(self._producer, True)
result.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None

self._connection_waiter.addCallbacks(writer, fail)

def _handle_pressure(self) -> None:
"""
Handle backpressure by shedding records.
Expand Down
16 changes: 16 additions & 0 deletions tests/logging/test_remote_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,19 @@ def test_log_backpressure_cut_middle(self):
+ ["warn %s" % (i,) for i in range(15, 20)],
logs,
)

def test_cancel_connection(self):
"""
Gracefully handle the connection being cancelled.
"""
handler = RemoteHandler(
"127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
)
logger = self.get_logger(handler)

# Send a message.
logger.info("Hello there, %s!", "wally")

# Do not accept the connection and shutdown. This causes the pending
# connection to be cancelled (and should not raise any exceptions).
handler.close()