Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PYTHON-3926 Add more information to connection errors and timeouts #1375

Merged
merged 15 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pymongo/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
if TYPE_CHECKING:
from pymongo.client_session import ClientSession


ORDERED_TYPES: Sequence[Type] = (SON, OrderedDict)

# Defaults until we connect to a server and get updated limits.
Expand Down Expand Up @@ -793,7 +792,6 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo
"waitqueuetimeoutms",
]


_AUTH_OPTIONS = frozenset(["authmechanismproperties"])


Expand Down
56 changes: 49 additions & 7 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,10 @@ def _truncate_metadata(metadata: MutableMapping[str, Any]) -> None:


def _raise_connection_failure(
address: Any, error: Exception, msg_prefix: Optional[str] = None
address: Any,
error: Exception,
msg_prefix: Optional[str] = None,
timeout_details: Optional[dict[str, float]] = None,
) -> NoReturn:
"""Convert a socket.error to ConnectionFailure and raise it."""
host, port = address
Expand All @@ -390,6 +393,8 @@ def _raise_connection_failure(
msg = f"{host}: {error}"
if msg_prefix:
msg = msg_prefix + msg
if "configured timeouts" not in msg:
msg += format_timeout_details(timeout_details)
if isinstance(error, socket.timeout):
raise NetworkTimeout(msg) from error
elif isinstance(error, SSLError) and "timed out" in str(error):
Expand All @@ -407,6 +412,32 @@ def _cond_wait(condition: threading.Condition, deadline: Optional[float]) -> boo
return condition.wait(timeout)


def _get_timeout_details(options: PoolOptions) -> dict[str, float]:
details = {}
timeout = _csot.get_timeout()
socket_timeout = options.socket_timeout
connect_timeout = options.connect_timeout
if timeout:
details["timeoutMS"] = timeout * 1000
if socket_timeout and not timeout:
details["socketTimeoutMS"] = socket_timeout * 1000
if connect_timeout:
details["connectTimeoutMS"] = connect_timeout * 1000
return details


def format_timeout_details(details: Optional[dict[str, float]]) -> str:
result = ""
if details:
result += " (configured timeouts:"
for timeout in ["socketTimeoutMS", "timeoutMS", "connectTimeoutMS"]:
if timeout in details:
result += f" {timeout}: {details[timeout]}ms,"
result = result[:-1]
result += ")"
return result


class PoolOptions:
"""Read only connection pool options for a MongoClient.

Expand Down Expand Up @@ -736,10 +767,15 @@ def apply_timeout(
rtt = self.connect_rtt
max_time_ms = timeout - rtt
if max_time_ms < 0:
timeout_details = _get_timeout_details(self.opts)
formatted = format_timeout_details(timeout_details)
# CSOT: raise an error without running the command since we know it will time out.
errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f}"
errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}"
raise ExecutionTimeout(
errmsg, 50, {"ok": 0, "errmsg": errmsg, "code": 50}, self.max_wire_version
errmsg,
50,
{"ok": 0, "errmsg": errmsg, "code": 50},
self.max_wire_version,
)
if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000)
Expand Down Expand Up @@ -1127,7 +1163,8 @@ def _raise_connection_failure(self, error: BaseException) -> NoReturn:
self.close_conn(reason)
# SSLError from PyOpenSSL inherits directly from Exception.
if isinstance(error, (IOError, OSError, SSLError)):
_raise_connection_failure(self.address, error)
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
else:
raise

Expand Down Expand Up @@ -1251,7 +1288,8 @@ def _configured_socket(address: _Address, options: PoolOptions) -> Union[socket.
# We raise AutoReconnect for transient and permanent SSL handshake
# failures alike. Permanent handshake failures, like protocol
# mismatch, will be turned into ServerSelectionTimeoutErrors later.
_raise_connection_failure(address, exc, "SSL handshake failed: ")
details = _get_timeout_details(options)
_raise_connection_failure(address, exc, "SSL handshake failed: ", timeout_details=details)
if (
ssl_context.verify_mode
and not ssl_context.check_hostname
Expand Down Expand Up @@ -1549,7 +1587,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
)

if isinstance(error, (IOError, OSError, SSLError)):
_raise_connection_failure(self.address, error)
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see _configured_socket() also calls _raise_connection_failure(). Will this then add timeout_details to the error twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I'll add a check in _raise_connection_failure to ensure it doesn't add timeout_details if they're already in the message.


raise

Expand Down Expand Up @@ -1630,7 +1669,10 @@ def _raise_if_not_ready(self, emit_event: bool) -> None:
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR
)
_raise_connection_failure(self.address, AutoReconnect("connection pool paused"))
details = _get_timeout_details(self.opts)
_raise_connection_failure(
self.address, AutoReconnect("connection pool paused"), timeout_details=details
)

def _get_conn(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection:
"""Get or create a Connection. Can raise ConnectionFailure."""
Expand Down
81 changes: 80 additions & 1 deletion test/test_pooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.son import SON
from pymongo import MongoClient, message
from pymongo import MongoClient, message, timeout
from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError
from pymongo.hello import HelloCompat

sys.path[0:0] = [""]

Expand Down Expand Up @@ -411,6 +412,84 @@ def find_one():
# maxConnecting = unbounded: 30+ connections in ~0.140+ seconds
print(len(pool.conns))

@client_context.require_failCommand_fail_point
def test_csot_timeout_message(self):
client = rs_or_single_client(appName="connectionTimeoutApp")
# Mock a connection failing due to timeout.
mock_connection_timeout = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"blockConnection": True,
"blockTimeMS": 1000,
"failCommands": ["find"],
"appName": "connectionTimeoutApp",
},
}

client.db.t.insert_one({"x": 1})

with self.fail_point(mock_connection_timeout):
with self.assertRaises(Exception) as error:
with timeout(0.5):
client.db.t.find_one({"$where": delay(2)})

self.assertTrue("(configured timeouts: timeoutMS: 500.0ms" in str(error.exception))

@client_context.require_failCommand_fail_point
def test_socket_timeout_message(self):
client = rs_or_single_client(socketTimeoutMS=500, appName="connectionTimeoutApp")

# Mock a connection failing due to timeout.
mock_connection_timeout = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"blockConnection": True,
"blockTimeMS": 1000,
"failCommands": ["find"],
"appName": "connectionTimeoutApp",
},
}

client.db.t.insert_one({"x": 1})

with self.fail_point(mock_connection_timeout):
with self.assertRaises(Exception) as error:
client.db.t.find_one({"$where": delay(2)})

self.assertTrue(
"(configured timeouts: socketTimeoutMS: 500.0ms, connectTimeoutMS: 20000.0ms)"
in str(error.exception)
)

@client_context.require_failCommand_fail_point
@client_context.require_version_min(
4, 9, 0
) # configureFailPoint does not allow failure on handshake before 4.9, fixed in SERVER-49336
def test_connection_timeout_message(self):
# Mock a connection failing due to timeout.
mock_connection_timeout = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"blockConnection": True,
"blockTimeMS": 1000,
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"appName": "connectionTimeoutApp",
},
}

with self.fail_point(mock_connection_timeout):
with self.assertRaises(Exception) as error:
client = rs_or_single_client(connectTimeoutMS=500, appName="connectionTimeoutApp")
client.admin.command("ping")

self.assertTrue(
"(configured timeouts: socketTimeoutMS: 500.0ms, connectTimeoutMS: 500.0ms)"
in str(error.exception)
)


class TestPoolMaxSize(_TestPoolingBase):
def test_max_pool_size(self):
Expand Down