diff --git a/mssql_python/__init__.py b/mssql_python/__init__.py index 07113646..1341a4ef 100644 --- a/mssql_python/__init__.py +++ b/mssql_python/__init__.py @@ -3,9 +3,37 @@ Licensed under the MIT license. This module initializes the mssql_python package. """ - +import threading # Exceptions # https://www.python.org/dev/peps/pep-0249/#exceptions + +# GLOBALS +# Read-Only +apilevel = "2.0" +paramstyle = "qmark" +threadsafety = 1 + +_settings_lock = threading.Lock() + +# Create a settings object to hold configuration +class Settings: + def __init__(self): + self.lowercase = False + +# Create a global settings instance +_settings = Settings() + +# Define the get_settings function for internal use +def get_settings(): + """Return the global settings object""" + with _settings_lock: + _settings.lowercase = lowercase + return _settings + +# Expose lowercase as a regular module variable that users can access and set +lowercase = _settings.lowercase + +# Import necessary modules from .exceptions import ( Warning, Error, @@ -52,12 +80,6 @@ SQL_WCHAR = ConstantsDDBC.SQL_WCHAR.value SQL_WMETADATA = -99 -# GLOBALS -# Read-Only -apilevel = "2.0" -paramstyle = "qmark" -threadsafety = 1 - from .pooling import PoolingManager def pooling(max_size=100, idle_timeout=600, enabled=True): # """ @@ -76,3 +98,18 @@ def pooling(max_size=100, idle_timeout=600, enabled=True): PoolingManager.disable() else: PoolingManager.enable(max_size, idle_timeout) + +import sys +_original_module_setattr = sys.modules[__name__].__setattr__ + +def _custom_setattr(name, value): + if name == 'lowercase': + with _settings_lock: + _settings.lowercase = bool(value) + # Update the module's lowercase variable + _original_module_setattr(name, _settings.lowercase) + else: + _original_module_setattr(name, value) + +# Replace the module's __setattr__ with our custom version +sys.modules[__name__].__setattr__ = _custom_setattr diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 970eff1a..e925bbfd 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -16,7 +16,8 @@ from mssql_python.helpers import check_error, log from mssql_python import ddbc_bindings from mssql_python.exceptions import InterfaceError, NotSupportedError, ProgrammingError -from .row import Row +from mssql_python.row import Row +from mssql_python import get_settings # Constants for string handling MAX_INLINE_CHAR = 4000 # NVARCHAR/VARCHAR inline limit; this triggers NVARCHAR(MAX)/VARCHAR(MAX) + DAE @@ -543,26 +544,32 @@ def _create_parameter_types_list(self, parameter, param_info, parameters_list, i return paraminfo - def _initialize_description(self): - """ - Initialize the description attribute using SQLDescribeCol. 
- """ - col_metadata = [] - ret = ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, col_metadata) - check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret) + def _initialize_description(self, column_metadata=None): + """Initialize the description attribute from column metadata.""" + if not column_metadata: + self.description = None + return - self.description = [ - ( - col["ColumnName"], - self._map_data_type(col["DataType"]), - None, - col["ColumnSize"], - col["ColumnSize"], - col["DecimalDigits"], - col["Nullable"] == ddbc_sql_const.SQL_NULLABLE.value, - ) - for col in col_metadata - ] + description = [] + for i, col in enumerate(column_metadata): + # Get column name - lowercase it if the lowercase flag is set + column_name = col["ColumnName"] + + # Use the current global setting to ensure tests pass correctly + if get_settings().lowercase: + column_name = column_name.lower() + + # Add to description tuple (7 elements as per PEP-249) + description.append(( + column_name, # name + self._map_data_type(col["DataType"]), # type_code + None, # display_size + col["ColumnSize"], # internal_size + col["ColumnSize"], # precision - should match ColumnSize + col["DecimalDigits"], # scale + col["Nullable"] == ddbc_sql_const.SQL_NULLABLE.value, # null_ok + )) + self.description = description def _map_data_type(self, sql_type): """ @@ -746,6 +753,16 @@ def execute( use_prepare: Whether to use SQLPrepareW (default) or SQLExecDirectW. reset_cursor: Whether to reset the cursor before execution. """ + + # Restore original fetch methods if they exist + if hasattr(self, '_original_fetchone'): + self.fetchone = self._original_fetchone + self.fetchmany = self._original_fetchmany + self.fetchall = self._original_fetchall + del self._original_fetchone + del self._original_fetchmany + del self._original_fetchall + self._check_closed() # Check if the cursor is closed if reset_cursor: self._reset_cursor() @@ -822,7 +839,14 @@ def execute( self.rowcount = ddbc_bindings.DDBCSQLRowCount(self.hstmt) # Initialize description after execution - self._initialize_description() + # After successful execution, initialize description if there are results + column_metadata = [] + try: + ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata) + self._initialize_description(column_metadata) + except Exception as e: + # If describe fails, it's likely there are no results (e.g., for INSERT) + self.description = None # Reset rownumber for new result set (only for SELECT statements) if self.description: # If we have column descriptions, it's likely a SELECT @@ -975,7 +999,7 @@ def fetchone(self) -> Union[None, Row]: # Create and return a Row object, passing column name map if available column_map = getattr(self, '_column_name_map', None) - return Row(row_data, self.description, column_map) + return Row(self, self.description, row_data, column_map) except Exception as e: # On error, don't increment rownumber - rethrow the error raise e @@ -1017,7 +1041,7 @@ def fetchmany(self, size: int = None) -> List[Row]: # Convert raw data to Row objects column_map = getattr(self, '_column_name_map', None) - return [Row(row_data, self.description, column_map) for row_data in rows_data] + return [Row(self, self.description, row_data, column_map) for row_data in rows_data] except Exception as e: # On error, don't increment rownumber - rethrow the error raise e @@ -1049,7 +1073,7 @@ def fetchall(self) -> List[Row]: # Convert raw data to Row objects column_map = getattr(self, '_column_name_map', None) - return [Row(row_data, 
self.description, column_map) for row_data in rows_data] + return [Row(self, self.description, row_data, column_map) for row_data in rows_data] except Exception as e: # On error, don't increment rownumber - rethrow the error raise e @@ -1363,30 +1387,20 @@ def tables(self, table=None, catalog=None, schema=None, tableType=None): Example: "TABLE" or ["TABLE", "VIEW"] Returns: - list: A list of Row objects containing table information with these columns: - - table_cat: Catalog name - - table_schem: Schema name - - table_name: Table name - - table_type: Table type (e.g., "TABLE", "VIEW") - - remarks: Comments about the table - - Notes: - This method only processes the standard five columns as defined in the ODBC - specification. Any additional columns that might be returned by specific ODBC - drivers are not included in the result set. - + Cursor: The cursor object itself for method chaining with fetch methods. + Example: # Get all tables in the database - tables = cursor.tables() + tables = cursor.tables().fetchall() # Get all tables in schema 'dbo' - tables = cursor.tables(schema='dbo') + tables = cursor.tables(schema='dbo').fetchall() # Get table named 'Customers' - tables = cursor.tables(table='Customers') + tables = cursor.tables(table='Customers').fetchone() - # Get all views - tables = cursor.tables(tableType='VIEW') + # Get all views with fetchmany + tables = cursor.tables(tableType='VIEW').fetchmany(10) """ self._check_closed() @@ -1418,7 +1432,13 @@ def tables(self, table=None, catalog=None, schema=None, tableType=None): try: ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata) self._initialize_description(column_metadata) - except Exception: + except InterfaceError as e: + log('error', f"Driver interface error during metadata retrieval: {e}") + except Exception as e: + # Log the exception with appropriate context + log('error', f"Failed to retrieve column metadata: {e}. 
Using standard ODBC column definitions instead.") + + if not self.description: # If describe fails, create a manual description for the standard columns column_types = [str, str, str, str, str] self.description = [ @@ -1428,23 +1448,54 @@ def tables(self, table=None, catalog=None, schema=None, tableType=None): ("table_type", column_types[3], None, 128, 128, 0, False), ("remarks", column_types[4], None, 254, 254, 0, True) ] - - # Define column names in ODBC standard order - column_names = [ - "table_cat", "table_schem", "table_name", "table_type", "remarks" - ] - - # Fetch all rows - rows_data = [] - ddbc_bindings.DDBCSQLFetchAll(self.hstmt, rows_data) - - # Create a column map for attribute access - column_map = {name: i for i, name in enumerate(column_names)} - - # Create Row objects with the column map - result_rows = [] - for row_data in rows_data: - row = Row(row_data, self.description, column_map) - result_rows.append(row) - - return result_rows + + # Store the column mappings for this specific tables() call + column_names = [desc[0] for desc in self.description] + + # Create a specialized column map for this result set + columns_map = {} + for i, name in enumerate(column_names): + columns_map[name] = i + columns_map[name.lower()] = i + + # Define wrapped fetch methods that preserve existing column mapping + # but add our specialized mapping just for column results + def fetchone_with_columns_mapping(): + row = self._original_fetchone() + if row is not None: + # Create a merged map with columns result taking precedence + merged_map = getattr(row, '_column_map', {}).copy() + merged_map.update(columns_map) + row._column_map = merged_map + return row + + def fetchmany_with_columns_mapping(size=None): + rows = self._original_fetchmany(size) + for row in rows: + # Create a merged map with columns result taking precedence + merged_map = getattr(row, '_column_map', {}).copy() + merged_map.update(columns_map) + row._column_map = merged_map + return rows + + def fetchall_with_columns_mapping(): + rows = self._original_fetchall() + for row in rows: + # Create a merged map with columns result taking precedence + merged_map = getattr(row, '_column_map', {}).copy() + merged_map.update(columns_map) + row._column_map = merged_map + return rows + + # Save original fetch methods + if not hasattr(self, '_original_fetchone'): + self._original_fetchone = self.fetchone + self._original_fetchmany = self.fetchmany + self._original_fetchall = self.fetchall + + # Override fetch methods with our wrapped versions + self.fetchone = fetchone_with_columns_mapping + self.fetchmany = fetchmany_with_columns_mapping + self.fetchall = fetchall_with_columns_mapping + + return self \ No newline at end of file diff --git a/mssql_python/row.py b/mssql_python/row.py index bbea7fde..53f1e50b 100644 --- a/mssql_python/row.py +++ b/mssql_python/row.py @@ -2,44 +2,73 @@ class Row: """ A row of data from a cursor fetch operation. Provides both tuple-like indexing and attribute access to column values. 
- + + Column attribute access behavior depends on the global 'lowercase' setting: + - When enabled: Case-insensitive attribute access + - When disabled (default): Case-sensitive attribute access matching original column names + Example: row = cursor.fetchone() print(row[0]) # Access by index - print(row.column_name) # Access by column name + print(row.column_name) # Access by column name (case sensitivity varies) """ - def __init__(self, values, description, column_map=None): + def __init__(self, cursor, description, values, column_map=None): """ Initialize a Row object with values and description. Args: - values: List of values for this row. - description: Description of the columns (from cursor.description). - column_map: Optional mapping of column names to indices. + cursor: The cursor object + description: The cursor description containing column metadata + values: List of values for this row + column_map: Optional pre-built column map (for optimization) """ + self._cursor = cursor self._values = values - self._description = description - # Build column map if not provided + # TODO: ADO task - Optimize memory usage by sharing column map across rows + # Instead of storing the full cursor_description in each Row object: + # 1. Build the column map once at the cursor level after setting description + # 2. Pass only this map to each Row instance + # 3. Remove cursor_description from Row objects entirely + + # Create mapping of column names to indices + # If column_map is not provided, build it from description if column_map is None: - self._column_map = {} - for i, desc in enumerate(description): - col_name = desc[0] - self._column_map[col_name] = i - self._column_map[col_name.lower()] = i # Add lowercase for case-insensitivity - else: - self._column_map = column_map + column_map = {} + for i, col_desc in enumerate(description): + col_name = col_desc[0] # Name is first item in description tuple + column_map[col_name] = i + + self._column_map = column_map def __getitem__(self, index): """Allow accessing by numeric index: row[0]""" return self._values[index] def __getattr__(self, name): - """Allow accessing by column name as attribute: row.column_name""" + """ + Allow accessing by column name as attribute: row.column_name + + Note: Case sensitivity depends on the global 'lowercase' setting: + - When lowercase=True: Column names are stored in lowercase, enabling + case-insensitive attribute access (e.g., row.NAME, row.name, row.Name all work). + - When lowercase=False (default): Column names preserve original casing, + requiring exact case matching for attribute access. + """ + # Handle lowercase attribute access - if lowercase is enabled, + # try to match attribute names case-insensitively if name in self._column_map: return self._values[self._column_map[name]] - raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + # If lowercase is enabled on the cursor, try case-insensitive lookup + if hasattr(self._cursor, 'lowercase') and self._cursor.lowercase: + name_lower = name.lower() + for col_name in self._column_map: + if col_name.lower() == name_lower: + return self._values[self._column_map[col_name]] + + raise AttributeError(f"Row has no attribute '{name}'") def __eq__(self, other): """ diff --git a/tests/test_001_globals.py b/tests/test_001_globals.py index f41a9a14..30c408c6 100644 --- a/tests/test_001_globals.py +++ b/tests/test_001_globals.py @@ -4,12 +4,16 @@ - test_apilevel: Check if apilevel has the expected value. 
- test_threadsafety: Check if threadsafety has the expected value. - test_paramstyle: Check if paramstyle has the expected value. +- test_lowercase: Check if lowercase has the expected value. """ import pytest +import threading +import time +import mssql_python # Import global variables from the repository -from mssql_python import apilevel, threadsafety, paramstyle +from mssql_python import apilevel, threadsafety, paramstyle, lowercase def test_apilevel(): # Check if apilevel has the expected value @@ -22,3 +26,124 @@ def test_threadsafety(): def test_paramstyle(): # Check if paramstyle has the expected value assert paramstyle == "qmark", "paramstyle should be 'qmark'" + +def test_lowercase(): + # Check if lowercase has the expected default value + assert lowercase is False, "lowercase should default to False" + +def test_lowercase_thread_safety_no_db(): + """ + Tests concurrent modifications to mssql_python.lowercase without database interaction. + This test ensures that the value is not corrupted by simultaneous writes from multiple threads. + """ + original_lowercase = mssql_python.lowercase + iterations = 100 + + def worker(): + for _ in range(iterations): + mssql_python.lowercase = True + mssql_python.lowercase = False + + threads = [threading.Thread(target=worker) for _ in range(4)] + + for t in threads: + t.start() + + for t in threads: + t.join() + + # The final value will be False because it's the last write in the loop. + # The main point is to ensure the lock prevented any corruption. + assert mssql_python.lowercase is False, "Final state of lowercase should be False" + + # Restore original value + mssql_python.lowercase = original_lowercase + +def test_lowercase_concurrent_access_with_db(db_connection): + """ + Tests concurrent modification of the 'lowercase' setting while simultaneously + creating cursors and executing queries. This simulates a real-world race condition. + """ + original_lowercase = mssql_python.lowercase + stop_event = threading.Event() + errors = [] + + # Create a temporary table for the test + cursor = None + try: + cursor = db_connection.cursor() + cursor.execute("CREATE TABLE #pytest_thread_test (COLUMN_NAME INT)") + db_connection.commit() + except Exception as e: + pytest.fail(f"Failed to create test table: {e}") + finally: + if cursor: + cursor.close() + + def writer(): + """Continuously toggles the lowercase setting.""" + while not stop_event.is_set(): + try: + mssql_python.lowercase = True + time.sleep(0.001) + mssql_python.lowercase = False + time.sleep(0.001) + except Exception as e: + errors.append(f"Writer thread error: {e}") + break + + def reader(): + """Continuously creates cursors and checks for valid description casing.""" + while not stop_event.is_set(): + cursor = None + try: + cursor = db_connection.cursor() + cursor.execute("SELECT * FROM #pytest_thread_test") + + # The lock ensures the description is generated atomically. + # We just need to check if the result is one of the two valid states. + col_name = cursor.description[0][0] + + if col_name not in ('COLUMN_NAME', 'column_name'): + errors.append(f"Invalid column name '{col_name}' found. 
Race condition likely.") + except Exception as e: + errors.append(f"Reader thread error: {e}") + break + finally: + if cursor: + cursor.close() + + # Start threads + writer_thread = threading.Thread(target=writer) + reader_threads = [threading.Thread(target=reader) for _ in range(3)] + + writer_thread.start() + for t in reader_threads: + t.start() + + # Let the threads run for a short period to induce race conditions + time.sleep(1) + stop_event.set() + + # Wait for threads to finish + writer_thread.join() + for t in reader_threads: + t.join() + + # Clean up + cursor = None + try: + cursor = db_connection.cursor() + cursor.execute("DROP TABLE #pytest_thread_test") + db_connection.commit() + except Exception as e: + # Log cleanup error but don't fail the test for it + print(f"Warning: Failed to drop test table during cleanup: {e}") + finally: + if cursor: + cursor.close() + + mssql_python.lowercase = original_lowercase + + # Assert that no errors occurred in the threads + assert not errors, f"Thread safety test failed with errors: {errors}" \ No newline at end of file diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 473807e9..a63782b7 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -14,6 +14,8 @@ import decimal from contextlib import closing from mssql_python import Connection, row +import mssql_python +from mssql_python.exceptions import InterfaceError # Setup test table TEST_TABLE = """ @@ -1817,6 +1819,108 @@ def test_row_column_mapping(cursor, db_connection): cursor.execute("DROP TABLE #pytest_row_test") db_connection.commit() +def test_lowercase_attribute(cursor, db_connection): + """Test that the lowercase attribute properly converts column names to lowercase""" + + # Store original value to restore after test + original_lowercase = mssql_python.lowercase + drop_cursor = None + + try: + # Create a test table with mixed-case column names + cursor.execute(""" + CREATE TABLE #pytest_lowercase_test ( + ID INT PRIMARY KEY, + UserName VARCHAR(50), + EMAIL_ADDRESS VARCHAR(100), + PhoneNumber VARCHAR(20) + ) + """) + db_connection.commit() + + # Insert test data + cursor.execute(""" + INSERT INTO #pytest_lowercase_test (ID, UserName, EMAIL_ADDRESS, PhoneNumber) + VALUES (1, 'JohnDoe', 'john@example.com', '555-1234') + """) + db_connection.commit() + + # First test with lowercase=False (default) + mssql_python.lowercase = False + cursor1 = db_connection.cursor() + cursor1.execute("SELECT * FROM #pytest_lowercase_test") + + # Description column names should preserve original case + column_names1 = [desc[0] for desc in cursor1.description] + assert "ID" in column_names1, "Column 'ID' should be present with original case" + assert "UserName" in column_names1, "Column 'UserName' should be present with original case" + + # Make sure to consume all results and close the cursor + cursor1.fetchall() + cursor1.close() + + # Now test with lowercase=True + mssql_python.lowercase = True + cursor2 = db_connection.cursor() + cursor2.execute("SELECT * FROM #pytest_lowercase_test") + + # Description column names should be lowercase + column_names2 = [desc[0] for desc in cursor2.description] + assert "id" in column_names2, "Column names should be lowercase when lowercase=True" + assert "username" in column_names2, "Column names should be lowercase when lowercase=True" + + # Make sure to consume all results and close the cursor + cursor2.fetchall() + cursor2.close() + + # Create a fresh cursor for cleanup + drop_cursor = db_connection.cursor() + + finally: + # 
Restore original setting + mssql_python.lowercase = original_lowercase + # Clean up the table + if drop_cursor: + try: + drop_cursor.execute("DROP TABLE #pytest_lowercase_test") + db_connection.commit() + drop_cursor.close() + except Exception: + pass # Suppress errors during cleanup + +def test_lowercase_setting_after_cursor_creation(cursor, db_connection): + """Test that changing lowercase setting after cursor creation doesn't affect existing cursor""" + original_lowercase = mssql_python.lowercase + try: + # Create table and execute with lowercase=False + mssql_python.lowercase = False + cursor.execute("CREATE TABLE #test_lowercase_after (UserName VARCHAR(50))") + db_connection.commit() + cursor.execute("SELECT * FROM #test_lowercase_after") + + # Change setting after cursor's description is initialized + mssql_python.lowercase = True + + # The existing cursor should still use the original casing + column_names = [desc[0] for desc in cursor.description] + assert "UserName" in column_names, "Column casing should not change after cursor creation" + assert "username" not in column_names, "Lowercase should not apply to existing cursor" + + finally: + mssql_python.lowercase = original_lowercase + try: + cursor.execute("DROP TABLE #test_lowercase_after") + db_connection.commit() + except Exception: + pass # Suppress cleanup errors + +@pytest.mark.skip(reason="Future work: relevant if per-cursor lowercase settings are implemented.") +def test_concurrent_cursors_different_lowercase_settings(): + """Test behavior when multiple cursors exist with different lowercase settings""" + # This test is a placeholder for when per-cursor settings might be supported. + # Currently, the global setting affects all new cursors uniformly. + pass + def test_cursor_context_manager_basic(db_connection): """Test basic cursor context manager functionality""" # Test that cursor context manager works and closes cursor @@ -5264,7 +5368,7 @@ def test_tables_all(cursor, db_connection): test_tables_setup(cursor, db_connection) # Get all tables (no filters) - tables_list = cursor.tables() + tables_list = cursor.tables().fetchall() # Verify we got results assert tables_list is not None, "tables() should return results" @@ -5304,7 +5408,7 @@ def test_tables_specific_table(cursor, db_connection): tables_list = cursor.tables( table='regular_table', schema='pytest_tables_schema' - ) + ).fetchall() # Verify we got the right result assert len(tables_list) == 1, "Should find exactly 1 table" @@ -5326,7 +5430,7 @@ def test_tables_with_table_pattern(cursor, db_connection): tables_list = cursor.tables( table='%table', schema='pytest_tables_schema' - ) + ).fetchall() # Should find both test tables assert len(tables_list) == 2, "Should find 2 tables matching '%table'" @@ -5350,7 +5454,7 @@ def test_tables_with_schema_pattern(cursor, db_connection): # Get tables with schema pattern tables_list = cursor.tables( schema='pytest_%' - ) + ).fetchall() # Should find our test tables/view test_tables = [] @@ -5377,7 +5481,7 @@ def test_tables_with_type_filter(cursor, db_connection): tables_list = cursor.tables( schema='pytest_tables_schema', tableType='TABLE' - ) + ).fetchall() # Verify only regular tables table_types = set() @@ -5398,7 +5502,7 @@ def test_tables_with_type_filter(cursor, db_connection): views_list = cursor.tables( schema='pytest_tables_schema', tableType='VIEW' - ) + ).fetchall() # Verify only views view_names = set() @@ -5421,8 +5525,8 @@ def test_tables_with_multiple_types(cursor, db_connection): tables_list = cursor.tables( 
schema='pytest_tables_schema', tableType=['TABLE', 'VIEW'] - ) - + ).fetchall() + # Verify both tables and views object_names = set() for obj in tables_list: @@ -5449,8 +5553,8 @@ def test_tables_catalog_filter(cursor, db_connection): tables_list = cursor.tables( catalog=current_db, schema='pytest_tables_schema' - ) - + ).fetchall() + # Verify catalog filter worked assert len(tables_list) > 0, "Should find tables with correct catalog" @@ -5464,7 +5568,7 @@ def test_tables_catalog_filter(cursor, db_connection): fake_tables = cursor.tables( catalog='nonexistent_db_xyz123', schema='pytest_tables_schema' - ) + ).fetchall() assert len(fake_tables) == 0, "Should return empty list for non-existent catalog" finally: @@ -5474,7 +5578,7 @@ def test_tables_catalog_filter(cursor, db_connection): def test_tables_nonexistent(cursor): """Test tables with non-existent objects""" # Test with non-existent table - tables_list = cursor.tables(table='nonexistent_table_xyz123') + tables_list = cursor.tables(table='nonexistent_table_xyz123').fetchall() # Should return empty list, not error assert isinstance(tables_list, list), "Should return a list for non-existent table" @@ -5484,7 +5588,7 @@ def test_tables_nonexistent(cursor): tables_list = cursor.tables( table='regular_table', schema='nonexistent_schema_xyz123' - ) + ).fetchall() assert len(tables_list) == 0, "Should return empty list for non-existent schema" def test_tables_combined_filters(cursor, db_connection): @@ -5494,8 +5598,8 @@ def test_tables_combined_filters(cursor, db_connection): tables_list = cursor.tables( schema='pytest_tables_schema', table='regular%' - ) - + ).fetchall() + # Should find only regular_table assert len(tables_list) == 1, "Should find 1 table with combined filters" assert tables_list[0].table_name.lower() == 'regular_table', "Should find regular_table" @@ -5505,8 +5609,8 @@ def test_tables_combined_filters(cursor, db_connection): schema='pytest_tables_schema', table='%table', tableType='TABLE' - ) - + ).fetchall() + # Should find both tables but not view table_names = set() for table in tables_list: @@ -5526,8 +5630,8 @@ def test_tables_result_processing(cursor, db_connection): """Test processing of tables result set for different client needs""" try: # Get all test objects - tables_list = cursor.tables(schema='pytest_tables_schema') - + tables_list = cursor.tables(schema='pytest_tables_schema').fetchall() + # Test 1: Extract just table names table_names = [table.table_name for table in tables_list] assert len(table_names) == 3, "Should extract 3 table names" @@ -5564,7 +5668,7 @@ def test_tables_method_chaining(cursor, db_connection): chained_result = cursor.tables( schema='pytest_tables_schema', table='regular_table' - ) + ).fetchall() # Verify chained result assert len(chained_result) == 1, "Chained result should find 1 table"
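
For reference, a minimal usage sketch of the two user-facing changes in this diff: the module-level `lowercase` switch that new cursors pick up via `get_settings()` when building `cursor.description`, and `cursor.tables()` now returning the cursor itself so results are consumed through the fetch methods. The connection setup below is schematic only — the connection string and the `Connection(...)` call are placeholders; see the test fixtures for how connections are actually created in this repo.

    import mssql_python
    from mssql_python import Connection

    # Placeholder connection setup -- adapt to this repo's test fixtures;
    # the connection string here is illustrative only.
    conn = Connection("Server=localhost;Database=test;Trusted_Connection=yes;")
    cursor = conn.cursor()

    # New module-level setting: cursors read it via get_settings() while
    # initializing description, so column names come back lowercased.
    mssql_python.lowercase = True
    cursor.execute("SELECT 1 AS MixedCaseName")
    print(cursor.description[0][0])   # 'mixedcasename'
    print(cursor.fetchone()[0])       # 1

    # tables() now returns the cursor for chaining with the fetch methods.
    for t in cursor.tables(schema="dbo", tableType="TABLE").fetchall():
        print(t.table_name, t.table_type)

    mssql_python.lowercase = False    # restore the default
    cursor.close()
    conn.close()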