diff --git a/cassandra/cluster.py b/cassandra/cluster.py index c836fb430..f37c52fad 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4383,7 +4383,9 @@ def _on_timeout(self, _attempts=0): host = str(connection.endpoint) if connection else 'unknown' errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."} - self._set_final_exception(OperationTimedOut(errors, self._current_host)) + final_exc = OperationTimedOut(errors, self._current_host) + self.session.cluster.signal_connection_failure(self._current_host, final_exc, is_host_addition=False) + self._set_final_exception(final_exc) def _on_speculative_execute(self): self._timer = None diff --git a/tests/unit/test_response_future.py b/tests/unit/test_response_future.py index dbd8764ad..e6689da5e 100644 --- a/tests/unit/test_response_future.py +++ b/tests/unit/test_response_future.py @@ -19,8 +19,8 @@ from mock import Mock, MagicMock, ANY from cassandra import ConsistencyLevel, Unavailable, SchemaTargetType, SchemaChangeType, OperationTimedOut -from cassandra.cluster import Session, ResponseFuture, NoHostAvailable, ProtocolVersion -from cassandra.connection import Connection, ConnectionException +from cassandra.cluster import Session, ResponseFuture, NoHostAvailable, ProtocolVersion, Cluster +from cassandra.connection import Connection, ConnectionException, DefaultEndPoint from cassandra.protocol import (ReadTimeoutErrorMessage, WriteTimeoutErrorMessage, UnavailableErrorMessage, ResultMessage, QueryMessage, OverloadedErrorMessage, IsBootstrappingErrorMessage, @@ -28,8 +28,8 @@ RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE, RESULT_KIND_SCHEMA_CHANGE, RESULT_KIND_PREPARED, ProtocolHandler) -from cassandra.policies import RetryPolicy -from cassandra.pool import NoConnectionsAvailable +from cassandra.policies import RetryPolicy, ConvictionPolicy +from cassandra.pool import NoConnectionsAvailable, Host from cassandra.query import SimpleStatement @@ -160,6 +160,49 @@ def test_heartbeat_defunct_deadlock(self): rf._on_timeout() self.assertRaisesRegexp(OperationTimedOut, "Connection defunct by heartbeat", rf.result) + def test_timeout_updates_conviction_policy(self): + """ + PYTHON-539 + + Timeouts from ResponseFuture should notify the host's conviction policy, giving + the driver a mechanism to take action on repeated/systemic timeouts. + """ + conviction_policy = MagicMock(spec=ConvictionPolicy) + conviction_policy.add_failure.return_value = False + + host = Host(DefaultEndPoint("ip1"), lambda h: conviction_policy) + + connection = MagicMock(spec=Connection) + connection._requests = {1:False} + connection.orphaned_request_ids = set() + connection.orphaned_threshold = 2 + + pool = Mock() + pool.is_shutdown = False + pool.borrow_connection.return_value = [connection, 1] + + session = self.make_basic_session() + session.cluster._default_load_balancing_policy.make_query_plan.return_value = [host] + session._pools.get.return_value = pool + + # An extra bit of connective tissue. session.cluster is a mock but we want to use the + # actual impl in Cluster in order to get into the host (and from there to the conviction + # policy). As of this writing Cluster.signal_connection_failure is effectively static + # if the return value from add_failure() on the conviction poilcy is false so we can + # create this linkage _using the actual impl in Cluster_ via a mock side_effect. + def foo(*args, **kwargs): + Cluster.signal_connection_failure(Cluster(), *args, **kwargs) + session.cluster.signal_connection_failure.side_effect = foo + + query = SimpleStatement("SELECT * FROM foo") + message = QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE) + + rf = ResponseFuture(session, message, query, 1) + rf.send_request() + rf._on_timeout() + + host.conviction_policy.add_failure.assert_called_once() + def test_read_timeout_error_message(self): session = self.make_session() query = SimpleStatement("SELECT * FROM foo")