Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mssql_python/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class ConstantsDDBC(Enum):
SQL_ROWVER = 2
SQL_NO_NULLS = 0
SQL_NULLABLE_UNKNOWN = 2
SQL_INDEX_UNIQUE = 0
SQL_INDEX_ALL = 1
SQL_QUICK = 0
SQL_ENSURE = 1

class AuthType(Enum):
"""Constants for authentication types"""
Expand Down
288 changes: 288 additions & 0 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,16 @@ def execute(
use_prepare: Whether to use SQLPrepareW (default) or SQLExecDirectW.
reset_cursor: Whether to reset the cursor before execution.
"""

# Restore original fetch methods if they exist
if hasattr(self, '_original_fetchone'):
self.fetchone = self._original_fetchone
self.fetchmany = self._original_fetchmany
self.fetchall = self._original_fetchall
del self._original_fetchone
del self._original_fetchmany
del self._original_fetchall

self._check_closed() # Check if the cursor is closed
if reset_cursor:
self._reset_cursor()
Expand Down Expand Up @@ -1255,6 +1265,284 @@ def rowVerColumns(self, table, catalog=None, schema=None, nullable=True):

return result_rows

def statistics(self, table: str, catalog: str = None, schema: str = None, unique: bool = False, quick: bool = True) -> 'Cursor':
    """
    Creates a result set of statistics about a single table and the indexes associated
    with the table by executing SQLStatistics.

    Args:
        table (str): The name of the table. Required.
        catalog (str, optional): The catalog name. Defaults to None.
        schema (str, optional): The schema name. Defaults to None.
        unique (bool, optional): If True, only unique indexes are returned.
            If False, all indexes are returned. Defaults to False.
        quick (bool, optional): If True, CARDINALITY and PAGES are returned only
            if readily available. Defaults to True.

    Returns:
        Cursor: The cursor itself, containing the result set. Use fetchone(), fetchmany(),
        or fetchall() to retrieve the results.

    Raises:
        ProgrammingError: If no table name is supplied.

    Example:
        # Get statistics for the 'Customers' table
        stats_cursor = cursor.statistics(table='Customers')

        # Fetch rows as needed
        first_stat = stats_cursor.fetchone()
        next_10_stats = stats_cursor.fetchmany(10)
        all_remaining = stats_cursor.fetchall()
    """
    self._check_closed()

    # Always reset the cursor first to ensure clean state
    self._reset_cursor()

    # Table name is required by SQLStatistics
    if not table:
        raise ProgrammingError("Table name is required", "HY000")

    # Map the boolean flags onto the ODBC constants:
    # unique=True  -> SQL_INDEX_UNIQUE (0), unique=False -> SQL_INDEX_ALL (1)
    unique_option = ddbc_sql_const.SQL_INDEX_UNIQUE.value if unique else ddbc_sql_const.SQL_INDEX_ALL.value

    # quick=True -> SQL_QUICK (0), quick=False -> SQL_ENSURE (1)
    reserved_option = ddbc_sql_const.SQL_QUICK.value if quick else ddbc_sql_const.SQL_ENSURE.value

    # Call the SQLStatistics function on the current statement handle
    retcode = ddbc_bindings.DDBCSQLStatistics(
        self.hstmt,
        catalog,
        schema,
        table,
        unique_option,
        reserved_option
    )
    check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, retcode)

    # Initialize description from the driver's column metadata
    column_metadata = []
    try:
        ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata)
        self._initialize_description(column_metadata)
    except InterfaceError as e:
        log('error', f"Driver interface error during metadata retrieval: {e}")

    except Exception as e:
        # Log the exception with appropriate context
        log('error', f"Failed to retrieve column metadata: {e}. Using standard ODBC column definitions instead.")

    if not self.description:
        # If describe fails, fall back to the standard ODBC SQLStatistics
        # result-set column layout.
        column_types = [str, str, str, bool, str, str, int, int, str, str, int, int, str]
        self.description = [
            ("table_cat", column_types[0], None, 128, 128, 0, True),
            ("table_schem", column_types[1], None, 128, 128, 0, True),
            ("table_name", column_types[2], None, 128, 128, 0, False),
            ("non_unique", column_types[3], None, 1, 1, 0, False),
            ("index_qualifier", column_types[4], None, 128, 128, 0, True),
            ("index_name", column_types[5], None, 128, 128, 0, True),
            ("type", column_types[6], None, 10, 10, 0, False),
            ("ordinal_position", column_types[7], None, 10, 10, 0, False),
            ("column_name", column_types[8], None, 128, 128, 0, True),
            ("asc_or_desc", column_types[9], None, 1, 1, 0, True),
            ("cardinality", column_types[10], None, 20, 20, 0, True),
            ("pages", column_types[11], None, 20, 20, 0, True),
            ("filter_condition", column_types[12], None, 128, 128, 0, True)
        ]

    # Create a column map with both ODBC standard names and lowercase aliases
    self._column_map = {}
    for i, (name, *_) in enumerate(self.description):
        # Add standard name
        self._column_map[name] = i
        # Add lowercase alias
        self._column_map[name.lower()] = i

    # Remember original fetch methods (store only once, so repeated metadata
    # calls keep pointing at the real implementations, not earlier wrappers)
    if not hasattr(self, '_original_fetchone'):
        self._original_fetchone = self.fetchone
        self._original_fetchmany = self.fetchmany
        self._original_fetchall = self.fetchall

    # Wrap the fetch methods so every returned row carries the column map,
    # enabling access by column name in addition to index.
    def fetchone_with_mapping():
        row = self._original_fetchone()
        if row is not None:
            row._column_map = self._column_map
        return row

    def fetchmany_with_mapping(size=None):
        rows = self._original_fetchmany(size)
        for row in rows:
            row._column_map = self._column_map
        return rows

    def fetchall_with_mapping():
        rows = self._original_fetchall()
        for row in rows:
            row._column_map = self._column_map
        return rows

    # Replace fetch methods
    self.fetchone = fetchone_with_mapping
    self.fetchmany = fetchmany_with_mapping
    self.fetchall = fetchall_with_mapping

    # FIX: previously returned the undefined name `result_rows`, which raised
    # NameError at runtime. Return the cursor itself, as documented and as
    # the sibling columns() method does.
    return self

def columns(self, table: str = None, catalog: str = None, schema: str = None, column: str = None) -> 'Cursor':
    """
    Creates a result set of column information in the specified tables
    using the SQLColumns function.

    Args:
        table (str, optional): The table name pattern. Default is None (all tables).
        catalog (str, optional): The catalog name. Default is None (current catalog).
        schema (str, optional): The schema name pattern. Default is None (all schemas).
        column (str, optional): The column name pattern. Default is None (all columns).

    Returns:
        cursor: The cursor itself, containing the result set. Use fetchone(), fetchmany(),
        or fetchall() to retrieve the results.

    Each row contains the following columns:
        - table_cat (str): Catalog name
        - table_schem (str): Schema name
        - table_name (str): Table name
        - column_name (str): Column name
        - data_type (int): The ODBC SQL data type constant (e.g. SQL_CHAR)
        - type_name (str): Data source dependent type name
        - column_size (int): Column size
        - buffer_length (int): Length of the column in bytes
        - decimal_digits (int): Number of fractional digits
        - num_prec_radix (int): Radix (typically 10 or 2)
        - nullable (int): One of SQL_NO_NULLS, SQL_NULLABLE, SQL_NULLABLE_UNKNOWN
        - remarks (str): Comments about the column
        - column_def (str): Default value for the column
        - sql_data_type (int): The SQL data type from java.sql.Types
        - sql_datetime_sub (int): Subcode for datetime types
        - char_octet_length (int): Maximum length in bytes for char types
        - ordinal_position (int): Column position in the table (starting at 1)
        - is_nullable (str): "YES", "NO", or "" (unknown)

    Warning:
        Calling this method without any filters (all parameters as None) will enumerate
        EVERY column in EVERY table in the database. This can be extremely expensive in
        large databases, potentially causing high memory usage, slow execution times,
        and in extreme cases, timeout errors. Always use filters (catalog, schema, table,
        or column) whenever possible to limit the result set.

    Example:
        # Get all columns in table 'Customers'
        columns = cursor.columns(table='Customers')

        # Get all columns in table 'Customers' in schema 'dbo'
        columns = cursor.columns(table='Customers', schema='dbo')

        # Get column named 'CustomerID' in any table
        columns = cursor.columns(column='CustomerID')
    """
    self._check_closed()

    # Always reset the cursor first to ensure clean state
    self._reset_cursor()

    # Call the SQLColumns function on the current statement handle.
    # All four filter arguments may be None, which ODBC treats as "match all".
    retcode = ddbc_bindings.DDBCSQLColumns(
        self.hstmt,
        catalog,
        schema,
        table,
        column
    )
    check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, retcode)

    # Initialize description from column metadata reported by the driver.
    column_metadata = []
    try:
        ddbc_bindings.DDBCSQLDescribeCol(self.hstmt, column_metadata)
        self._initialize_description(column_metadata)
    except InterfaceError as e:
        log('error', f"Driver interface error during metadata retrieval: {e}")

    except Exception as e:
        # Log the exception with appropriate context
        log('error', f"Failed to retrieve column metadata: {e}. Using standard ODBC column definitions instead.")

    if not self.description:
        # If describe fails, create a manual description for the standard
        # ODBC SQLColumns result-set columns (18 columns).
        column_types = [str, str, str, str, int, str, int, int, int, int, int, str, str, int, int, int, int, str]
        self.description = [
            ("table_cat", column_types[0], None, 128, 128, 0, True),
            ("table_schem", column_types[1], None, 128, 128, 0, True),
            ("table_name", column_types[2], None, 128, 128, 0, False),
            ("column_name", column_types[3], None, 128, 128, 0, False),
            ("data_type", column_types[4], None, 10, 10, 0, False),
            ("type_name", column_types[5], None, 128, 128, 0, False),
            ("column_size", column_types[6], None, 10, 10, 0, True),
            ("buffer_length", column_types[7], None, 10, 10, 0, True),
            ("decimal_digits", column_types[8], None, 10, 10, 0, True),
            ("num_prec_radix", column_types[9], None, 10, 10, 0, True),
            ("nullable", column_types[10], None, 10, 10, 0, False),
            ("remarks", column_types[11], None, 254, 254, 0, True),
            ("column_def", column_types[12], None, 254, 254, 0, True),
            ("sql_data_type", column_types[13], None, 10, 10, 0, False),
            ("sql_datetime_sub", column_types[14], None, 10, 10, 0, True),
            ("char_octet_length", column_types[15], None, 10, 10, 0, True),
            ("ordinal_position", column_types[16], None, 10, 10, 0, False),
            ("is_nullable", column_types[17], None, 254, 254, 0, True)
        ]

    # Store the column mappings for this specific columns() call
    column_names = [desc[0] for desc in self.description]

    # Create a specialized column map for this result set: each name is
    # indexed both as-is and as a lowercase alias.
    columns_map = {}
    for i, name in enumerate(column_names):
        columns_map[name] = i
        columns_map[name.lower()] = i

    # Define wrapped fetch methods that preserve existing column mapping
    # but add our specialized mapping just for column results.
    # NOTE: these closures resolve self._original_fetch* at call time, so
    # the originals only need to exist by the time rows are fetched.
    def fetchone_with_columns_mapping():
        row = self._original_fetchone()
        if row is not None:
            # Create a merged map with columns result taking precedence
            merged_map = getattr(row, '_column_map', {}).copy()
            merged_map.update(columns_map)
            row._column_map = merged_map
        return row

    def fetchmany_with_columns_mapping(size=None):
        rows = self._original_fetchmany(size)
        for row in rows:
            # Create a merged map with columns result taking precedence
            merged_map = getattr(row, '_column_map', {}).copy()
            merged_map.update(columns_map)
            row._column_map = merged_map
        return rows

    def fetchall_with_columns_mapping():
        rows = self._original_fetchall()
        for row in rows:
            # Create a merged map with columns result taking precedence
            merged_map = getattr(row, '_column_map', {}).copy()
            merged_map.update(columns_map)
            row._column_map = merged_map
        return rows

    # Save original fetch methods (only once, so an earlier wrapper is never
    # mistaken for the real implementation).
    if not hasattr(self, '_original_fetchone'):
        self._original_fetchone = self.fetchone
        self._original_fetchmany = self.fetchmany
        self._original_fetchall = self.fetchall

    # Override fetch methods with our wrapped versions
    self.fetchone = fetchone_with_columns_mapping
    self.fetchmany = fetchmany_with_columns_mapping
    self.fetchall = fetchall_with_columns_mapping

    return self

@staticmethod
def _select_best_sample_value(column):
"""
Expand Down
Loading