diff --git a/benchmarks/perf-benchmarking.py b/benchmarks/perf-benchmarking.py index cbcca668..d51fbf53 100644 --- a/benchmarks/perf-benchmarking.py +++ b/benchmarks/perf-benchmarking.py @@ -35,9 +35,11 @@ # Ensure pyodbc connection string has ODBC driver specified if CONN_STR and 'Driver=' not in CONN_STR: - CONN_STR = f"Driver={{ODBC Driver 18 for SQL Server}};{CONN_STR}" + CONN_STR_PYODBC = f"Driver={{ODBC Driver 18 for SQL Server}};{CONN_STR}" +else: + CONN_STR_PYODBC = CONN_STR -NUM_ITERATIONS = 5 # Number of times to run each test for averaging +NUM_ITERATIONS = 10 # Number of times to run each test for averaging # SQL Queries COMPLEX_JOIN_AGGREGATION = """ @@ -187,7 +189,7 @@ def run_benchmark_pyodbc(query: str, name: str, iterations: int) -> BenchmarkRes for i in range(iterations): try: start_time = time.time() - conn = pyodbc.connect(CONN_STR) + conn = pyodbc.connect(CONN_STR_PYODBC) cursor = conn.cursor() cursor.execute(query) rows = cursor.fetchall() diff --git a/mssql_python/pybind/build.sh b/mssql_python/pybind/build.sh index 7a20b61c..81177728 100755 --- a/mssql_python/pybind/build.sh +++ b/mssql_python/pybind/build.sh @@ -118,6 +118,21 @@ else else echo "[WARNING] macOS dylib configuration encountered issues" fi + + # Codesign the Python extension module (.so file) to prevent SIP crashes + echo "[ACTION] Codesigning Python extension module..." + SO_FILE="$PARENT_DIR/"*.so + for so in $SO_FILE; do + if [ -f "$so" ]; then + echo " Signing: $so" + codesign -s - -f "$so" 2>/dev/null + if [ $? -eq 0 ]; then + echo "[SUCCESS] Python extension codesigned: $so" + else + echo "[WARNING] Failed to codesign: $so" + fi + fi + done fi else echo "[ERROR] Failed to copy .so file" diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 75311b8f..ec066193 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -135,52 +135,6 @@ struct NumericData { } }; -// Struct to hold the DateTimeOffset structure -struct DateTimeOffset -{ - SQLSMALLINT year; - SQLUSMALLINT month; - SQLUSMALLINT day; - SQLUSMALLINT hour; - SQLUSMALLINT minute; - SQLUSMALLINT second; - SQLUINTEGER fraction; // Nanoseconds - SQLSMALLINT timezone_hour; // Offset hours from UTC - SQLSMALLINT timezone_minute; // Offset minutes from UTC -}; - -// Struct to hold data buffers and indicators for each column -struct ColumnBuffers { - std::vector> charBuffers; - std::vector> wcharBuffers; - std::vector> intBuffers; - std::vector> smallIntBuffers; - std::vector> realBuffers; - std::vector> doubleBuffers; - std::vector> timestampBuffers; - std::vector> bigIntBuffers; - std::vector> dateBuffers; - std::vector> timeBuffers; - std::vector> guidBuffers; - std::vector> indicators; - std::vector> datetimeoffsetBuffers; - - ColumnBuffers(SQLSMALLINT numCols, int fetchSize) - : charBuffers(numCols), - wcharBuffers(numCols), - intBuffers(numCols), - smallIntBuffers(numCols), - realBuffers(numCols), - doubleBuffers(numCols), - timestampBuffers(numCols), - bigIntBuffers(numCols), - dateBuffers(numCols), - timeBuffers(numCols), - guidBuffers(numCols), - datetimeoffsetBuffers(numCols), - indicators(numCols, std::vector(fetchSize)) {} -}; - //------------------------------------------------------------------------------------------------- // Function pointer initialization //------------------------------------------------------------------------------------------------- @@ -2405,11 +2359,12 @@ SQLRETURN SQLFetch_wrap(SqlHandlePtr StatementHandle) { return 
SQLFetch_ptr(StatementHandle->get()); } -static py::object FetchLobColumnData(SQLHSTMT hStmt, - SQLUSMALLINT colIndex, - SQLSMALLINT cType, - bool isWideChar, - bool isBinary) +// Non-static so it can be called from inline functions in header +py::object FetchLobColumnData(SQLHSTMT hStmt, + SQLUSMALLINT colIndex, + SQLSMALLINT cType, + bool isWideChar, + bool isBinary) { std::vector buffer; SQLRETURN ret = SQL_SUCCESS_WITH_INFO; @@ -3220,40 +3175,119 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum std::string decimalSeparator = GetDecimalSeparator(); // Cache decimal separator - size_t initialSize = rows.size(); - for (SQLULEN i = 0; i < numRowsFetched; i++) { - rows.append(py::none()); + // Performance: Build function pointer dispatch table (once per batch) + // This eliminates the switch statement from the hot loop - 10,000 rows × 10 cols + // reduces from 100,000 switch evaluations to just 10 switch evaluations + std::vector columnProcessors(numCols); + std::vector columnInfosExt(numCols); + + for (SQLUSMALLINT col = 0; col < numCols; col++) { + // Populate extended column info for processors that need it + columnInfosExt[col].dataType = columnInfos[col].dataType; + columnInfosExt[col].columnSize = columnInfos[col].columnSize; + columnInfosExt[col].processedColumnSize = columnInfos[col].processedColumnSize; + columnInfosExt[col].fetchBufferSize = columnInfos[col].fetchBufferSize; + columnInfosExt[col].isLob = columnInfos[col].isLob; + + // Map data type to processor function (switch executed once per column, not per cell) + SQLSMALLINT dataType = columnInfos[col].dataType; + switch (dataType) { + case SQL_INTEGER: + columnProcessors[col] = ColumnProcessors::ProcessInteger; + break; + case SQL_SMALLINT: + columnProcessors[col] = ColumnProcessors::ProcessSmallInt; + break; + case SQL_BIGINT: + columnProcessors[col] = ColumnProcessors::ProcessBigInt; + break; + case SQL_TINYINT: + columnProcessors[col] = ColumnProcessors::ProcessTinyInt; + break; + case SQL_BIT: + columnProcessors[col] = ColumnProcessors::ProcessBit; + break; + case SQL_REAL: + columnProcessors[col] = ColumnProcessors::ProcessReal; + break; + case SQL_DOUBLE: + case SQL_FLOAT: + columnProcessors[col] = ColumnProcessors::ProcessDouble; + break; + case SQL_CHAR: + case SQL_VARCHAR: + case SQL_LONGVARCHAR: + columnProcessors[col] = ColumnProcessors::ProcessChar; + break; + case SQL_WCHAR: + case SQL_WVARCHAR: + case SQL_WLONGVARCHAR: + columnProcessors[col] = ColumnProcessors::ProcessWChar; + break; + case SQL_BINARY: + case SQL_VARBINARY: + case SQL_LONGVARBINARY: + columnProcessors[col] = ColumnProcessors::ProcessBinary; + break; + default: + // For complex types (Decimal, DateTime, Guid, etc.), set to nullptr + // and handle via fallback switch in the hot loop + columnProcessors[col] = nullptr; + break; + } } + // Performance: Single-phase row creation pattern + // Create each row, fill it completely, then append to results list + // This prevents data corruption (no partially-filled rows) and simplifies error handling + PyObject* rowsList = rows.ptr(); + for (SQLULEN i = 0; i < numRowsFetched; i++) { - // Create row container pre-allocated with known column count - py::list row(numCols); + // Create row and immediately fill it (atomic operation per row) + // This eliminates the two-phase pattern that could leave garbage rows on exception + PyObject* row = PyList_New(numCols); + if (!row) { + throw std::runtime_error("Failed to allocate row list - memory allocation failure"); + } + for 
(SQLUSMALLINT col = 1; col <= numCols; col++) { - const ColumnInfo& colInfo = columnInfos[col - 1]; - SQLSMALLINT dataType = colInfo.dataType; + // Performance: Centralized NULL checking before calling processor functions + // This eliminates redundant NULL checks inside each processor and improves CPU branch prediction SQLLEN dataLen = buffers.indicators[col - 1][i]; + + // Handle NULL and special indicator values first (applies to ALL types) if (dataLen == SQL_NULL_DATA) { - row[col - 1] = py::none(); + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); continue; } if (dataLen == SQL_NO_TOTAL) { - LOG("Cannot determine the length of the data. Returning NULL value instead." - "Column ID - {}", col); - row[col - 1] = py::none(); + LOG("Cannot determine the length of the data. Returning NULL value instead. Column ID - {}", col); + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); continue; - } else if (dataLen == 0) { - // Handle zero-length (non-NULL) data - if (dataType == SQL_CHAR || dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR) { - row[col - 1] = std::string(""); - } else if (dataType == SQL_WCHAR || dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR) { - row[col - 1] = std::wstring(L""); - } else if (dataType == SQL_BINARY || dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) { - row[col - 1] = py::bytes(""); - } else { - // For other datatypes, 0 length is unexpected. Log & set None - LOG("Column data length is 0 for non-string/binary datatype. Setting None to the result row. Column ID - {}", col); - row[col - 1] = py::none(); - } + } + + // Performance: Use function pointer dispatch for simple types (fast path) + // This eliminates the switch statement from hot loop - reduces 100,000 switch + // evaluations (1000 rows × 10 cols × 10 types) to just 10 (setup only) + // Note: Processor functions no longer need to check for NULL since we do it above + if (columnProcessors[col - 1] != nullptr) { + columnProcessors[col - 1](row, buffers, &columnInfosExt[col - 1], col, i, hStmt); + continue; + } + + // Fallback for complex types (Decimal, DateTime, Guid, DateTimeOffset, etc.) + // that require pybind11 or special handling + const ColumnInfoExt& colInfo = columnInfosExt[col - 1]; + SQLSMALLINT dataType = colInfo.dataType; + + // Additional validation for complex types + if (dataLen == 0) { + // Handle zero-length (non-NULL) data for complex types + LOG("Column data length is 0 for complex datatype. Setting None to the result row. Column ID - {}", col); + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); continue; } else if (dataLen < 0) { // Negative value is unexpected, log column index, SQL type & raise exception @@ -3262,70 +3296,8 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum } assert(dataLen > 0 && "Data length must be > 0"); + // Handle complex types that couldn't use function pointers switch (dataType) { - case SQL_CHAR: - case SQL_VARCHAR: - case SQL_LONGVARCHAR: { - SQLULEN columnSize = colInfo.columnSize; - HandleZeroColumnSizeAtFetch(columnSize); - uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; - uint64_t numCharsInData = dataLen / sizeof(SQLCHAR); - bool isLob = colInfo.isLob; - // fetchBufferSize includes null-terminator, numCharsInData doesn't. 
Hence '<' - if (!isLob && numCharsInData < fetchBufferSize) { - row[col - 1] = py::str( - reinterpret_cast(&buffers.charBuffers[col - 1][i * fetchBufferSize]), - numCharsInData); - } else { - row[col - 1] = FetchLobColumnData(hStmt, col, SQL_C_CHAR, false, false); - } - break; - } - case SQL_WCHAR: - case SQL_WVARCHAR: - case SQL_WLONGVARCHAR: { - // TODO: variable length data needs special handling, this logic wont suffice - SQLULEN columnSize = colInfo.columnSize; - HandleZeroColumnSizeAtFetch(columnSize); - uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; - uint64_t numCharsInData = dataLen / sizeof(SQLWCHAR); - bool isLob = colInfo.isLob; - // fetchBufferSize includes null-terminator, numCharsInData doesn't. Hence '<' - if (!isLob && numCharsInData < fetchBufferSize) { -#if defined(__APPLE__) || defined(__linux__) - SQLWCHAR* wcharData = &buffers.wcharBuffers[col - 1][i * fetchBufferSize]; - std::wstring wstr = SQLWCHARToWString(wcharData, numCharsInData); - row[col - 1] = wstr; -#else - row[col - 1] = std::wstring( - reinterpret_cast(&buffers.wcharBuffers[col - 1][i * fetchBufferSize]), - numCharsInData); -#endif - } else { - row[col - 1] = FetchLobColumnData(hStmt, col, SQL_C_WCHAR, true, false); - } - break; - } - case SQL_INTEGER: { - row[col - 1] = buffers.intBuffers[col - 1][i]; - break; - } - case SQL_SMALLINT: { - row[col - 1] = buffers.smallIntBuffers[col - 1][i]; - break; - } - case SQL_TINYINT: { - row[col - 1] = buffers.charBuffers[col - 1][i]; - break; - } - case SQL_BIT: { - row[col - 1] = static_cast(buffers.charBuffers[col - 1][i]); - break; - } - case SQL_REAL: { - row[col - 1] = buffers.realBuffers[col - 1][i]; - break; - } case SQL_DECIMAL: case SQL_NUMERIC: { try { @@ -3335,44 +3307,40 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum // Always use standard decimal point for Python Decimal parsing // The decimal separator only affects display formatting, not parsing - row[col - 1] = PythonObjectCache::get_decimal_class()(py::str(rawData, decimalDataLen)); + PyObject* decimalObj = PythonObjectCache::get_decimal_class()(py::str(rawData, decimalDataLen)).release().ptr(); + PyList_SET_ITEM(row, col - 1, decimalObj); } catch (const py::error_already_set& e) { // Handle the exception, e.g., log the error and set py::none() LOG("Error converting to decimal: {}", e.what()); - row[col - 1] = py::none(); + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); } break; } - case SQL_DOUBLE: - case SQL_FLOAT: { - row[col - 1] = buffers.doubleBuffers[col - 1][i]; - break; - } case SQL_TIMESTAMP: case SQL_TYPE_TIMESTAMP: case SQL_DATETIME: { const SQL_TIMESTAMP_STRUCT& ts = buffers.timestampBuffers[col - 1][i]; - row[col - 1] = PythonObjectCache::get_datetime_class()(ts.year, ts.month, ts.day, + PyObject* datetimeObj = PythonObjectCache::get_datetime_class()(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, - ts.fraction / 1000); - break; - } - case SQL_BIGINT: { - row[col - 1] = buffers.bigIntBuffers[col - 1][i]; + ts.fraction / 1000).release().ptr(); + PyList_SET_ITEM(row, col - 1, datetimeObj); break; } case SQL_TYPE_DATE: { - row[col - 1] = PythonObjectCache::get_date_class()(buffers.dateBuffers[col - 1][i].year, + PyObject* dateObj = PythonObjectCache::get_date_class()(buffers.dateBuffers[col - 1][i].year, buffers.dateBuffers[col - 1][i].month, - buffers.dateBuffers[col - 1][i].day); + buffers.dateBuffers[col - 1][i].day).release().ptr(); + PyList_SET_ITEM(row, col - 1, dateObj); break; } case SQL_TIME: case 
SQL_TYPE_TIME: case SQL_SS_TIME2: { - row[col - 1] = PythonObjectCache::get_time_class()(buffers.timeBuffers[col - 1][i].hour, + PyObject* timeObj = PythonObjectCache::get_time_class()(buffers.timeBuffers[col - 1][i].hour, buffers.timeBuffers[col - 1][i].minute, - buffers.timeBuffers[col - 1][i].second); + buffers.timeBuffers[col - 1][i].second).release().ptr(); + PyList_SET_ITEM(row, col - 1, timeObj); break; } case SQL_SS_TIMESTAMPOFFSET: { @@ -3395,16 +3363,18 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum dtoValue.fraction / 1000, // ns → µs tzinfo ); - row[col - 1] = py_dt; + PyList_SET_ITEM(row, col - 1, py_dt.release().ptr()); } else { - row[col - 1] = py::none(); + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); } break; } case SQL_GUID: { SQLLEN indicator = buffers.indicators[col - 1][i]; if (indicator == SQL_NULL_DATA) { - row[col - 1] = py::none(); + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); break; } SQLGUID* guidValue = &buffers.guidBuffers[col - 1][i]; @@ -3423,22 +3393,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum py::dict kwargs; kwargs["bytes"] = py_guid_bytes; py::object uuid_obj = PythonObjectCache::get_uuid_class()(**kwargs); - row[col - 1] = uuid_obj; - break; - } - case SQL_BINARY: - case SQL_VARBINARY: - case SQL_LONGVARBINARY: { - SQLULEN columnSize = colInfo.columnSize; - HandleZeroColumnSizeAtFetch(columnSize); - bool isLob = colInfo.isLob; - if (!isLob && static_cast(dataLen) <= columnSize) { - row[col - 1] = py::bytes(reinterpret_cast( - &buffers.charBuffers[col - 1][i * columnSize]), - dataLen); - } else { - row[col - 1] = FetchLobColumnData(hStmt, col, SQL_C_BINARY, false, true); - } + PyList_SET_ITEM(row, col - 1, uuid_obj.release().ptr()); break; } default: { @@ -3453,7 +3408,14 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum } } } - rows[initialSize + i] = row; + + // Row is now fully populated - add it to results list atomically + // This ensures no partially-filled rows exist in the list on exception + if (PyList_Append(rowsList, row) < 0) { + Py_DECREF(row); // Clean up this row + throw std::runtime_error("Failed to append row to results list - memory allocation failure"); + } + Py_DECREF(row); // PyList_Append increments refcount, release our reference } return ret; } diff --git a/mssql_python/pybind/ddbc_bindings.h b/mssql_python/pybind/ddbc_bindings.h index eeb5bb37..0feb614a 100644 --- a/mssql_python/pybind/ddbc_bindings.h +++ b/mssql_python/pybind/ddbc_bindings.h @@ -170,7 +170,6 @@ inline std::vector WStringToSQLWCHAR(const std::wstring& str) { #if defined(__APPLE__) || defined(__linux__) #include "unix_utils.h" // Unix-specific fixes -#include "unix_buffers.h" // Unix-specific buffers #endif //------------------------------------------------------------------------------------------------- @@ -563,3 +562,315 @@ inline std::string GetDecimalSeparator() { // Function to set the decimal separator void DDBCSetDecimalSeparator(const std::string& separator); + +//------------------------------------------------------------------------------------------------- +// INTERNAL: Performance Optimization Helpers for Fetch Path +// (Used internally by ddbc_bindings.cpp - not part of public API) +//------------------------------------------------------------------------------------------------- + +// Struct to hold the DateTimeOffset structure +struct DateTimeOffset +{ + SQLSMALLINT year; + SQLUSMALLINT month; + 
SQLUSMALLINT day; + SQLUSMALLINT hour; + SQLUSMALLINT minute; + SQLUSMALLINT second; + SQLUINTEGER fraction; // Nanoseconds + SQLSMALLINT timezone_hour; // Offset hours from UTC + SQLSMALLINT timezone_minute; // Offset minutes from UTC +}; + +// Struct to hold data buffers and indicators for each column +struct ColumnBuffers { + std::vector> charBuffers; + std::vector> wcharBuffers; + std::vector> intBuffers; + std::vector> smallIntBuffers; + std::vector> realBuffers; + std::vector> doubleBuffers; + std::vector> timestampBuffers; + std::vector> bigIntBuffers; + std::vector> dateBuffers; + std::vector> timeBuffers; + std::vector> guidBuffers; + std::vector> indicators; + std::vector> datetimeoffsetBuffers; + + ColumnBuffers(SQLSMALLINT numCols, int fetchSize) + : charBuffers(numCols), + wcharBuffers(numCols), + intBuffers(numCols), + smallIntBuffers(numCols), + realBuffers(numCols), + doubleBuffers(numCols), + timestampBuffers(numCols), + bigIntBuffers(numCols), + dateBuffers(numCols), + timeBuffers(numCols), + guidBuffers(numCols), + datetimeoffsetBuffers(numCols), + indicators(numCols, std::vector(fetchSize)) {} +}; + +// Performance: Column processor function type for fast type conversion +// Using function pointers eliminates switch statement overhead in the hot loop +typedef void (*ColumnProcessor)(PyObject* row, ColumnBuffers& buffers, const void* colInfo, + SQLUSMALLINT col, SQLULEN rowIdx, SQLHSTMT hStmt); + +// Extended column info struct for processor functions +struct ColumnInfoExt { + SQLSMALLINT dataType; + SQLULEN columnSize; + SQLULEN processedColumnSize; + uint64_t fetchBufferSize; + bool isLob; +}; + +// Forward declare FetchLobColumnData (defined in ddbc_bindings.cpp) - MUST be outside namespace +py::object FetchLobColumnData(SQLHSTMT hStmt, SQLUSMALLINT col, SQLSMALLINT cType, + bool isWideChar, bool isBinary); + +// Specialized column processors for each data type (eliminates switch in hot loop) +namespace ColumnProcessors { + +// Process SQL INTEGER (4-byte int) column into Python int +// SAFETY: PyList_SET_ITEM is safe here because row is freshly allocated with PyList_New() +// and each slot is filled exactly once (NULL -> value) +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessInteger(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call (bypasses pybind11 overhead) + PyObject* pyInt = PyLong_FromLong(buffers.intBuffers[col - 1][rowIdx]); + if (!pyInt) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyInt); // Transfer ownership to list +} + +// Process SQL SMALLINT (2-byte int) column into Python int +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessSmallInt(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call + PyObject* pyInt = PyLong_FromLong(buffers.smallIntBuffers[col - 1][rowIdx]); + if (!pyInt) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyInt); +} + +// Process SQL BIGINT (8-byte int) column into Python int +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessBigInt(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT 
col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call + PyObject* pyInt = PyLong_FromLongLong(buffers.bigIntBuffers[col - 1][rowIdx]); + if (!pyInt) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyInt); +} + +// Process SQL TINYINT (1-byte unsigned int) column into Python int +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessTinyInt(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call + PyObject* pyInt = PyLong_FromLong(buffers.charBuffers[col - 1][rowIdx]); + if (!pyInt) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyInt); +} + +// Process SQL BIT column into Python bool +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessBit(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call (converts 0/1 to True/False) + PyObject* pyBool = PyBool_FromLong(buffers.charBuffers[col - 1][rowIdx]); + if (!pyBool) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyBool); +} + +// Process SQL REAL (4-byte float) column into Python float +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessReal(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call + PyObject* pyFloat = PyFloat_FromDouble(buffers.realBuffers[col - 1][rowIdx]); + if (!pyFloat) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyFloat); +} + +// Process SQL DOUBLE/FLOAT (8-byte float) column into Python float +// Performance: NULL check removed - handled centrally before processor is called +inline void ProcessDouble(PyObject* row, ColumnBuffers& buffers, const void*, SQLUSMALLINT col, + SQLULEN rowIdx, SQLHSTMT) { + // Performance: Direct Python C API call + PyObject* pyFloat = PyFloat_FromDouble(buffers.doubleBuffers[col - 1][rowIdx]); + if (!pyFloat) { // Handle memory allocation failure + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + return; + } + PyList_SET_ITEM(row, col - 1, pyFloat); +} + +// Process SQL CHAR/VARCHAR (single-byte string) column into Python str +// Performance: NULL/NO_TOTAL checks removed - handled centrally before processor is called +inline void ProcessChar(PyObject* row, ColumnBuffers& buffers, const void* colInfoPtr, + SQLUSMALLINT col, SQLULEN rowIdx, SQLHSTMT hStmt) { + const ColumnInfoExt* colInfo = static_cast(colInfoPtr); + SQLLEN dataLen = buffers.indicators[col - 1][rowIdx]; + + // Handle empty strings + if (dataLen == 0) { + PyObject* emptyStr = PyUnicode_FromStringAndSize("", 0); + if (!emptyStr) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, emptyStr); + } + return; + } + + uint64_t numCharsInData = dataLen / sizeof(SQLCHAR); + // Fast path: Data fits in buffer (not LOB or truncated) + // fetchBufferSize includes null-terminator, numCharsInData doesn't. 
Hence '<' + if (!colInfo->isLob && numCharsInData < colInfo->fetchBufferSize) { + // Performance: Direct Python C API call - create string from buffer + PyObject* pyStr = PyUnicode_FromStringAndSize( + reinterpret_cast(&buffers.charBuffers[col - 1][rowIdx * colInfo->fetchBufferSize]), + numCharsInData); + if (!pyStr) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, pyStr); + } + } else { + // Slow path: LOB data requires separate fetch call + PyList_SET_ITEM(row, col - 1, FetchLobColumnData(hStmt, col, SQL_C_CHAR, false, false).release().ptr()); + } +} + +// Process SQL NCHAR/NVARCHAR (wide/Unicode string) column into Python str +// Performance: NULL/NO_TOTAL checks removed - handled centrally before processor is called +inline void ProcessWChar(PyObject* row, ColumnBuffers& buffers, const void* colInfoPtr, + SQLUSMALLINT col, SQLULEN rowIdx, SQLHSTMT hStmt) { + const ColumnInfoExt* colInfo = static_cast(colInfoPtr); + SQLLEN dataLen = buffers.indicators[col - 1][rowIdx]; + + // Handle empty strings + if (dataLen == 0) { + PyObject* emptyStr = PyUnicode_FromStringAndSize("", 0); + if (!emptyStr) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, emptyStr); + } + return; + } + + uint64_t numCharsInData = dataLen / sizeof(SQLWCHAR); + // Fast path: Data fits in buffer (not LOB or truncated) + // fetchBufferSize includes null-terminator, numCharsInData doesn't. Hence '<' + if (!colInfo->isLob && numCharsInData < colInfo->fetchBufferSize) { +#if defined(__APPLE__) || defined(__linux__) + // Performance: Direct UTF-16 decode (SQLWCHAR is 2 bytes on Linux/macOS) + SQLWCHAR* wcharData = &buffers.wcharBuffers[col - 1][rowIdx * colInfo->fetchBufferSize]; + PyObject* pyStr = PyUnicode_DecodeUTF16( + reinterpret_cast(wcharData), + numCharsInData * sizeof(SQLWCHAR), + NULL, // errors (use default strict) + NULL // byteorder (auto-detect) + ); + if (pyStr) { + PyList_SET_ITEM(row, col - 1, pyStr); + } else { + PyErr_Clear(); // Ignore decode error, return empty string + PyObject* emptyStr = PyUnicode_FromStringAndSize("", 0); + if (!emptyStr) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, emptyStr); + } + } +#else + // Performance: Direct Python C API call (Windows where SQLWCHAR == wchar_t) + PyObject* pyStr = PyUnicode_FromWideChar( + reinterpret_cast(&buffers.wcharBuffers[col - 1][rowIdx * colInfo->fetchBufferSize]), + numCharsInData); + if (!pyStr) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, pyStr); + } +#endif + } else { + // Slow path: LOB data requires separate fetch call + PyList_SET_ITEM(row, col - 1, FetchLobColumnData(hStmt, col, SQL_C_WCHAR, true, false).release().ptr()); + } +} + +// Process SQL BINARY/VARBINARY (binary data) column into Python bytes +// Performance: NULL/NO_TOTAL checks removed - handled centrally before processor is called +inline void ProcessBinary(PyObject* row, ColumnBuffers& buffers, const void* colInfoPtr, + SQLUSMALLINT col, SQLULEN rowIdx, SQLHSTMT hStmt) { + const ColumnInfoExt* colInfo = static_cast(colInfoPtr); + SQLLEN dataLen = buffers.indicators[col - 1][rowIdx]; + + // Handle empty binary data + if (dataLen == 0) { + PyObject* emptyBytes = PyBytes_FromStringAndSize("", 0); + if (!emptyBytes) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, emptyBytes); + } + 
return; + } + + // Fast path: Data fits in buffer (not LOB or truncated) + if (!colInfo->isLob && static_cast(dataLen) <= colInfo->processedColumnSize) { + // Performance: Direct Python C API call - create bytes from buffer + PyObject* pyBytes = PyBytes_FromStringAndSize( + reinterpret_cast(&buffers.charBuffers[col - 1][rowIdx * colInfo->processedColumnSize]), + dataLen); + if (!pyBytes) { + Py_INCREF(Py_None); + PyList_SET_ITEM(row, col - 1, Py_None); + } else { + PyList_SET_ITEM(row, col - 1, pyBytes); + } + } else { + // Slow path: LOB data requires separate fetch call + PyList_SET_ITEM(row, col - 1, FetchLobColumnData(hStmt, col, SQL_C_BINARY, false, true).release().ptr()); + } +} + +} // namespace ColumnProcessors diff --git a/mssql_python/pybind/unix_buffers.h b/mssql_python/pybind/unix_buffers.h deleted file mode 100644 index b130d23d..00000000 --- a/mssql_python/pybind/unix_buffers.h +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Copyright (c) Microsoft Corporation. - * Licensed under the MIT license. - * - * This file provides utilities for handling character encoding and buffer management - * specifically for macOS ODBC operations. It implements functionality similar to - * the UCS_dec function in the Python PoC. - */ - -#pragma once - -#include -#include -#include -#include -#include - -namespace unix_buffers { - -// Constants for Unicode character encoding -constexpr const char* ODBC_DECODING = "utf-16-le"; -constexpr size_t UCS_LENGTH = 2; - -/** - * SQLWCHARBuffer class manages buffers for SQLWCHAR data, - * handling memory allocation and conversion to std::wstring. - */ -class SQLWCHARBuffer { - private: - std::unique_ptr buffer; - size_t buffer_size; - - public: - /** - * Constructor allocates a buffer of the specified size - */ - explicit SQLWCHARBuffer(size_t size) : buffer_size(size) { - buffer = std::make_unique(size); - // Initialize to zero - for (size_t i = 0; i < size; i++) { - buffer[i] = 0; - } - } - - /** - * Returns the data pointer for use with ODBC functions - */ - SQLWCHAR* data() { - return buffer.get(); - } - - /** - * Returns the size of the buffer - */ - size_t size() const { - return buffer_size; - } - - /** - * Converts the SQLWCHAR buffer to std::wstring - * Similar to the UCS_dec function in the Python PoC - */ - std::wstring toString(SQLSMALLINT length = -1) const { - std::wstring result; - - // If length is provided, use it - if (length > 0) { - for (SQLSMALLINT i = 0; i < length; i++) { - result.push_back(static_cast(buffer[i])); - } - return result; - } - - // Otherwise, read until null terminator - for (size_t i = 0; i < buffer_size; i++) { - if (buffer[i] == 0) { - break; - } - result.push_back(static_cast(buffer[i])); - } - - return result; - } -}; - -/** - * Class to handle diagnostic records collection - * Similar to the error list handling in the Python PoC _check_ret function - */ -class DiagnosticRecords { - private: - struct Record { - std::wstring sqlState; - std::wstring message; - SQLINTEGER nativeError; - }; - - std::vector records; - - public: - void addRecord(const std::wstring& sqlState, - const std::wstring& message, SQLINTEGER nativeError) { - records.push_back({sqlState, message, nativeError}); - } - - bool empty() const { - return records.empty(); - } - - std::wstring getSQLState() const { - if (!records.empty()) { - return records[0].sqlState; - } - return L"HY000"; // General error - } - - std::wstring getFirstErrorMessage() const { - if (!records.empty()) { - return records[0].message; - } - return L"Unknown error"; - } - - 
std::wstring getFullErrorMessage() const { - if (records.empty()) { - return L"No error information available"; - } - - std::wstring fullMessage = records[0].message; - - // Add additional error messages if there are any - for (size_t i = 1; i < records.size(); i++) { - fullMessage += L"; [" + records[i].sqlState + L"] " + - records[i].message; - } - - return fullMessage; - } - - size_t size() const { - return records.size(); - } -}; - -/** - * Function that decodes a SQLWCHAR buffer into a std::wstring - * Direct implementation of the UCS_dec logic from the Python PoC - */ -inline std::wstring UCS_dec(const SQLWCHAR* buffer, size_t maxLength = 0) { - std::wstring result; - size_t i = 0; - - while (true) { - // Break if we've reached the maximum length - if (maxLength > 0 && i >= maxLength) { - break; - } - - // Break if we've reached a null terminator - if (buffer[i] == 0) { - break; - } - - result.push_back(static_cast(buffer[i])); - i++; - } - - return result; -} - -} // namespace unix_buffers diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..dc94ab9e --- /dev/null +++ b/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +# Register custom markers +markers = + stress: marks tests as stress tests (long-running, resource-intensive) + +# Default options applied to all pytest runs +# Default: pytest -v → Skips stress tests (fast) +# To run ONLY stress tests: pytest -m stress +# To run ALL tests: pytest -v -m "" +addopts = -m "not stress" diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 83f61e06..97c800b0 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -6985,12 +6985,12 @@ def test_varbinarymax_insert_fetch(cursor, db_connection): """ ) - # Prepare test data + # Prepare test data - use moderate sizes to guarantee LOB fetch path (line 867-868) efficiently test_data = [ (2, b""), # Empty bytes (3, b"1234567890"), # Small binary - (4, b"A" * 9000), # Large binary > 8000 (streaming) - (5, b"B" * 20000), # Large binary > 8000 (streaming) + (4, b"A" * 15000), # Large binary > 15KB (guaranteed LOB path) + (5, b"B" * 20000), # Large binary > 20KB (guaranteed LOB path) (6, b"C" * 8000), # Edge case: exactly 8000 bytes (7, b"D" * 8001), # Edge case: just over 8000 bytes ] @@ -7274,6 +7274,118 @@ def test_varbinarymax_insert_fetch_null(cursor, db_connection): db_connection.commit() +def test_sql_double_type(cursor, db_connection): + """Test SQL_DOUBLE type (FLOAT(53)) to cover line 3213 in dispatcher.""" + try: + drop_table_if_exists(cursor, "#pytest_double_type") + cursor.execute( + """ + CREATE TABLE #pytest_double_type ( + id INT PRIMARY KEY, + double_col FLOAT(53), + float_col FLOAT + ) + """ + ) + + # Insert test data with various double precision values + test_data = [ + (1, 1.23456789012345, 3.14159), + (2, -9876543210.123456, -2.71828), + (3, 0.0, 0.0), + (4, 1.7976931348623157e308, 1.0e10), # Near max double + (5, 2.2250738585072014e-308, 1.0e-10), # Near min positive double + ] + + for row in test_data: + cursor.execute( + "INSERT INTO #pytest_double_type VALUES (?, ?, ?)", row + ) + db_connection.commit() + + # Fetch and verify + cursor.execute("SELECT id, double_col, float_col FROM #pytest_double_type ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == len(test_data), f"Expected {len(test_data)} rows, got {len(rows)}" + + for i, (expected_id, expected_double, expected_float) in enumerate(test_data): + fetched_id, fetched_double, fetched_float = rows[i] + assert fetched_id == expected_id, f"Row {i+1} ID mismatch" + assert 
isinstance(fetched_double, float), f"Row {i+1} double_col should be float type" + assert isinstance(fetched_float, float), f"Row {i+1} float_col should be float type" + # Use relative tolerance for floating point comparison + assert abs(fetched_double - expected_double) < abs(expected_double * 1e-10) or abs(fetched_double - expected_double) < 1e-10, \ + f"Row {i+1} double_col mismatch: expected {expected_double}, got {fetched_double}" + assert abs(fetched_float - expected_float) < abs(expected_float * 1e-5) or abs(fetched_float - expected_float) < 1e-5, \ + f"Row {i+1} float_col mismatch: expected {expected_float}, got {fetched_float}" + + except Exception as e: + pytest.fail(f"SQL_DOUBLE type test failed: {e}") + + finally: + drop_table_if_exists(cursor, "#pytest_double_type") + db_connection.commit() + + +def test_null_guid_type(cursor, db_connection): + """Test NULL UNIQUEIDENTIFIER (GUID) to cover lines 3376-3377.""" + try: + drop_table_if_exists(cursor, "#pytest_null_guid") + cursor.execute( + """ + CREATE TABLE #pytest_null_guid ( + id INT PRIMARY KEY, + guid_col UNIQUEIDENTIFIER, + guid_nullable UNIQUEIDENTIFIER NULL + ) + """ + ) + + # Insert test data with NULL and non-NULL GUIDs + test_guid = uuid.uuid4() + test_data = [ + (1, test_guid, None), # NULL GUID + (2, uuid.uuid4(), uuid.uuid4()), # Both non-NULL + (3, uuid.UUID('12345678-1234-5678-1234-567812345678'), None), # NULL GUID + ] + + for row_id, guid1, guid2 in test_data: + cursor.execute( + "INSERT INTO #pytest_null_guid VALUES (?, ?, ?)", + (row_id, guid1, guid2) + ) + db_connection.commit() + + # Fetch and verify + cursor.execute("SELECT id, guid_col, guid_nullable FROM #pytest_null_guid ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == len(test_data), f"Expected {len(test_data)} rows, got {len(rows)}" + + for i, (expected_id, expected_guid1, expected_guid2) in enumerate(test_data): + fetched_id, fetched_guid1, fetched_guid2 = rows[i] + assert fetched_id == expected_id, f"Row {i+1} ID mismatch" + + # C++ layer returns uuid.UUID objects + assert isinstance(fetched_guid1, uuid.UUID), f"Row {i+1} guid_col should be UUID type, got {type(fetched_guid1)}" + assert fetched_guid1 == expected_guid1, f"Row {i+1} guid_col mismatch" + + # Verify NULL handling (NULL GUIDs are returned as None) + if expected_guid2 is None: + assert fetched_guid2 is None, f"Row {i+1} guid_nullable should be None" + else: + assert isinstance(fetched_guid2, uuid.UUID), f"Row {i+1} guid_nullable should be UUID type, got {type(fetched_guid2)}" + assert fetched_guid2 == expected_guid2, f"Row {i+1} guid_nullable mismatch" + + except Exception as e: + pytest.fail(f"NULL GUID type test failed: {e}") + + finally: + drop_table_if_exists(cursor, "#pytest_null_guid") + db_connection.commit() + + def test_only_null_and_empty_binary(cursor, db_connection): """Test table with only NULL and empty binary values to ensure fallback doesn't produce size=0""" try: @@ -7436,9 +7548,10 @@ def test_varcharmax_boundary(cursor, db_connection): def test_varcharmax_streaming(cursor, db_connection): - """Streaming fetch > 8k with all fetch modes.""" + """Streaming fetch > 8k with all fetch modes to ensure LOB path coverage.""" try: - values = ["Y" * 8100, "Z" * 10000] + # Use 15KB to guarantee LOB fetch path (line 774-775) while keeping test fast + values = ["Y" * 15000, "Z" * 20000] cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") db_connection.commit() for v in values: @@ -7563,9 +7676,10 @@ def test_nvarcharmax_boundary(cursor, db_connection): 
def test_nvarcharmax_streaming(cursor, db_connection): - """Streaming fetch > 4k unicode with all fetch modes.""" + """Streaming fetch > 4k unicode with all fetch modes to ensure LOB path coverage.""" try: - values = ["Ω" * 4100, "漢" * 5000] + # Use 10KB to guarantee LOB fetch path (line 830-831) while keeping test fast + values = ["Ω" * 10000, "漢" * 12000] cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") db_connection.commit() for v in values: @@ -14424,6 +14538,870 @@ def test_row_cursor_log_method_availability(cursor, db_connection): db_connection.commit() +def test_all_numeric_types_with_nulls(cursor, db_connection): + """Test NULL handling for all numeric types to ensure processor functions handle NULLs correctly""" + try: + drop_table_if_exists(cursor, "#pytest_all_numeric_nulls") + cursor.execute( + """ + CREATE TABLE #pytest_all_numeric_nulls ( + int_col INT, + bigint_col BIGINT, + smallint_col SMALLINT, + tinyint_col TINYINT, + bit_col BIT, + real_col REAL, + float_col FLOAT + ) + """ + ) + db_connection.commit() + + # Insert row with all NULLs + cursor.execute( + "INSERT INTO #pytest_all_numeric_nulls VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL)" + ) + # Insert row with actual values + cursor.execute( + "INSERT INTO #pytest_all_numeric_nulls VALUES (42, 9223372036854775807, 32767, 255, 1, 3.14, 2.718281828)" + ) + db_connection.commit() + + cursor.execute("SELECT * FROM #pytest_all_numeric_nulls ORDER BY int_col ASC") + rows = cursor.fetchall() + + # First row should be all NULLs + assert len(rows) == 2, "Should have exactly 2 rows" + assert all(val is None for val in rows[0]), "First row should be all NULLs" + + # Second row should have actual values + assert rows[1][0] == 42, "INT column should be 42" + assert rows[1][1] == 9223372036854775807, "BIGINT column should match" + assert rows[1][2] == 32767, "SMALLINT column should be 32767" + assert rows[1][3] == 255, "TINYINT column should be 255" + assert rows[1][4] == True, "BIT column should be True" + assert abs(rows[1][5] - 3.14) < 0.01, "REAL column should be approximately 3.14" + assert abs(rows[1][6] - 2.718281828) < 0.0001, "FLOAT column should be approximately 2.718281828" + + except Exception as e: + pytest.fail(f"All numeric types NULL test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_all_numeric_nulls") + db_connection.commit() + + +def test_lob_data_types(cursor, db_connection): + """Test LOB (Large Object) data types to ensure LOB fallback paths are exercised""" + try: + drop_table_if_exists(cursor, "#pytest_lob_test") + cursor.execute( + """ + CREATE TABLE #pytest_lob_test ( + id INT, + text_lob VARCHAR(MAX), + ntext_lob NVARCHAR(MAX), + binary_lob VARBINARY(MAX) + ) + """ + ) + db_connection.commit() + + # Create large data that will trigger LOB handling + large_text = 'A' * 10000 # 10KB text + large_ntext = 'B' * 10000 # 10KB unicode text + large_binary = b'\x01\x02\x03\x04' * 2500 # 10KB binary + + cursor.execute( + "INSERT INTO #pytest_lob_test VALUES (?, ?, ?, ?)", + (1, large_text, large_ntext, large_binary) + ) + db_connection.commit() + + cursor.execute("SELECT id, text_lob, ntext_lob, binary_lob FROM #pytest_lob_test") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_text, "VARCHAR(MAX) LOB data should match" + assert row[2] == large_ntext, "NVARCHAR(MAX) LOB data should match" + assert row[3] == large_binary, "VARBINARY(MAX) LOB data should match" + + except Exception as e: + pytest.fail(f"LOB data types test failed: 
{e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_test") + db_connection.commit() + + +def test_lob_char_column_types(cursor, db_connection): + """Test LOB fetching specifically for CHAR/VARCHAR columns (covers lines 3313-3314)""" + try: + drop_table_if_exists(cursor, "#pytest_lob_char") + cursor.execute( + """ + CREATE TABLE #pytest_lob_char ( + id INT, + char_lob VARCHAR(MAX) + ) + """ + ) + db_connection.commit() + + # Create data large enough to trigger LOB path (>8000 bytes) + large_char_data = 'X' * 20000 # 20KB text + + cursor.execute( + "INSERT INTO #pytest_lob_char VALUES (?, ?)", + (1, large_char_data) + ) + db_connection.commit() + + cursor.execute("SELECT id, char_lob FROM #pytest_lob_char") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_char_data, "VARCHAR(MAX) LOB data should match" + assert len(row[1]) == 20000, "VARCHAR(MAX) should be 20000 chars" + + except Exception as e: + pytest.fail(f"LOB CHAR column test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_char") + db_connection.commit() + + +def test_lob_wchar_column_types(cursor, db_connection): + """Test LOB fetching specifically for WCHAR/NVARCHAR columns (covers lines 3358-3359)""" + try: + drop_table_if_exists(cursor, "#pytest_lob_wchar") + cursor.execute( + """ + CREATE TABLE #pytest_lob_wchar ( + id INT, + wchar_lob NVARCHAR(MAX) + ) + """ + ) + db_connection.commit() + + # Create unicode data large enough to trigger LOB path (>4000 characters for NVARCHAR) + large_wchar_data = '🔥' * 5000 + 'Unicode™' * 1000 # Mix of emoji and special chars + + cursor.execute( + "INSERT INTO #pytest_lob_wchar VALUES (?, ?)", + (1, large_wchar_data) + ) + db_connection.commit() + + cursor.execute("SELECT id, wchar_lob FROM #pytest_lob_wchar") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_wchar_data, "NVARCHAR(MAX) LOB data should match" + assert '🔥' in row[1], "Should contain emoji characters" + + except Exception as e: + pytest.fail(f"LOB WCHAR column test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_wchar") + db_connection.commit() + + +def test_lob_binary_column_types(cursor, db_connection): + """Test LOB fetching specifically for BINARY/VARBINARY columns (covers lines 3384-3385)""" + try: + drop_table_if_exists(cursor, "#pytest_lob_binary") + cursor.execute( + """ + CREATE TABLE #pytest_lob_binary ( + id INT, + binary_lob VARBINARY(MAX) + ) + """ + ) + db_connection.commit() + + # Create binary data large enough to trigger LOB path (>8000 bytes) + large_binary_data = bytes(range(256)) * 100 # 25.6KB of varied binary data + + cursor.execute( + "INSERT INTO #pytest_lob_binary VALUES (?, ?)", + (1, large_binary_data) + ) + db_connection.commit() + + cursor.execute("SELECT id, binary_lob FROM #pytest_lob_binary") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_binary_data, "VARBINARY(MAX) LOB data should match" + assert len(row[1]) == 25600, "VARBINARY(MAX) should be 25600 bytes" + + except Exception as e: + pytest.fail(f"LOB BINARY column test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_binary") + db_connection.commit() + + +def test_zero_length_complex_types(cursor, db_connection): + """Test zero-length data for complex types (covers lines 3531-3533)""" + try: + drop_table_if_exists(cursor, "#pytest_zero_length") + cursor.execute( + """ + CREATE TABLE #pytest_zero_length ( + id INT, + empty_varchar VARCHAR(100), + 
empty_nvarchar NVARCHAR(100), + empty_binary VARBINARY(100) + ) + """ + ) + db_connection.commit() + + # Insert empty (non-NULL) values + cursor.execute( + "INSERT INTO #pytest_zero_length VALUES (?, ?, ?, ?)", + (1, '', '', b'') + ) + db_connection.commit() + + cursor.execute("SELECT id, empty_varchar, empty_nvarchar, empty_binary FROM #pytest_zero_length") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == '', "Empty VARCHAR should be empty string" + assert row[2] == '', "Empty NVARCHAR should be empty string" + assert row[3] == b'', "Empty VARBINARY should be empty bytes" + + except Exception as e: + pytest.fail(f"Zero-length complex types test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_zero_length") + db_connection.commit() + + +def test_guid_with_nulls(cursor, db_connection): + """Test GUID type with NULL values""" + try: + drop_table_if_exists(cursor, "#pytest_guid_nulls") + cursor.execute( + """ + CREATE TABLE #pytest_guid_nulls ( + id INT, + guid_col UNIQUEIDENTIFIER + ) + """ + ) + db_connection.commit() + + # Insert NULL GUID + cursor.execute("INSERT INTO #pytest_guid_nulls VALUES (1, NULL)") + # Insert actual GUID + cursor.execute("INSERT INTO #pytest_guid_nulls VALUES (2, NEWID())") + db_connection.commit() + + cursor.execute("SELECT id, guid_col FROM #pytest_guid_nulls ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 2, "Should have exactly 2 rows" + assert rows[0][1] is None, "First GUID should be NULL" + assert rows[1][1] is not None, "Second GUID should not be NULL" + + except Exception as e: + pytest.fail(f"GUID with NULLs test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_guid_nulls") + db_connection.commit() + + +def test_datetimeoffset_with_nulls(cursor, db_connection): + """Test DATETIMEOFFSET type with NULL values""" + try: + drop_table_if_exists(cursor, "#pytest_dto_nulls") + cursor.execute( + """ + CREATE TABLE #pytest_dto_nulls ( + id INT, + dto_col DATETIMEOFFSET + ) + """ + ) + db_connection.commit() + + # Insert NULL DATETIMEOFFSET + cursor.execute("INSERT INTO #pytest_dto_nulls VALUES (1, NULL)") + # Insert actual DATETIMEOFFSET + cursor.execute("INSERT INTO #pytest_dto_nulls VALUES (2, SYSDATETIMEOFFSET())") + db_connection.commit() + + cursor.execute("SELECT id, dto_col FROM #pytest_dto_nulls ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 2, "Should have exactly 2 rows" + assert rows[0][1] is None, "First DATETIMEOFFSET should be NULL" + assert rows[1][1] is not None, "Second DATETIMEOFFSET should not be NULL" + + except Exception as e: + pytest.fail(f"DATETIMEOFFSET with NULLs test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_dto_nulls") + db_connection.commit() + + +def test_decimal_conversion_edge_cases(cursor, db_connection): + """Test DECIMAL/NUMERIC type conversion including edge cases""" + try: + drop_table_if_exists(cursor, "#pytest_decimal_edge") + cursor.execute( + """ + CREATE TABLE #pytest_decimal_edge ( + id INT, + dec_col DECIMAL(18, 4) + ) + """ + ) + db_connection.commit() + + # Insert various decimal values including edge cases + test_values = [ + (1, "123.4567"), + (2, "0.0001"), + (3, "-999999999999.9999"), + (4, "999999999999.9999"), + (5, "0.0000"), + ] + + for id_val, dec_val in test_values: + cursor.execute( + "INSERT INTO #pytest_decimal_edge VALUES (?, ?)", + (id_val, decimal.Decimal(dec_val)) + ) + + # Also insert NULL + cursor.execute("INSERT INTO #pytest_decimal_edge VALUES (6, NULL)") + 
db_connection.commit() + + cursor.execute("SELECT id, dec_col FROM #pytest_decimal_edge ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 6, "Should have exactly 6 rows" + + # Verify the values + for i, (id_val, expected_str) in enumerate(test_values): + assert rows[i][0] == id_val, f"Row {i} ID should be {id_val}" + assert rows[i][1] == decimal.Decimal(expected_str), f"Row {i} decimal should match {expected_str}" + + # Verify NULL + assert rows[5][0] == 6, "Last row ID should be 6" + assert rows[5][1] is None, "Last decimal should be NULL" + + except Exception as e: + pytest.fail(f"Decimal conversion edge cases test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_decimal_edge") + db_connection.commit() + + +def test_fixed_length_char_type(cursor, db_connection): + """Test SQL_CHAR (fixed-length CHAR) column processor path (Lines 3464-3467)""" + try: + cursor.execute("CREATE TABLE #pytest_char_test (id INT, char_col CHAR(10))") + cursor.execute("INSERT INTO #pytest_char_test VALUES (1, 'hello')") + cursor.execute("INSERT INTO #pytest_char_test VALUES (2, 'world')") + + cursor.execute("SELECT char_col FROM #pytest_char_test ORDER BY id") + rows = cursor.fetchall() + + # CHAR pads with spaces to fixed length + assert len(rows) == 2, "Should fetch 2 rows" + assert rows[0][0].rstrip() == "hello", "First CHAR value should be 'hello'" + assert rows[1][0].rstrip() == "world", "Second CHAR value should be 'world'" + + cursor.execute("DROP TABLE #pytest_char_test") + except Exception as e: + pytest.fail(f"Fixed-length CHAR test failed: {e}") + + +def test_fixed_length_nchar_type(cursor, db_connection): + """Test SQL_WCHAR (fixed-length NCHAR) column processor path (Lines 3469-3472)""" + try: + cursor.execute("CREATE TABLE #pytest_nchar_test (id INT, nchar_col NCHAR(10))") + cursor.execute("INSERT INTO #pytest_nchar_test VALUES (1, N'hello')") + cursor.execute("INSERT INTO #pytest_nchar_test VALUES (2, N'世界')") # Unicode test + + cursor.execute("SELECT nchar_col FROM #pytest_nchar_test ORDER BY id") + rows = cursor.fetchall() + + # NCHAR pads with spaces to fixed length + assert len(rows) == 2, "Should fetch 2 rows" + assert rows[0][0].rstrip() == "hello", "First NCHAR value should be 'hello'" + assert rows[1][0].rstrip() == "世界", "Second NCHAR value should be '世界'" + + cursor.execute("DROP TABLE #pytest_nchar_test") + except Exception as e: + pytest.fail(f"Fixed-length NCHAR test failed: {e}") + + +def test_fixed_length_binary_type(cursor, db_connection): + """Test SQL_BINARY (fixed-length BINARY) column processor path (Lines 3474-3477)""" + try: + cursor.execute("CREATE TABLE #pytest_binary_test (id INT, binary_col BINARY(8))") + cursor.execute("INSERT INTO #pytest_binary_test VALUES (1, 0x0102030405)") + cursor.execute("INSERT INTO #pytest_binary_test VALUES (2, 0xAABBCCDD)") + + cursor.execute("SELECT binary_col FROM #pytest_binary_test ORDER BY id") + rows = cursor.fetchall() + + # BINARY pads with zeros to fixed length (8 bytes) + assert len(rows) == 2, "Should fetch 2 rows" + assert len(rows[0][0]) == 8, "BINARY(8) should be 8 bytes" + assert len(rows[1][0]) == 8, "BINARY(8) should be 8 bytes" + # First 5 bytes should match, rest padded with zeros + assert rows[0][0][:5] == b'\x01\x02\x03\x04\x05', "First BINARY value should start with inserted bytes" + assert rows[0][0][5:] == b'\x00\x00\x00', "BINARY should be zero-padded" + + cursor.execute("DROP TABLE #pytest_binary_test") + except Exception as e: + pytest.fail(f"Fixed-length BINARY test failed: {e}") + # 
The hasattr check should complete without error + # This covers the conditional log method availability checks + + except Exception as e: + pytest.fail(f"Cursor log method availability test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_log_check") + db_connection.commit() + + +def test_all_numeric_types_with_nulls(cursor, db_connection): + """Test NULL handling for all numeric types to ensure processor functions handle NULLs correctly""" + try: + drop_table_if_exists(cursor, "#pytest_all_numeric_nulls") + cursor.execute( + """ + CREATE TABLE #pytest_all_numeric_nulls ( + int_col INT, + bigint_col BIGINT, + smallint_col SMALLINT, + tinyint_col TINYINT, + bit_col BIT, + real_col REAL, + float_col FLOAT + ) + """ + ) + db_connection.commit() + + # Insert row with all NULLs + cursor.execute( + "INSERT INTO #pytest_all_numeric_nulls VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL)" + ) + # Insert row with actual values + cursor.execute( + "INSERT INTO #pytest_all_numeric_nulls VALUES (42, 9223372036854775807, 32767, 255, 1, 3.14, 2.718281828)" + ) + db_connection.commit() + + cursor.execute("SELECT * FROM #pytest_all_numeric_nulls ORDER BY int_col ASC") + rows = cursor.fetchall() + + # First row should be all NULLs + assert len(rows) == 2, "Should have exactly 2 rows" + assert all(val is None for val in rows[0]), "First row should be all NULLs" + + # Second row should have actual values + assert rows[1][0] == 42, "INT column should be 42" + assert rows[1][1] == 9223372036854775807, "BIGINT column should match" + assert rows[1][2] == 32767, "SMALLINT column should be 32767" + assert rows[1][3] == 255, "TINYINT column should be 255" + assert rows[1][4] == True, "BIT column should be True" + assert abs(rows[1][5] - 3.14) < 0.01, "REAL column should be approximately 3.14" + assert abs(rows[1][6] - 2.718281828) < 0.0001, "FLOAT column should be approximately 2.718281828" + + except Exception as e: + pytest.fail(f"All numeric types NULL test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_all_numeric_nulls") + db_connection.commit() + + +def test_lob_data_types(cursor, db_connection): + """Test LOB (Large Object) data types to ensure LOB fallback paths are exercised""" + try: + drop_table_if_exists(cursor, "#pytest_lob_test") + cursor.execute( + """ + CREATE TABLE #pytest_lob_test ( + id INT, + text_lob VARCHAR(MAX), + ntext_lob NVARCHAR(MAX), + binary_lob VARBINARY(MAX) + ) + """ + ) + db_connection.commit() + + # Create large data that will trigger LOB handling + large_text = 'A' * 10000 # 10KB text + large_ntext = 'B' * 10000 # 10KB unicode text + large_binary = b'\x01\x02\x03\x04' * 2500 # 10KB binary + + cursor.execute( + "INSERT INTO #pytest_lob_test VALUES (?, ?, ?, ?)", + (1, large_text, large_ntext, large_binary) + ) + db_connection.commit() + + cursor.execute("SELECT id, text_lob, ntext_lob, binary_lob FROM #pytest_lob_test") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_text, "VARCHAR(MAX) LOB data should match" + assert row[2] == large_ntext, "NVARCHAR(MAX) LOB data should match" + assert row[3] == large_binary, "VARBINARY(MAX) LOB data should match" + + except Exception as e: + pytest.fail(f"LOB data types test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_test") + db_connection.commit() + + +def test_lob_char_column_types(cursor, db_connection): + """Test LOB fetching specifically for CHAR/VARCHAR columns (covers lines 3313-3314)""" + try: + drop_table_if_exists(cursor, 
"#pytest_lob_char") + cursor.execute( + """ + CREATE TABLE #pytest_lob_char ( + id INT, + char_lob VARCHAR(MAX) + ) + """ + ) + db_connection.commit() + + # Create data large enough to trigger LOB path (>8000 bytes) + large_char_data = 'X' * 20000 # 20KB text + + cursor.execute( + "INSERT INTO #pytest_lob_char VALUES (?, ?)", + (1, large_char_data) + ) + db_connection.commit() + + cursor.execute("SELECT id, char_lob FROM #pytest_lob_char") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_char_data, "VARCHAR(MAX) LOB data should match" + assert len(row[1]) == 20000, "VARCHAR(MAX) should be 20000 chars" + + except Exception as e: + pytest.fail(f"LOB CHAR column test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_char") + db_connection.commit() + + +def test_lob_wchar_column_types(cursor, db_connection): + """Test LOB fetching specifically for WCHAR/NVARCHAR columns (covers lines 3358-3359)""" + try: + drop_table_if_exists(cursor, "#pytest_lob_wchar") + cursor.execute( + """ + CREATE TABLE #pytest_lob_wchar ( + id INT, + wchar_lob NVARCHAR(MAX) + ) + """ + ) + db_connection.commit() + + # Create unicode data large enough to trigger LOB path (>4000 characters for NVARCHAR) + large_wchar_data = '🔥' * 5000 + 'Unicode™' * 1000 # Mix of emoji and special chars + + cursor.execute( + "INSERT INTO #pytest_lob_wchar VALUES (?, ?)", + (1, large_wchar_data) + ) + db_connection.commit() + + cursor.execute("SELECT id, wchar_lob FROM #pytest_lob_wchar") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_wchar_data, "NVARCHAR(MAX) LOB data should match" + assert '🔥' in row[1], "Should contain emoji characters" + + except Exception as e: + pytest.fail(f"LOB WCHAR column test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_wchar") + db_connection.commit() + + +def test_lob_binary_column_types(cursor, db_connection): + """Test LOB fetching specifically for BINARY/VARBINARY columns (covers lines 3384-3385)""" + try: + drop_table_if_exists(cursor, "#pytest_lob_binary") + cursor.execute( + """ + CREATE TABLE #pytest_lob_binary ( + id INT, + binary_lob VARBINARY(MAX) + ) + """ + ) + db_connection.commit() + + # Create binary data large enough to trigger LOB path (>8000 bytes) + large_binary_data = bytes(range(256)) * 100 # 25.6KB of varied binary data + + cursor.execute( + "INSERT INTO #pytest_lob_binary VALUES (?, ?)", + (1, large_binary_data) + ) + db_connection.commit() + + cursor.execute("SELECT id, binary_lob FROM #pytest_lob_binary") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == large_binary_data, "VARBINARY(MAX) LOB data should match" + assert len(row[1]) == 25600, "VARBINARY(MAX) should be 25600 bytes" + + except Exception as e: + pytest.fail(f"LOB BINARY column test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_lob_binary") + db_connection.commit() + + +def test_zero_length_complex_types(cursor, db_connection): + """Test zero-length data for complex types (covers lines 3531-3533)""" + try: + drop_table_if_exists(cursor, "#pytest_zero_length") + cursor.execute( + """ + CREATE TABLE #pytest_zero_length ( + id INT, + empty_varchar VARCHAR(100), + empty_nvarchar NVARCHAR(100), + empty_binary VARBINARY(100) + ) + """ + ) + db_connection.commit() + + # Insert empty (non-NULL) values + cursor.execute( + "INSERT INTO #pytest_zero_length VALUES (?, ?, ?, ?)", + (1, '', '', b'') + ) + db_connection.commit() + + 
cursor.execute("SELECT id, empty_varchar, empty_nvarchar, empty_binary FROM #pytest_zero_length") + row = cursor.fetchone() + + assert row[0] == 1, "ID should be 1" + assert row[1] == '', "Empty VARCHAR should be empty string" + assert row[2] == '', "Empty NVARCHAR should be empty string" + assert row[3] == b'', "Empty VARBINARY should be empty bytes" + + except Exception as e: + pytest.fail(f"Zero-length complex types test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_zero_length") + db_connection.commit() + + +def test_guid_with_nulls(cursor, db_connection): + """Test GUID type with NULL values""" + try: + drop_table_if_exists(cursor, "#pytest_guid_nulls") + cursor.execute( + """ + CREATE TABLE #pytest_guid_nulls ( + id INT, + guid_col UNIQUEIDENTIFIER + ) + """ + ) + db_connection.commit() + + # Insert NULL GUID + cursor.execute("INSERT INTO #pytest_guid_nulls VALUES (1, NULL)") + # Insert actual GUID + cursor.execute("INSERT INTO #pytest_guid_nulls VALUES (2, NEWID())") + db_connection.commit() + + cursor.execute("SELECT id, guid_col FROM #pytest_guid_nulls ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 2, "Should have exactly 2 rows" + assert rows[0][1] is None, "First GUID should be NULL" + assert rows[1][1] is not None, "Second GUID should not be NULL" + + except Exception as e: + pytest.fail(f"GUID with NULLs test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_guid_nulls") + db_connection.commit() + + +def test_datetimeoffset_with_nulls(cursor, db_connection): + """Test DATETIMEOFFSET type with NULL values""" + try: + drop_table_if_exists(cursor, "#pytest_dto_nulls") + cursor.execute( + """ + CREATE TABLE #pytest_dto_nulls ( + id INT, + dto_col DATETIMEOFFSET + ) + """ + ) + db_connection.commit() + + # Insert NULL DATETIMEOFFSET + cursor.execute("INSERT INTO #pytest_dto_nulls VALUES (1, NULL)") + # Insert actual DATETIMEOFFSET + cursor.execute("INSERT INTO #pytest_dto_nulls VALUES (2, SYSDATETIMEOFFSET())") + db_connection.commit() + + cursor.execute("SELECT id, dto_col FROM #pytest_dto_nulls ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 2, "Should have exactly 2 rows" + assert rows[0][1] is None, "First DATETIMEOFFSET should be NULL" + assert rows[1][1] is not None, "Second DATETIMEOFFSET should not be NULL" + + except Exception as e: + pytest.fail(f"DATETIMEOFFSET with NULLs test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_dto_nulls") + db_connection.commit() + + +def test_decimal_conversion_edge_cases(cursor, db_connection): + """Test DECIMAL/NUMERIC type conversion including edge cases""" + try: + drop_table_if_exists(cursor, "#pytest_decimal_edge") + cursor.execute( + """ + CREATE TABLE #pytest_decimal_edge ( + id INT, + dec_col DECIMAL(18, 4) + ) + """ + ) + db_connection.commit() + + # Insert various decimal values including edge cases + test_values = [ + (1, "123.4567"), + (2, "0.0001"), + (3, "-999999999999.9999"), + (4, "999999999999.9999"), + (5, "0.0000"), + ] + + for id_val, dec_val in test_values: + cursor.execute( + "INSERT INTO #pytest_decimal_edge VALUES (?, ?)", + (id_val, decimal.Decimal(dec_val)) + ) + + # Also insert NULL + cursor.execute("INSERT INTO #pytest_decimal_edge VALUES (6, NULL)") + db_connection.commit() + + cursor.execute("SELECT id, dec_col FROM #pytest_decimal_edge ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 6, "Should have exactly 6 rows" + + # Verify the values + for i, (id_val, expected_str) in enumerate(test_values): + assert 
+            assert rows[i][0] == id_val, f"Row {i} ID should be {id_val}"
+            assert rows[i][1] == decimal.Decimal(expected_str), f"Row {i} decimal should match {expected_str}"
+
+        # Verify NULL
+        assert rows[5][0] == 6, "Last row ID should be 6"
+        assert rows[5][1] is None, "Last decimal should be NULL"
+
+    except Exception as e:
+        pytest.fail(f"Decimal conversion edge cases test failed: {e}")
+    finally:
+        drop_table_if_exists(cursor, "#pytest_decimal_edge")
+        db_connection.commit()
+
+
+def test_fixed_length_char_type(cursor, db_connection):
+    """Test SQL_CHAR (fixed-length CHAR) column processor path (Lines 3464-3467)"""
+    try:
+        cursor.execute("CREATE TABLE #pytest_char_test (id INT, char_col CHAR(10))")
+        cursor.execute("INSERT INTO #pytest_char_test VALUES (1, 'hello')")
+        cursor.execute("INSERT INTO #pytest_char_test VALUES (2, 'world')")
+
+        cursor.execute("SELECT char_col FROM #pytest_char_test ORDER BY id")
+        rows = cursor.fetchall()
+
+        # CHAR pads with spaces to fixed length
+        assert len(rows) == 2, "Should fetch 2 rows"
+        assert rows[0][0].rstrip() == "hello", "First CHAR value should be 'hello'"
+        assert rows[1][0].rstrip() == "world", "Second CHAR value should be 'world'"
+
+        cursor.execute("DROP TABLE #pytest_char_test")
+    except Exception as e:
+        pytest.fail(f"Fixed-length CHAR test failed: {e}")
+
+
+def test_fixed_length_nchar_type(cursor, db_connection):
+    """Test SQL_WCHAR (fixed-length NCHAR) column processor path (Lines 3469-3472)"""
+    try:
+        cursor.execute("CREATE TABLE #pytest_nchar_test (id INT, nchar_col NCHAR(10))")
+        cursor.execute("INSERT INTO #pytest_nchar_test VALUES (1, N'hello')")
+        cursor.execute("INSERT INTO #pytest_nchar_test VALUES (2, N'世界')")  # Unicode test
+
+        cursor.execute("SELECT nchar_col FROM #pytest_nchar_test ORDER BY id")
+        rows = cursor.fetchall()
+
+        # NCHAR pads with spaces to fixed length
+        assert len(rows) == 2, "Should fetch 2 rows"
+        assert rows[0][0].rstrip() == "hello", "First NCHAR value should be 'hello'"
+        assert rows[1][0].rstrip() == "世界", "Second NCHAR value should be '世界'"
+
+        cursor.execute("DROP TABLE #pytest_nchar_test")
+    except Exception as e:
+        pytest.fail(f"Fixed-length NCHAR test failed: {e}")
+
+
+def test_fixed_length_binary_type(cursor, db_connection):
+    """Test SQL_BINARY (fixed-length BINARY) column processor path (Lines 3474-3477)"""
+    try:
+        cursor.execute("CREATE TABLE #pytest_binary_test (id INT, binary_col BINARY(8))")
+        cursor.execute("INSERT INTO #pytest_binary_test VALUES (1, 0x0102030405)")
+        cursor.execute("INSERT INTO #pytest_binary_test VALUES (2, 0xAABBCCDD)")
+
+        cursor.execute("SELECT binary_col FROM #pytest_binary_test ORDER BY id")
+        rows = cursor.fetchall()
+
+        # BINARY pads with zeros to fixed length (8 bytes)
+        assert len(rows) == 2, "Should fetch 2 rows"
+        assert len(rows[0][0]) == 8, "BINARY(8) should be 8 bytes"
+        assert len(rows[1][0]) == 8, "BINARY(8) should be 8 bytes"
+        # First 5 bytes should match, rest padded with zeros
+        assert rows[0][0][:5] == b'\x01\x02\x03\x04\x05', "First BINARY value should start with inserted bytes"
+        assert rows[0][0][5:] == b'\x00\x00\x00', "BINARY should be zero-padded"
+
+        cursor.execute("DROP TABLE #pytest_binary_test")
+    except Exception as e:
+        pytest.fail(f"Fixed-length BINARY test failed: {e}")
+
+
 def test_close(db_connection):
     """Test closing the cursor"""
     try:
diff --git a/tests/test_011_performance_stress.py b/tests/test_011_performance_stress.py
new file mode 100644
index 00000000..0c577f98
--- /dev/null
+++ b/tests/test_011_performance_stress.py
@@ -0,0 +1,580 @@
+""" +Performance and stress tests for mssql-python driver. + +These tests verify the driver's behavior under stress conditions: +- Large result sets (100,000+ rows) +- Memory pressure scenarios +- Exception handling during batch processing +- Thousands of empty string allocations +- 10MB+ LOB data handling + +Tests are marked with @pytest.mark.stress and may be skipped in regular CI runs. +""" + +import pytest +import decimal +import hashlib +import sys +import platform +import threading +import time +from typing import List, Tuple + + +# Helper function to check if running on resource-limited platform +def supports_resource_limits(): + """Check if platform supports resource.setrlimit for memory limits""" + try: + import resource + return hasattr(resource, 'RLIMIT_AS') + except ImportError: + return False + + +def drop_table_if_exists(cursor, table_name): + """Helper to drop a table if it exists""" + try: + cursor.execute(f"DROP TABLE IF EXISTS {table_name}") + except Exception: + pass + + +@pytest.mark.stress +def test_exception_mid_batch_no_corrupt_data(cursor, db_connection): + """ + Test #1: Verify that batch processing handles data integrity correctly. + + When fetching large batches, verify that the returned result list does NOT + contain empty or partially-filled rows. Should either get complete valid rows + OR an exception, never corrupt data. + """ + try: + drop_table_if_exists(cursor, "#pytest_mid_batch_exception") + + # Create simple table to test batch processing integrity + cursor.execute(""" + CREATE TABLE #pytest_mid_batch_exception ( + id INT, + value NVARCHAR(50), + amount FLOAT + ) + """) + db_connection.commit() + + # Insert 1000 rows using individual inserts to avoid executemany complications + for i in range(1000): + cursor.execute( + "INSERT INTO #pytest_mid_batch_exception VALUES (?, ?, ?)", + (i, f"Value_{i}", float(i * 1.5)) + ) + db_connection.commit() + + # Fetch all rows in batch - this tests the fetch path integrity + cursor.execute("SELECT id, value, amount FROM #pytest_mid_batch_exception ORDER BY id") + rows = cursor.fetchall() + + # Verify: No empty rows, no None rows where data should exist + assert len(rows) == 1000, f"Expected 1000 rows, got {len(rows)}" + + for i, row in enumerate(rows): + assert row is not None, f"Row {i} is None - corrupt data detected" + assert len(row) == 3, f"Row {i} has {len(row)} columns, expected 3 - partial row detected" + assert row[0] == i, f"Row {i} has incorrect ID {row[0]}" + assert row[1] is not None, f"Row {i} has None value - corrupt data" + assert row[2] is not None, f"Row {i} has None amount - corrupt data" + # Verify actual values + assert row[1] == f"Value_{i}", f"Row {i} has wrong value" + assert abs(row[2] - (i * 1.5)) < 0.001, f"Row {i} has wrong amount" + + print(f"[OK] Batch integrity test passed: All 1000 rows complete, no corrupt data") + + except Exception as e: + pytest.fail(f"Batch integrity test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_mid_batch_exception") + db_connection.commit() + + +@pytest.mark.stress +@pytest.mark.skipif( + not supports_resource_limits() or platform.system() == 'Darwin', + reason="Requires Unix resource limits, not supported on macOS" +) +def test_python_c_api_null_handling_memory_pressure(cursor, db_connection): + """ + Test #2: Verify graceful handling when Python C API functions return NULL. + + Simulates low memory conditions where PyUnicode_FromStringAndSize, + PyBytes_FromStringAndSize might fail. 
+    The driver must not crash with a segfault; it should fail gracefully
+    with None values or an exception.
+
+    Note: Skipped on macOS, which does not support RLIMIT_AS properly.
+    """
+    import resource
+
+    try:
+        drop_table_if_exists(cursor, "#pytest_memory_pressure")
+
+        # Create table with various string types
+        cursor.execute("""
+            CREATE TABLE #pytest_memory_pressure (
+                id INT,
+                varchar_col VARCHAR(1000),
+                nvarchar_col NVARCHAR(1000),
+                varbinary_col VARBINARY(1000)
+            )
+        """)
+        db_connection.commit()
+
+        # Insert test data
+        test_string = "X" * 500
+        test_binary = b"\x00\x01\x02" * 100
+
+        for i in range(1000):
+            cursor.execute(
+                "INSERT INTO #pytest_memory_pressure VALUES (?, ?, ?, ?)",
+                (i, test_string, test_string, test_binary)
+            )
+        db_connection.commit()
+
+        # Set memory limit (50MB) to create pressure
+        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
+        # Use the smaller of 50MB or the current soft limit to avoid exceeding the hard limit
+        memory_limit = min(50 * 1024 * 1024, soft) if soft > 0 else 50 * 1024 * 1024
+        try:
+            resource.setrlimit(resource.RLIMIT_AS, (memory_limit, hard))
+
+            # Try to fetch data under memory pressure
+            cursor.execute("SELECT * FROM #pytest_memory_pressure")
+
+            # This might fail or return partial data, but should NOT segfault
+            try:
+                rows = cursor.fetchall()
+                # If we get here, verify data integrity
+                for row in rows:
+                    if row is not None:  # Some rows might be None under pressure
+                        # Verify no corrupt data - either complete or None
+                        assert len(row) == 4, "Partial row detected under memory pressure"
+            except MemoryError:
+                # Acceptable - ran out of memory, but didn't crash
+                print("[OK] Memory pressure caused MemoryError (expected, not a crash)")
+
+        finally:
+            # Restore memory limit
+            resource.setrlimit(resource.RLIMIT_AS, (soft, hard))
+
+        print("[OK] Python C API NULL handling test passed: No segfault under memory pressure")
+
+    except Exception as e:
+        pytest.fail(f"Python C API NULL handling test failed: {e}")
+    finally:
+        drop_table_if_exists(cursor, "#pytest_memory_pressure")
+        db_connection.commit()
+
+
+@pytest.mark.stress
+def test_thousands_of_empty_strings_allocation_stress(cursor, db_connection):
+    """
+    Test #3: Stress test with thousands of empty string allocations.
+
+    Fetches many rows with empty VARCHAR, NVARCHAR, and VARBINARY values,
+    processing thousands of empty strings to stress the allocation path and
+    verify that empty-value creation never crashes the driver.
+    """
+    try:
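+        # 10,000 rows x 3 empty columns = 30,000 zero-length values, fetched once via fetchall() and again via fetchmany()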
+        drop_table_if_exists(cursor, "#pytest_empty_stress")
+
+        cursor.execute("""
+            CREATE TABLE #pytest_empty_stress (
+                id INT,
+                empty_varchar VARCHAR(100),
+                empty_nvarchar NVARCHAR(100),
+                empty_varbinary VARBINARY(100)
+            )
+        """)
+        db_connection.commit()
+
+        # Insert 10,000 rows with empty strings
+        num_rows = 10000
+        print(f"Inserting {num_rows} rows with empty strings...")
+
+        for i in range(num_rows):
+            cursor.execute(
+                "INSERT INTO #pytest_empty_stress VALUES (?, ?, ?, ?)",
+                (i, "", "", b"")
+            )
+            if i % 1000 == 0 and i > 0:
+                print(f"  Inserted {i} rows...")
+
+        db_connection.commit()
+        print(f"[OK] Inserted {num_rows} rows")
+
+        # Test 1: fetchall() - stress test all allocations at once
+        print("Testing fetchall()...")
+        cursor.execute("SELECT * FROM #pytest_empty_stress ORDER BY id")
+        rows = cursor.fetchall()
+
+        assert len(rows) == num_rows, f"Expected {num_rows} rows, got {len(rows)}"
+
+        # Verify all empty strings are correct
+        for i, row in enumerate(rows):
+            assert row[0] == i, f"Row {i} has incorrect ID {row[0]}"
+            assert row[1] == "", f"Row {i} varchar not empty string: {row[1]}"
+            assert row[2] == "", f"Row {i} nvarchar not empty string: {row[2]}"
+            assert row[3] == b"", f"Row {i} varbinary not empty bytes: {row[3]}"
+
+            if i % 2000 == 0 and i > 0:
+                print(f"  Verified {i} rows...")
+
+        print(f"[OK] fetchall() test passed: All {num_rows} empty strings correct")
+
+        # Test 2: fetchmany() - stress test batch allocations
+        print("Testing fetchmany(1000)...")
+        cursor.execute("SELECT * FROM #pytest_empty_stress ORDER BY id")
+
+        total_fetched = 0
+        batch_num = 0
+        while True:
+            batch = cursor.fetchmany(1000)
+            if not batch:
+                break
+
+            batch_num += 1
+            for row in batch:
+                assert row[1] == "", f"Batch {batch_num}: varchar not empty"
+                assert row[2] == "", f"Batch {batch_num}: nvarchar not empty"
+                assert row[3] == b"", f"Batch {batch_num}: varbinary not empty"
+
+            total_fetched += len(batch)
+            print(f"  Batch {batch_num}: fetched {len(batch)} rows (total: {total_fetched})")
+
+        assert total_fetched == num_rows, f"fetchmany total {total_fetched} != {num_rows}"
+        print(f"[OK] fetchmany() test passed: All {num_rows} empty strings correct")
+
+    except Exception as e:
+        pytest.fail(f"Empty strings stress test failed: {e}")
+    finally:
+        drop_table_if_exists(cursor, "#pytest_empty_stress")
+        db_connection.commit()
+
+
+@pytest.mark.stress
+def test_large_result_set_100k_rows_no_overflow(cursor, db_connection):
+    """
+    Test #5: Fetch very large result sets (100,000+ rows) to test buffer overflow protection.
+
+    Tests that large rowIdx values don't cause buffer overflow when calculating
+    rowIdx × fetchBufferSize. Verifies data integrity across all rows - no crashes,
+    no corrupt data, correct values in all cells.
+    """
+    try:
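+        # Large row indices must not overflow when multiplied by the per-row fetch buffer size (see docstring above)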
+        drop_table_if_exists(cursor, "#pytest_100k_rows")
+
+        cursor.execute("""
+            CREATE TABLE #pytest_100k_rows (
+                id INT,
+                varchar_col VARCHAR(50),
+                nvarchar_col NVARCHAR(50),
+                int_col INT
+            )
+        """)
+        db_connection.commit()
+
+        # Insert 100,000 rows with sequential IDs and predictable data
+        num_rows = 100000
+        print(f"Inserting {num_rows} rows...")
+
+        # Use bulk insert for performance
+        batch_size = 1000
+        for batch_start in range(0, num_rows, batch_size):
+            values = []
+            for i in range(batch_start, min(batch_start + batch_size, num_rows)):
+                values.append((
+                    i,
+                    f"VARCHAR_{i}",
+                    f"NVARCHAR_{i}",
+                    i * 2
+                ))
+
+            # Use executemany for faster insertion
+            cursor.executemany(
+                "INSERT INTO #pytest_100k_rows VALUES (?, ?, ?, ?)",
+                values
+            )
+
+            if (batch_start + batch_size) % 10000 == 0:
+                print(f"  Inserted {batch_start + batch_size} rows...")
+
+        db_connection.commit()
+        print(f"[OK] Inserted {num_rows} rows")
+
+        # Fetch all rows and verify data integrity
+        print("Fetching all rows...")
+        cursor.execute("SELECT id, varchar_col, nvarchar_col, int_col FROM #pytest_100k_rows ORDER BY id")
+        rows = cursor.fetchall()
+
+        assert len(rows) == num_rows, f"Expected {num_rows} rows, got {len(rows)}"
+        print(f"[OK] Fetched {num_rows} rows")
+
+        # Verify first row
+        assert rows[0][0] == 0, f"First row ID incorrect: {rows[0][0]}"
+        assert rows[0][1] == "VARCHAR_0", f"First row varchar incorrect: {rows[0][1]}"
+        assert rows[0][2] == "NVARCHAR_0", f"First row nvarchar incorrect: {rows[0][2]}"
+        assert rows[0][3] == 0, f"First row int incorrect: {rows[0][3]}"
+        print("[OK] First row verified")
+
+        # Verify last row
+        assert rows[-1][0] == num_rows - 1, f"Last row ID incorrect: {rows[-1][0]}"
+        assert rows[-1][1] == f"VARCHAR_{num_rows-1}", "Last row varchar incorrect"
+        assert rows[-1][2] == f"NVARCHAR_{num_rows-1}", "Last row nvarchar incorrect"
+        assert rows[-1][3] == (num_rows - 1) * 2, "Last row int incorrect"
+        print("[OK] Last row verified")
+
+        # Spot-check fixed indices throughout the dataset
+        check_indices = [10000, 25000, 50000, 75000, 99999]
+        for idx in check_indices:
+            row = rows[idx]
+            assert row[0] == idx, f"Row {idx} ID incorrect: {row[0]}"
+            assert row[1] == f"VARCHAR_{idx}", f"Row {idx} varchar incorrect: {row[1]}"
+            assert row[2] == f"NVARCHAR_{idx}", f"Row {idx} nvarchar incorrect: {row[2]}"
+            assert row[3] == idx * 2, f"Row {idx} int incorrect: {row[3]}"
+        print(f"[OK] Spot checks verified at indices: {check_indices}")
+
+        # Verify all rows have correct sequential IDs (full integrity check)
+        print("Performing full integrity check...")
+        for i, row in enumerate(rows):
+            if row[0] != i:
+                pytest.fail(f"Data corruption at row {i}: expected ID {i}, got {row[0]}")
+
+            if i % 20000 == 0 and i > 0:
+                print(f"  Verified {i} rows...")
+
+        print(f"[OK] Full integrity check passed: All {num_rows} rows correct, no buffer overflow")
+
+    except Exception as e:
+        pytest.fail(f"Large result set test failed: {e}")
+    finally:
+        drop_table_if_exists(cursor, "#pytest_100k_rows")
+        db_connection.commit()
+
+
+@pytest.mark.stress
+def test_very_large_lob_10mb_data_integrity(cursor, db_connection):
+    """
+    Test #6: Fetch VARCHAR(MAX), NVARCHAR(MAX), VARBINARY(MAX) with 10MB+ data.
+
+    Verifies:
+    1. Correct LOB detection
+    2. Data fetched completely and correctly
+    3. No buffer overflow when determining LOB vs non-LOB path
+    4. Data integrity verified byte-by-byte using SHA256
+    """
+    try:
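+        # SHA-256 digests of the inserted values are compared against digests of the fetched values,
+        # so any silent truncation or corruption of the 10MB payloads is caught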
+        drop_table_if_exists(cursor, "#pytest_10mb_lob")
+
+        cursor.execute("""
+            CREATE TABLE #pytest_10mb_lob (
+                id INT,
+                varchar_lob VARCHAR(MAX),
+                nvarchar_lob NVARCHAR(MAX),
+                varbinary_lob VARBINARY(MAX)
+            )
+        """)
+        db_connection.commit()
+
+        # Create 10MB+ data
+        mb_10 = 10 * 1024 * 1024
+
+        print("Creating 10MB test data...")
+        varchar_data = "A" * mb_10  # 10MB ASCII
+        nvarchar_data = "🔥" * (mb_10 // 4)  # ~10MB Unicode (emoji is 4 bytes in UTF-8)
+        varbinary_data = bytes(range(256)) * (mb_10 // 256)  # 10MB binary
+
+        # Calculate checksums for verification
+        varchar_hash = hashlib.sha256(varchar_data.encode('utf-8')).hexdigest()
+        nvarchar_hash = hashlib.sha256(nvarchar_data.encode('utf-8')).hexdigest()
+        varbinary_hash = hashlib.sha256(varbinary_data).hexdigest()
+
+        print(f"  VARCHAR size: {len(varchar_data):,} bytes, SHA256: {varchar_hash[:16]}...")
+        print(f"  NVARCHAR size: {len(nvarchar_data):,} chars, SHA256: {nvarchar_hash[:16]}...")
+        print(f"  VARBINARY size: {len(varbinary_data):,} bytes, SHA256: {varbinary_hash[:16]}...")
+
+        # Insert LOB data
+        print("Inserting 10MB LOB data...")
+        cursor.execute(
+            "INSERT INTO #pytest_10mb_lob VALUES (?, ?, ?, ?)",
+            (1, varchar_data, nvarchar_data, varbinary_data)
+        )
+        db_connection.commit()
+        print("[OK] Inserted 10MB LOB data")
+
+        # Fetch and verify
+        print("Fetching 10MB LOB data...")
+        cursor.execute("SELECT id, varchar_lob, nvarchar_lob, varbinary_lob FROM #pytest_10mb_lob")
+        row = cursor.fetchone()
+
+        assert row is not None, "Failed to fetch LOB data"
+        assert row[0] == 1, f"ID incorrect: {row[0]}"
+
+        # Verify VARCHAR(MAX) - byte-by-byte integrity
+        print("Verifying VARCHAR(MAX) integrity...")
+        fetched_varchar = row[1]
+        assert len(fetched_varchar) == len(varchar_data), \
+            f"VARCHAR size mismatch: expected {len(varchar_data)}, got {len(fetched_varchar)}"
+
+        fetched_varchar_hash = hashlib.sha256(fetched_varchar.encode('utf-8')).hexdigest()
+        assert fetched_varchar_hash == varchar_hash, \
+            "VARCHAR data corruption: hash mismatch"
+        print(f"[OK] VARCHAR(MAX) verified: {len(fetched_varchar):,} bytes, SHA256 match")
+
+        # Verify NVARCHAR(MAX) - byte-by-byte integrity
+        print("Verifying NVARCHAR(MAX) integrity...")
+        fetched_nvarchar = row[2]
+        assert len(fetched_nvarchar) == len(nvarchar_data), \
+            f"NVARCHAR size mismatch: expected {len(nvarchar_data)}, got {len(fetched_nvarchar)}"
+
+        fetched_nvarchar_hash = hashlib.sha256(fetched_nvarchar.encode('utf-8')).hexdigest()
+        assert fetched_nvarchar_hash == nvarchar_hash, \
+            "NVARCHAR data corruption: hash mismatch"
+        print(f"[OK] NVARCHAR(MAX) verified: {len(fetched_nvarchar):,} chars, SHA256 match")
+
+        # Verify VARBINARY(MAX) - byte-by-byte integrity
+        print("Verifying VARBINARY(MAX) integrity...")
+        fetched_varbinary = row[3]
+        assert len(fetched_varbinary) == len(varbinary_data), \
+            f"VARBINARY size mismatch: expected {len(varbinary_data)}, got {len(fetched_varbinary)}"
+
+        fetched_varbinary_hash = hashlib.sha256(fetched_varbinary).hexdigest()
+        assert fetched_varbinary_hash == varbinary_hash, \
+            "VARBINARY data corruption: hash mismatch"
+        print(f"[OK] VARBINARY(MAX) verified: {len(fetched_varbinary):,} bytes, SHA256 match")
+
+        print("[OK] All 10MB+ LOB data verified: LOB detection correct, no overflow, integrity intact")
+
+    except Exception as e:
+        pytest.fail(f"Very large LOB test failed: {e}")
+    finally:
+        drop_table_if_exists(cursor, "#pytest_10mb_lob")
+        db_connection.commit()
+
+
+@pytest.mark.stress
+def test_concurrent_fetch_data_integrity_no_corruption(db_connection, conn_str):
+    """
+    Test #7: Multiple threads/cursors fetching data simultaneously.
+
+    Verifies:
+    1. No data corruption occurs
+    2. Each cursor gets correct data
+    3. No crashes or race conditions
+    4. Data from one cursor doesn't leak into another
+    """
+    import mssql_python
+
+    num_threads = 5
+    num_rows_per_table = 1000
+    results = []
+    errors = []
+
+    def worker_thread(thread_id: int, conn_str: str, results_list: List, errors_list: List):
+        """Worker thread that creates its own connection and fetches data"""
+        try:
+            # Each thread gets its own connection and cursor
+            conn = mssql_python.connect(conn_str)
+            cursor = conn.cursor()
+
+            # Create thread-specific table
+            table_name = f"#pytest_concurrent_t{thread_id}"
+            drop_table_if_exists(cursor, table_name)
+
+            cursor.execute(f"""
+                CREATE TABLE {table_name} (
+                    id INT,
+                    thread_id INT,
+                    data VARCHAR(100)
+                )
+            """)
+            conn.commit()
+
+            # Insert thread-specific data
+            for i in range(num_rows_per_table):
+                cursor.execute(
+                    f"INSERT INTO {table_name} VALUES (?, ?, ?)",
+                    (i, thread_id, f"Thread_{thread_id}_Row_{i}")
+                )
+            conn.commit()
+
+            # Small delay to ensure concurrent execution
+            time.sleep(0.01)
+
+            # Fetch data and verify
+            cursor.execute(f"SELECT id, thread_id, data FROM {table_name} ORDER BY id")
+            rows = cursor.fetchall()
+
+            # Verify all rows belong to this thread only (no cross-contamination)
+            for i, row in enumerate(rows):
+                if row[0] != i:
+                    raise ValueError(f"Thread {thread_id}: Row {i} has wrong ID {row[0]}")
+                if row[1] != thread_id:
+                    raise ValueError(f"Thread {thread_id}: Data corruption! Got thread_id {row[1]}")
+                expected_data = f"Thread_{thread_id}_Row_{i}"
+                if row[2] != expected_data:
+                    raise ValueError(f"Thread {thread_id}: Data corruption! Expected '{expected_data}', got '{row[2]}'")
+
+            # Record success
+            results_list.append({
+                'thread_id': thread_id,
+                'rows_fetched': len(rows),
+                'success': True
+            })
+
+            # Cleanup
+            drop_table_if_exists(cursor, table_name)
+            conn.commit()
+            cursor.close()
+            conn.close()
+
+        except Exception as e:
+            errors_list.append({
+                'thread_id': thread_id,
+                'error': str(e)
+            })
+
+    # Create and start threads
+    threads = []
+    print(f"Starting {num_threads} concurrent threads...")
+
+    for i in range(num_threads):
+        thread = threading.Thread(
+            target=worker_thread,
+            args=(i, conn_str, results, errors)
+        )
+        threads.append(thread)
+        thread.start()
+
+    # Wait for all threads to complete
+    for thread in threads:
+        thread.join()
+
+    # Verify results
+    print("\nConcurrent fetch results:")
+    for result in results:
+        print(f"  Thread {result['thread_id']}: Fetched {result['rows_fetched']} rows - {'OK' if result['success'] else 'FAILED'}")
+
+    if errors:
+        print("\nErrors encountered:")
+        for error in errors:
+            print(f"  Thread {error['thread_id']}: {error['error']}")
+        pytest.fail(f"Concurrent fetch had {len(errors)} errors")
+
+    # All threads should have succeeded
+    assert len(results) == num_threads, \
+        f"Expected {num_threads} successful threads, got {len(results)}"
+
+    # All threads should have fetched correct number of rows
+    for result in results:
+        assert result['rows_fetched'] == num_rows_per_table, \
+            f"Thread {result['thread_id']} fetched {result['rows_fetched']} rows, expected {num_rows_per_table}"
+
+    print(f"\n[OK] Concurrent fetch test passed: {num_threads} threads, no corruption, no race conditions")