Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Gracefully handle a pending connection during shutdown. #8685

Merged
merged 3 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8607.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support generating structured logs via the standard logging configuration.
1 change: 0 additions & 1 deletion changelog.d/8607.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8685.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support generating structured logs via the standard logging configuration.
25 changes: 16 additions & 9 deletions synapse/logging/_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
from zope.interface import implementer

from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.defer import CancelledError, Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
TCP6ClientEndpoint,
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
from twisted.python.failure import Failure

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -146,17 +147,21 @@ def _connect(self) -> None:

self._connection_waiter = self._service.whenConnected(failAfterFailures=1)

@self._connection_waiter.addErrback
def fail(r):
r.printTraceback(file=sys.__stderr__)
def fail(failure: Failure) -> None:
# If the Deferred was cancelled (e.g. during shutdown) do not try to
# reconnect (this will cause an infinite loop of errors).
if failure.check(CancelledError):
return
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# For a different error, print the traceback and re-connect.
failure.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()

@self._connection_waiter.addCallback
def writer(r):
def writer(result: Protocol) -> None:
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
if self._producer and result.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
Expand All @@ -167,12 +172,14 @@ def writer(r):

# Make a new producer and start it.
self._producer = LogProducer(
buffer=self._buffer, transport=r.transport, format=self.format,
buffer=self._buffer, transport=result.transport, format=self.format,
)
r.transport.registerProducer(self._producer, True)
result.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None

self._connection_waiter.addCallbacks(writer, fail)

def _handle_pressure(self) -> None:
"""
Handle backpressure by shedding records.
Expand Down
16 changes: 16 additions & 0 deletions tests/logging/test_remote_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,19 @@ def test_log_backpressure_cut_middle(self):
+ ["warn %s" % (i,) for i in range(15, 20)],
logs,
)

def test_cancel_connection(self):
"""
Gracefully handle the connection being cancelled.
"""
handler = RemoteHandler(
"127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
)
logger = self.get_logger(handler)

# Send a message.
logger.info("Hello there, %s!", "wally")

# Do not accept the connection and shutdown. This causes the pending
# connection to be cancelled (and should not raise any exceptions).
handler.close()