diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 3955c003..8314d255 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -31,6 +31,7 @@ #define ARCHITECTURE "win64" // Default to win64 if not defined during compilation #endif #define DAE_CHUNK_SIZE 8192 +#define SQL_MAX_LOB_SIZE 8000 //------------------------------------------------------------------------------------------------- // Class definitions //------------------------------------------------------------------------------------------------- @@ -1747,8 +1748,13 @@ static py::object FetchLobColumnData(SQLHSTMT hStmt, &actualRead); if (ret == SQL_ERROR || !SQL_SUCCEEDED(ret) && ret != SQL_SUCCESS_WITH_INFO) { - LOG("Loop {}: Error fetching column {} with cType={}", loopCount, colIndex, cType); - ThrowStdException("Error fetching column data"); + std::ostringstream oss; + oss << "Error fetching LOB for column " << colIndex + << ", cType=" << cType + << ", loop=" << loopCount + << ", SQLGetData return=" << ret; + LOG(oss.str()); + ThrowStdException(oss.str()); } if (actualRead == SQL_NULL_DATA) { LOG("Loop {}: Column {} is NULL", loopCount, colIndex); @@ -1862,7 +1868,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: { - if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > 8000) { + if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > SQL_MAX_LOB_SIZE) { LOG("Streaming LOB for column {}", i); row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false)); } else { @@ -2406,7 +2412,7 @@ SQLRETURN SQLBindColums(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& column // Fetch rows in batches // TODO: Move to anonymous namespace, since it is not used outside this file SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& columnNames, - py::list& rows, SQLUSMALLINT numCols, SQLULEN& numRowsFetched) { + py::list& rows, SQLUSMALLINT numCols, SQLULEN& numRowsFetched, const std::vector& lobColumns) { LOG("Fetching data in batches"); SQLRETURN ret = SQLFetchScroll_ptr(hStmt, SQL_FETCH_NEXT, 0); if (ret == SQL_NO_DATA) { @@ -2466,25 +2472,19 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: { - // TODO: variable length data needs special handling, this logic wont suffice SQLULEN columnSize = columnMeta["ColumnSize"].cast(); HandleZeroColumnSizeAtFetch(columnSize); uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; uint64_t numCharsInData = dataLen / sizeof(SQLCHAR); + bool isLob = std::find(lobColumns.begin(), lobColumns.end(), col) != lobColumns.end(); // fetchBufferSize includes null-terminator, numCharsInData doesn't. Hence '<' - if (numCharsInData < fetchBufferSize) { + if (!isLob && numCharsInData < fetchBufferSize) { // SQLFetch will nullterminate the data row.append(std::string( reinterpret_cast(&buffers.charBuffers[col - 1][i * fetchBufferSize]), numCharsInData)); } else { - // In this case, buffer size is smaller, and data to be retrieved is longer - // TODO: Revisit - std::ostringstream oss; - oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data " - << "to be retrieved is longer (" << numCharsInData << "). ColumnID - " - << col << ", datatype - " << dataType; - ThrowStdException(oss.str()); + row.append(FetchLobColumnData(hStmt, col, SQL_C_CHAR, false, false)); } break; } @@ -2496,8 +2496,9 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum HandleZeroColumnSizeAtFetch(columnSize); uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; uint64_t numCharsInData = dataLen / sizeof(SQLWCHAR); + bool isLob = std::find(lobColumns.begin(), lobColumns.end(), col) != lobColumns.end(); // fetchBufferSize includes null-terminator, numCharsInData doesn't. Hence '<' - if (numCharsInData < fetchBufferSize) { + if (!isLob && numCharsInData < fetchBufferSize) { // SQLFetch will nullterminate the data #if defined(__APPLE__) || defined(__linux__) // Use unix-specific conversion to handle the wchar_t/SQLWCHAR size difference @@ -2511,13 +2512,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum numCharsInData)); #endif } else { - // In this case, buffer size is smaller, and data to be retrieved is longer - // TODO: Revisit - std::ostringstream oss; - oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data " - << "to be retrieved is longer (" << numCharsInData << "). ColumnID - " - << col << ", datatype - " << dataType; - ThrowStdException(oss.str()); + row.append(FetchLobColumnData(hStmt, col, SQL_C_WCHAR, true, false)); } break; } @@ -2603,21 +2598,15 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_BINARY: case SQL_VARBINARY: case SQL_LONGVARBINARY: { - // TODO: variable length data needs special handling, this logic wont suffice SQLULEN columnSize = columnMeta["ColumnSize"].cast(); HandleZeroColumnSizeAtFetch(columnSize); - if (static_cast(dataLen) <= columnSize) { + bool isLob = std::find(lobColumns.begin(), lobColumns.end(), col) != lobColumns.end(); + if (!isLob && static_cast(dataLen) <= columnSize) { row.append(py::bytes(reinterpret_cast( &buffers.charBuffers[col - 1][i * columnSize]), dataLen)); } else { - // In this case, buffer size is smaller, and data to be retrieved is longer - // TODO: Revisit - std::ostringstream oss; - oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data " - << "to be retrieved is longer (" << dataLen << "). ColumnID - " - << col << ", datatype - " << dataType; - ThrowStdException(oss.str()); + row.append(FetchLobColumnData(hStmt, col, SQL_C_BINARY, false, true)); } break; } @@ -2746,6 +2735,35 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch return ret; } + std::vector lobColumns; + for (SQLSMALLINT i = 0; i < numCols; i++) { + auto colMeta = columnNames[i].cast(); + SQLSMALLINT dataType = colMeta["DataType"].cast(); + SQLULEN columnSize = colMeta["ColumnSize"].cast(); + + if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR || + dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || + dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) && + (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) { + lobColumns.push_back(i + 1); // 1-based + } + } + + // If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap + if (!lobColumns.empty()) { + LOG("LOB columns detected → using per-row SQLGetData path"); + while (true) { + ret = SQLFetch_ptr(hStmt); + if (ret == SQL_NO_DATA) break; + if (!SQL_SUCCEEDED(ret)) return ret; + + py::list row; + SQLGetData_wrap(StatementHandle, numCols, row); // <-- streams LOBs correctly + rows.append(row); + } + return SQL_SUCCESS; + } + // Initialize column buffers ColumnBuffers buffers(numCols, fetchSize); @@ -2760,7 +2778,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_ROW_ARRAY_SIZE, (SQLPOINTER)(intptr_t)fetchSize, 0); SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_ROWS_FETCHED_PTR, &numRowsFetched, 0); - ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched); + ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched, lobColumns); if (!SQL_SUCCEEDED(ret) && ret != SQL_NO_DATA) { LOG("Error when fetching data"); return ret; @@ -2839,6 +2857,35 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) { } LOG("Fetching data in batch sizes of {}", fetchSize); + std::vector lobColumns; + for (SQLSMALLINT i = 0; i < numCols; i++) { + auto colMeta = columnNames[i].cast(); + SQLSMALLINT dataType = colMeta["DataType"].cast(); + SQLULEN columnSize = colMeta["ColumnSize"].cast(); + + if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR || + dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || + dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) && + (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) { + lobColumns.push_back(i + 1); // 1-based + } + } + + // If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap + if (!lobColumns.empty()) { + LOG("LOB columns detected → using per-row SQLGetData path"); + while (true) { + ret = SQLFetch_ptr(hStmt); + if (ret == SQL_NO_DATA) break; + if (!SQL_SUCCEEDED(ret)) return ret; + + py::list row; + SQLGetData_wrap(StatementHandle, numCols, row); // <-- streams LOBs correctly + rows.append(row); + } + return SQL_SUCCESS; + } + ColumnBuffers buffers(numCols, fetchSize); // Bind columns @@ -2853,7 +2900,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) { SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_ROWS_FETCHED_PTR, &numRowsFetched, 0); while (ret != SQL_NO_DATA) { - ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched); + ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched, lobColumns); if (!SQL_SUCCEEDED(ret) && ret != SQL_NO_DATA) { LOG("Error when fetching data"); return ret; diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index b002ad2d..df102064 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -523,60 +523,6 @@ def test_varbinary_full_capacity(cursor, db_connection): cursor.execute("DROP TABLE #pytest_varbinary_test") db_connection.commit() -def test_varchar_max(cursor, db_connection): - """Test SQL_VARCHAR with MAX length""" - try: - cursor.execute("CREATE TABLE #pytest_varchar_test (varchar_column VARCHAR(MAX))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_varchar_test (varchar_column) VALUES (?), (?)", ["ABCDEFGHI", None]) - db_connection.commit() - expectedRows = 2 - # fetchone test - cursor.execute("SELECT varchar_column FROM #pytest_varchar_test") - rows = [] - for i in range(0, expectedRows): - rows.append(cursor.fetchone()) - assert cursor.fetchone() == None, "varchar_column is expected to have only {} rows".format(expectedRows) - assert rows[0] == ["ABCDEFGHI"], "SQL_VARCHAR parsing failed for fetchone - row 0" - assert rows[1] == [None], "SQL_VARCHAR parsing failed for fetchone - row 1" - # fetchall test - cursor.execute("SELECT varchar_column FROM #pytest_varchar_test") - rows = cursor.fetchall() - assert rows[0] == ["ABCDEFGHI"], "SQL_VARCHAR parsing failed for fetchall - row 0" - assert rows[1] == [None], "SQL_VARCHAR parsing failed for fetchall - row 1" - except Exception as e: - pytest.fail(f"SQL_VARCHAR parsing test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_varchar_test") - db_connection.commit() - -def test_wvarchar_max(cursor, db_connection): - """Test SQL_WVARCHAR with MAX length""" - try: - cursor.execute("CREATE TABLE #pytest_wvarchar_test (wvarchar_column NVARCHAR(MAX))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_wvarchar_test (wvarchar_column) VALUES (?), (?)", ["!@#$%^&*()_+", None]) - db_connection.commit() - expectedRows = 2 - # fetchone test - cursor.execute("SELECT wvarchar_column FROM #pytest_wvarchar_test") - rows = [] - for i in range(0, expectedRows): - rows.append(cursor.fetchone()) - assert cursor.fetchone() == None, "wvarchar_column is expected to have only {} rows".format(expectedRows) - assert rows[0] == ["!@#$%^&*()_+"], "SQL_WVARCHAR parsing failed for fetchone - row 0" - assert rows[1] == [None], "SQL_WVARCHAR parsing failed for fetchone - row 1" - # fetchall test - cursor.execute("SELECT wvarchar_column FROM #pytest_wvarchar_test") - rows = cursor.fetchall() - assert rows[0] == ["!@#$%^&*()_+"], "SQL_WVARCHAR parsing failed for fetchall - row 0" - assert rows[1] == [None], "SQL_WVARCHAR parsing failed for fetchall - row 1" - except Exception as e: - pytest.fail(f"SQL_WVARCHAR parsing test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_wvarchar_test") - db_connection.commit() - def test_varbinary_max(cursor, db_connection): """Test SQL_VARBINARY with MAX length""" try: @@ -5680,294 +5626,6 @@ def test_emoji_round_trip(cursor, db_connection): except Exception as e: pytest.fail(f"Error for input {repr(text)}: {e}") -def test_varchar_max_insert_non_lob(cursor, db_connection): - """Test small VARCHAR(MAX) insert (non-LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_varchar_nonlob (col VARCHAR(MAX))") - db_connection.commit() - - small_str = "Hello, world!" # small, non-LOB - cursor.execute( - "INSERT INTO #pytest_varchar_nonlob (col) VALUES (?)", - [small_str] - ) - db_connection.commit() - - empty_str = "" - cursor.execute( - "INSERT INTO #pytest_varchar_nonlob (col) VALUES (?)", - [empty_str] - ) - db_connection.commit() - - # None value - cursor.execute( - "INSERT INTO #pytest_varchar_nonlob (col) VALUES (?)", - [None] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_varchar_nonlob") - # rows = cursor.fetchall() - # assert rows == [[small_str], [empty_str], [None]] - - finally: - pass - - -def test_varchar_max_insert_lob(cursor, db_connection): - """Test large VARCHAR(MAX) insert (LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_varchar_lob (col VARCHAR(MAX))") - db_connection.commit() - - large_str = "A" * 100_000 # > 8k to trigger LOB - cursor.execute( - "INSERT INTO #pytest_varchar_lob (col) VALUES (?)", - [large_str] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_varchar_lob") - # rows = cursor.fetchall() - # assert rows == [[large_str]] - - finally: - pass - - -def test_nvarchar_max_insert_non_lob(cursor, db_connection): - """Test small NVARCHAR(MAX) insert (non-LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_nvarchar_nonlob (col NVARCHAR(MAX))") - db_connection.commit() - - small_str = "Unicode ✨ test" - cursor.execute( - "INSERT INTO #pytest_nvarchar_nonlob (col) VALUES (?)", - [small_str] - ) - db_connection.commit() - - empty_str = "" - cursor.execute( - "INSERT INTO #pytest_nvarchar_nonlob (col) VALUES (?)", - [empty_str] - ) - db_connection.commit() - - cursor.execute( - "INSERT INTO #pytest_nvarchar_nonlob (col) VALUES (?)", - [None] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_nonlob") - # rows = cursor.fetchall() - # assert rows == [[small_str], [empty_str], [None]] - - finally: - pass - - -def test_nvarchar_max_insert_lob(cursor, db_connection): - """Test large NVARCHAR(MAX) insert (LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_nvarchar_lob (col NVARCHAR(MAX))") - db_connection.commit() - - large_str = "📝" * 50_000 # each emoji = 2 UTF-16 code units, total > 100k bytes - cursor.execute( - "INSERT INTO #pytest_nvarchar_lob (col) VALUES (?)", - [large_str] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_lob") - # rows = cursor.fetchall() - # assert rows == [[large_str]] - - finally: - pass - -def test_nvarchar_max_boundary(cursor, db_connection): - """Test NVARCHAR(MAX) at LOB boundary sizes.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarchar_boundary") - cursor.execute("CREATE TABLE #pytest_nvarchar_boundary (col NVARCHAR(MAX))") - db_connection.commit() - - # 4k BMP chars = 8k bytes - cursor.execute("INSERT INTO #pytest_nvarchar_boundary (col) VALUES (?)", ["A" * 4096]) - # 4k emojis = 8k UTF-16 code units (16k bytes) - cursor.execute("INSERT INTO #pytest_nvarchar_boundary (col) VALUES (?)", ["📝" * 4096]) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_boundary") - # rows = cursor.fetchall() - # assert rows == [["A" * 4096], ["📝" * 4096]] - finally: - pass - - -def test_nvarchar_max_chunk_edge(cursor, db_connection): - """Test NVARCHAR(MAX) insert slightly larger than a chunk.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarchar_chunk") - cursor.execute("CREATE TABLE #pytest_nvarchar_chunk (col NVARCHAR(MAX))") - db_connection.commit() - - chunk_size = 8192 # bytes - test_str = "📝" * ((chunk_size // 4) + 3) # slightly > 1 chunk - cursor.execute("INSERT INTO #pytest_nvarchar_chunk (col) VALUES (?)", [test_str]) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_chunk") - # row = cursor.fetchone() - # assert row[0] == test_str - finally: - pass - -def test_empty_string_chunk(cursor, db_connection): - """Test inserting empty strings into VARCHAR(MAX) and NVARCHAR(MAX).""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_empty_string") - cursor.execute(""" - CREATE TABLE #pytest_empty_string ( - varchar_col VARCHAR(MAX), - nvarchar_col NVARCHAR(MAX) - ) - """) - db_connection.commit() - - empty_varchar = "" - empty_nvarchar = "" - cursor.execute( - "INSERT INTO #pytest_empty_string (varchar_col, nvarchar_col) VALUES (?, ?)", - [empty_varchar, empty_nvarchar] - ) - db_connection.commit() - - cursor.execute("SELECT LEN(varchar_col), LEN(nvarchar_col) FROM #pytest_empty_string") - row = tuple(int(x) for x in cursor.fetchone()) - assert row == (0, 0), f"Expected lengths (0,0), got {row}" - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_empty_string") - db_connection.commit() - - -def test_varcharmax_short(cursor, db_connection): - """Test inserting and retrieving a small string well below any size thresholds. - # Verifies basic functionality for VARCHAR(MAX) with typical input size.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - short_str = "hello" - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [short_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [short_str]) - assert cursor.fetchone()[0] == short_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_boundary(cursor, db_connection): - """Test inserting and retrieving a string at the boundary size (8000 characters), - which is the largest size supported without switching to streaming or large object handling. - Ensures proper handling at this edge case.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - boundary_str = "X" * 8000 - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [boundary_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [boundary_str]) - assert cursor.fetchone()[0] == boundary_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_streaming(cursor, db_connection): - """Test inserting and retrieving a string just above the boundary size (8100 characters), - which requires streaming mechanisms to handle data efficiently. - Validates that larger data triggers correct processing without truncation.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - streaming_str = "Y" * 8100 - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [streaming_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [streaming_str]) - assert cursor.fetchone()[0] == streaming_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_large(cursor, db_connection): - """Test inserting and retrieving a very large string (100,000 characters), - which is well beyond typical sizes and ensures that the system can handle large VARCHAR(MAX) values.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - large_str = "Z" * 100_000 - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [large_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [large_str]) - assert cursor.fetchone()[0] == large_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_empty_string(cursor, db_connection): - """Test inserting and retrieving an empty string to verify correct handling of zero-length data.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [""]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [""]) - assert cursor.fetchone()[0] == "" - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_null(cursor, db_connection): - """Test inserting and retrieving a NULL value to ensure proper handling of SQL NULLs.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [None]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col IS NULL") - assert cursor.fetchone()[0] is None - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - def test_varcharmax_transaction_rollback(cursor, db_connection): """Test that inserting a large VARCHAR(MAX) within a transaction that is rolled back does not persist the data, ensuring transactional integrity.""" @@ -5987,107 +5645,6 @@ def test_varcharmax_transaction_rollback(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") db_connection.commit() - -def test_nvarcharmax_short(cursor, db_connection): - """Test inserting and retrieving a small string well below any size thresholds.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - short_str = "hello" - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [short_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [short_str]) - assert cursor.fetchone()[0] == short_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_boundary(cursor, db_connection): - """Test NVARCHAR(MAX) at the boundary size of 4000 characters (8000 bytes).""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - boundary_str = "X" * 4000 # NVARCHAR inline limit - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [boundary_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [boundary_str]) - assert cursor.fetchone()[0] == boundary_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_streaming(cursor, db_connection): - """Test NVARCHAR(MAX) just above the boundary size (4100 characters) to trigger streaming.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - streaming_str = "Y" * 4100 # Exceeds inline threshold → triggers streaming - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [streaming_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [streaming_str]) - assert cursor.fetchone()[0] == streaming_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_large(cursor, db_connection): - """Test inserting and retrieving a very large NVARCHAR(MAX) string (100,000 characters).""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - large_str = "Z" * 100_000 - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [large_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [large_str]) - assert cursor.fetchone()[0] == large_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_empty_string(cursor, db_connection): - """Test inserting and retrieving an empty NVARCHAR(MAX) string.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [""]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [""]) - assert cursor.fetchone()[0] == "" - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_null(cursor, db_connection): - """Test inserting and retrieving a NULL NVARCHAR(MAX) value.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [None]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col IS NULL") - assert cursor.fetchone()[0] is None - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - def test_nvarcharmax_transaction_rollback(cursor, db_connection): """Test that inserting a large NVARCHAR(MAX) within a transaction that is rolled back does not persist the data, ensuring transactional integrity.""" @@ -6795,6 +6352,259 @@ def test_only_null_and_empty_binary(cursor, db_connection): drop_table_if_exists(cursor, "#pytest_null_empty_binary") db_connection.commit() +# ---------------------- VARCHAR(MAX) ---------------------- + +def test_varcharmax_short_fetch(cursor, db_connection): + """Small VARCHAR(MAX), fetchone/fetchall/fetchmany.""" + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + + values = ["hello", "world"] + for val in values: + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [val]) + db_connection.commit() + + # fetchone + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY col") + row1 = cursor.fetchone()[0] + row2 = cursor.fetchone()[0] + assert {row1, row2} == set(values) + assert cursor.fetchone() is None + + # fetchall + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY col") + all_rows = [r[0] for r in cursor.fetchall()] + assert set(all_rows) == set(values) + + # fetchmany + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY col") + many = [r[0] for r in cursor.fetchmany(1)] + assert many[0] in values + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_empty_string(cursor, db_connection): + """Empty string in VARCHAR(MAX).""" + try: + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [""]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] == "" + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_null(cursor, db_connection): + """NULL in VARCHAR(MAX).""" + try: + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [None]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] is None + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_boundary(cursor, db_connection): + """Boundary at 8000 (inline limit).""" + try: + boundary_str = "X" * 8000 + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [boundary_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] == boundary_str + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_streaming(cursor, db_connection): + """Streaming fetch > 8k with all fetch modes.""" + try: + values = ["Y" * 8100, "Z" * 10000] + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + for v in values: + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [v]) + db_connection.commit() + + # --- fetchall --- + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)") + rows = [r[0] for r in cursor.fetchall()] + assert rows == sorted(values, key=len) + + # --- fetchone --- + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)") + r1 = cursor.fetchone()[0] + r2 = cursor.fetchone()[0] + assert {r1, r2} == set(values) + assert cursor.fetchone() is None + + # --- fetchmany --- + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)") + batch = [r[0] for r in cursor.fetchmany(1)] + assert batch[0] in values + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_large(cursor, db_connection): + """Very large VARCHAR(MAX).""" + try: + large_str = "L" * 100_000 + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [large_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] == large_str + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +# ---------------------- NVARCHAR(MAX) ---------------------- + +def test_nvarcharmax_short_fetch(cursor, db_connection): + """Small NVARCHAR(MAX), unicode, fetch modes.""" + try: + values = ["hello", "world_ß"] + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + for v in values: + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [v]) + db_connection.commit() + + # fetchone + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY col") + r1 = cursor.fetchone()[0] + r2 = cursor.fetchone()[0] + assert {r1, r2} == set(values) + assert cursor.fetchone() is None + + # fetchall + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY col") + all_rows = [r[0] for r in cursor.fetchall()] + assert set(all_rows) == set(values) + + # fetchmany + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY col") + many = [r[0] for r in cursor.fetchmany(1)] + assert many[0] in values + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_empty_string(cursor, db_connection): + """Empty string in NVARCHAR(MAX).""" + try: + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [""]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") + assert cursor.fetchone()[0] == "" + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_null(cursor, db_connection): + """NULL in NVARCHAR(MAX).""" + try: + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [None]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") + assert cursor.fetchone()[0] is None + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_boundary(cursor, db_connection): + """Boundary at 4000 characters (inline limit).""" + try: + boundary_str = "X" * 4000 + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [boundary_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") + assert cursor.fetchone()[0] == boundary_str + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_streaming(cursor, db_connection): + """Streaming fetch > 4k unicode with all fetch modes.""" + try: + values = ["Ω" * 4100, "漢" * 5000] + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + for v in values: + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [v]) + db_connection.commit() + + # --- fetchall --- + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)") + rows = [r[0] for r in cursor.fetchall()] + assert rows == sorted(values, key=len) + + # --- fetchone --- + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)") + r1 = cursor.fetchone()[0] + r2 = cursor.fetchone()[0] + assert {r1, r2} == set(values) + assert cursor.fetchone() is None + + # --- fetchmany --- + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)") + batch = [r[0] for r in cursor.fetchmany(1)] + assert batch[0] in values + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_large(cursor, db_connection): + """Very large NVARCHAR(MAX).""" + try: + large_str = "漢" * 50_000 + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [large_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") + assert cursor.fetchone()[0] == large_str + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + def test_close(db_connection): """Test closing the cursor""" try: @@ -6804,4 +6614,4 @@ def test_close(db_connection): except Exception as e: pytest.fail(f"Cursor close test failed: {e}") finally: - cursor = db_connection.cursor() \ No newline at end of file + cursor = db_connection.cursor()