|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +"""Location: ./tests/unit/mcpgateway/instrumentation/test_sqlalchemy.py |
| 3 | +Copyright 2025 |
| 4 | +SPDX-License-Identifier: Apache-2.0 |
| 5 | +Authors: Mihai Criveti |
| 6 | +
|
| 7 | +Unit tests for sqlalchemy instrumentation. |
| 8 | +""" |
| 9 | +import pytest |
| 10 | +import threading |
| 11 | +import queue |
| 12 | +import time |
| 13 | +from unittest.mock import MagicMock, patch |
| 14 | + |
| 15 | +import mcpgateway.instrumentation.sqlalchemy as sa |
| 16 | + |
| 17 | + |
| 18 | +@pytest.fixture(autouse=True) |
| 19 | +def reset_globals(): |
| 20 | + sa._query_tracking.clear() |
| 21 | + sa._instrumentation_context = threading.local() |
| 22 | + sa._span_queue = queue.Queue(maxsize=2) |
| 23 | + sa._shutdown_event.clear() |
| 24 | + sa._span_writer_thread = None |
| 25 | + yield |
| 26 | + sa._query_tracking.clear() |
| 27 | + |
| 28 | + |
| 29 | +def test_before_cursor_execute_stores_tracking(): |
| 30 | + conn = MagicMock() |
| 31 | + sa._before_cursor_execute(conn, None, "SELECT * FROM test", {"id": 1}, None, False) |
| 32 | + conn_id = id(conn) |
| 33 | + assert conn_id in sa._query_tracking |
| 34 | + tracking = sa._query_tracking[conn_id] |
| 35 | + assert tracking["statement"] == "SELECT * FROM test" |
| 36 | + assert "start_time" in tracking |
| 37 | + |
| 38 | + |
| 39 | +def test_after_cursor_execute_no_tracking(): |
| 40 | + conn = MagicMock() |
| 41 | + sa._after_cursor_execute(conn, None, "SELECT * FROM test", None, None, False) |
| 42 | + # Should not raise or enqueue anything |
| 43 | + assert sa._span_queue.empty() |
| 44 | + |
| 45 | + |
| 46 | +def test_after_cursor_execute_inside_span_creation_skips(): |
| 47 | + conn = MagicMock() |
| 48 | + conn_id = id(conn) |
| 49 | + sa._query_tracking[conn_id] = {"start_time": time.time(), "statement": "SELECT 1", "parameters": None, "executemany": False} |
| 50 | + sa._instrumentation_context.inside_span_creation = True |
| 51 | + sa._after_cursor_execute(conn, MagicMock(), "SELECT 1", None, None, False) |
| 52 | + assert sa._span_queue.empty() |
| 53 | + |
| 54 | + |
| 55 | +def test_after_cursor_execute_observability_table_skips(caplog): |
| 56 | + import logging |
| 57 | + caplog.set_level(logging.DEBUG) |
| 58 | + sa.logger.setLevel(logging.DEBUG) |
| 59 | + sa.logger.propagate = True |
| 60 | + conn = MagicMock() |
| 61 | + conn_id = id(conn) |
| 62 | + sa._query_tracking[conn_id] = {"start_time": time.time(), "statement": "SELECT * FROM observability_spans", "parameters": None, "executemany": False} |
| 63 | + sa._after_cursor_execute(conn, MagicMock(), "SELECT * FROM observability_spans", None, None, False) |
| 64 | + assert "Skipping instrumentation" in caplog.text |
| 65 | + |
| 66 | + |
| 67 | +def test_after_cursor_execute_with_trace_id_calls_create_query_span(): |
| 68 | + conn = MagicMock() |
| 69 | + conn.info = {"trace_id": "abc123"} |
| 70 | + conn_id = id(conn) |
| 71 | + sa._query_tracking[conn_id] = {"start_time": time.time(), "statement": "SELECT * FROM users", "parameters": None, "executemany": False} |
| 72 | + with patch.object(sa, "_create_query_span") as mock_create: |
| 73 | + sa._after_cursor_execute(conn, MagicMock(rowcount=5), "SELECT * FROM users", None, None, False) |
| 74 | + mock_create.assert_called_once() |
| 75 | + args, kwargs = mock_create.call_args |
| 76 | + assert kwargs["trace_id"] == "abc123" |
| 77 | + |
| 78 | + |
| 79 | +def test_after_cursor_execute_without_trace_id_logs_debug(caplog): |
| 80 | + import logging |
| 81 | + caplog.set_level(logging.DEBUG) |
| 82 | + sa.logger.setLevel(logging.DEBUG) |
| 83 | + sa.logger.propagate = True |
| 84 | + conn = MagicMock() |
| 85 | + conn.info = {} |
| 86 | + conn_id = id(conn) |
| 87 | + sa._query_tracking[conn_id] = {"start_time": time.time(), "statement": "SELECT * FROM users", "parameters": None, "executemany": False} |
| 88 | + sa._after_cursor_execute(conn, MagicMock(rowcount=5), "SELECT * FROM users", None, None, False) |
| 89 | + assert "Query executed without trace context" in caplog.text |
| 90 | + |
| 91 | + |
| 92 | +def test_create_query_span_enqueues_successfully(caplog): |
| 93 | + import logging |
| 94 | + caplog.set_level(logging.DEBUG) |
| 95 | + sa.logger.setLevel(logging.DEBUG) |
| 96 | + sa.logger.propagate = True |
| 97 | + sa._create_query_span("trace123", "SELECT * FROM test", 10.0, 5, False) |
| 98 | + assert not sa._span_queue.empty() |
| 99 | + assert "Enqueued span" in caplog.text |
| 100 | + |
| 101 | + |
| 102 | +def test_create_query_span_queue_full_warns(caplog): |
| 103 | + sa._span_queue = queue.Queue(maxsize=1) |
| 104 | + sa._span_queue.put({"dummy": "data"}) |
| 105 | + sa._create_query_span("trace123", "SELECT * FROM test", 10.0, 5, False) |
| 106 | + assert "Span queue is full" in caplog.text |
| 107 | + |
| 108 | + |
| 109 | +def test_create_query_span_exception_does_not_raise(caplog): |
| 110 | + import logging |
| 111 | + caplog.set_level(logging.DEBUG) |
| 112 | + sa.logger.setLevel(logging.DEBUG) |
| 113 | + sa.logger.propagate = True |
| 114 | + with patch("mcpgateway.instrumentation.sqlalchemy._span_queue.put_nowait", side_effect=Exception("fail")): |
| 115 | + sa._create_query_span("trace123", "SELECT * FROM test", 10.0, 5, False) |
| 116 | + assert "Failed to enqueue query span" in caplog.text |
| 117 | + |
| 118 | + |
| 119 | +def test_write_span_to_db_success(): |
| 120 | + span_data = { |
| 121 | + "trace_id": "t1", |
| 122 | + "name": "db.query.select", |
| 123 | + "kind": "client", |
| 124 | + "resource_type": "database", |
| 125 | + "resource_name": "SELECT", |
| 126 | + "start_attributes": {}, |
| 127 | + "end_attributes": {}, |
| 128 | + "status": "ok", |
| 129 | + "duration_ms": 10.0, |
| 130 | + "row_count": 1, |
| 131 | + } |
| 132 | + mock_service = MagicMock() |
| 133 | + mock_db = MagicMock() |
| 134 | + mock_span = MagicMock() |
| 135 | + mock_db.query().filter_by().first.return_value = mock_span |
| 136 | + with patch("mcpgateway.services.observability_service.ObservabilityService", return_value=mock_service), \ |
| 137 | + patch("mcpgateway.db.SessionLocal", return_value=mock_db), \ |
| 138 | + patch("mcpgateway.db.ObservabilitySpan", MagicMock()): |
| 139 | + sa._write_span_to_db(span_data) |
| 140 | + mock_service.start_span.assert_called_once() |
| 141 | + mock_service.end_span.assert_called_once() |
| 142 | + mock_db.commit.assert_called_once() |
| 143 | + |
| 144 | + |
| 145 | +def test_write_span_to_db_exception_logs_warning(caplog): |
| 146 | + with patch("mcpgateway.services.observability_service.ObservabilityService", side_effect=Exception("fail")): |
| 147 | + sa._write_span_to_db({}) |
| 148 | + assert "Failed to write query span" in caplog.text |
| 149 | + |
| 150 | + |
| 151 | +def test_span_writer_worker_processes_queue(monkeypatch): |
| 152 | + span_data = {"trace_id": "t1", "name": "db.query.select", "kind": "client", "resource_type": "database", "resource_name": "SELECT", "start_attributes": {}, "end_attributes": {}, "status": "ok", "duration_ms": 10.0} |
| 153 | + sa._span_queue.put(span_data) |
| 154 | + mock_write = MagicMock() |
| 155 | + monkeypatch.setattr(sa, "_write_span_to_db", mock_write) |
| 156 | + thread = threading.Thread(target=lambda: (time.sleep(0.1), sa._shutdown_event.set())) |
| 157 | + thread.start() |
| 158 | + sa._span_writer_worker() |
| 159 | + mock_write.assert_called_once() |
| 160 | + |
| 161 | + |
| 162 | +def test_instrument_sqlalchemy_starts_thread_and_registers_events(): |
| 163 | + engine = MagicMock() |
| 164 | + with patch("mcpgateway.instrumentation.sqlalchemy.event.listen") as mock_listen, \ |
| 165 | + patch("mcpgateway.instrumentation.sqlalchemy.threading.Thread") as mock_thread: |
| 166 | + mock_thread.return_value = MagicMock(is_alive=lambda: False) |
| 167 | + sa.instrument_sqlalchemy(engine) |
| 168 | + assert mock_listen.call_count == 2 |
| 169 | + mock_thread.assert_called_once() |
| 170 | + |
| 171 | + |
| 172 | +def test_attach_trace_to_session_sets_trace_id(): |
| 173 | + session = MagicMock() |
| 174 | + connection = MagicMock() |
| 175 | + connection.info = {} |
| 176 | + session.bind = True |
| 177 | + session.connection.return_value = connection |
| 178 | + sa.attach_trace_to_session(session, "trace123") |
| 179 | + assert connection.info["trace_id"] == "trace123" |
0 commit comments