From 7e31a85b54de53f3f41ba6cc82f2e0674ebc26ee Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Wed, 3 Sep 2025 17:54:59 +0530 Subject: [PATCH 1/6] adding streaming support in fetch for varcharmax type --- mssql_python/cursor.py | 28 ++++++++++++++-------------- tests/test_004_cursor.py | 1 - 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 7e8c58f8..7b130ec8 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -753,20 +753,20 @@ def execute( # Executing a new statement. Reset is_stmt_prepared to false self.is_stmt_prepared = [False] - log('debug', "Executing query: %s", operation) - for i, param in enumerate(parameters): - log('debug', - """Parameter number: %s, Parameter: %s, - Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""", - i + 1, - param, - str(type(param)), - parameters_type[i].paramSQLType, - parameters_type[i].paramCType, - parameters_type[i].columnSize, - parameters_type[i].decimalDigits, - parameters_type[i].inputOutputType, - ) + # log('debug', "Executing query: %s", operation) + # for i, param in enumerate(parameters): + # log('debug', + # """Parameter number: %s, Parameter: %s, + # Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""", + # i + 1, + # param, + # str(type(param)), + # parameters_type[i].paramSQLType, + # parameters_type[i].paramCType, + # parameters_type[i].columnSize, + # parameters_type[i].decimalDigits, + # parameters_type[i].inputOutputType, + # ) ret = ddbc_bindings.DDBCSQLExecute( self.hstmt, diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index b002ad2d..4ceeaa53 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -5987,7 +5987,6 @@ def test_varcharmax_transaction_rollback(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") db_connection.commit() - def test_nvarcharmax_short(cursor, db_connection): """Test inserting and retrieving a small string well below any size thresholds.""" try: From 929701c24234ebe9a7b9583ecf4081f1843b892e Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Wed, 3 Sep 2025 21:17:27 +0530 Subject: [PATCH 2/6] added streaming support for fetch for nvarcharmax type --- mssql_python/pybind/ddbc_bindings.cpp | 7 +- tests/test_004_cursor.py | 112 ++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 4 deletions(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 3955c003..0a978aae 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -1862,10 +1862,12 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: { + // Use streaming for large VARCHAR / CHAR if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > 8000) { LOG("Streaming LOB for column {}", i); row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false)); } else { + // Small VARCHAR, fetch directly uint64_t fetchBufferSize = columnSize + 1 /* null-termination */; std::vector dataBuffer(fetchBufferSize); SQLLEN dataLen; @@ -1878,7 +1880,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p if (numCharsInData < dataBuffer.size()) { // SQLGetData will null-terminate the data #if defined(__APPLE__) || defined(__linux__) - std::string fullStr(reinterpret_cast(dataBuffer.data())); + std::string fullStr(reinterpret_cast(dataBuffer.data()), dataLen); row.append(fullStr); LOG("macOS/Linux: Appended CHAR 
string of length {} to result row", fullStr.length()); #else @@ -1889,9 +1891,6 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p LOG("CHAR column {} data truncated, using streaming LOB", i); row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false)); } - } else if (dataLen == SQL_NULL_DATA) { - LOG("Column {} is NULL (CHAR)", i); - row.append(py::none()); } else if (dataLen == 0) { row.append(py::str("")); } else if (dataLen == SQL_NO_TOTAL) { diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 4ceeaa53..5f149492 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -6794,6 +6794,118 @@ def test_only_null_and_empty_binary(cursor, db_connection): drop_table_if_exists(cursor, "#pytest_null_empty_binary") db_connection.commit() +def test_nvarcharmax_short(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + short_str = "hello" + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [short_str]) + db_connection.commit() + cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [short_str]) + assert cursor.fetchone()[0] == short_str + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_boundary(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + boundary_str = "X" * 4000 # NVARCHAR inline limit + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [boundary_str]) + db_connection.commit() + cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [boundary_str]) + assert cursor.fetchone()[0] == boundary_str + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_streaming(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + streaming_str = "Y" * 4100 # Exceeds inline threshold → triggers streaming + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [streaming_str]) + db_connection.commit() + cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [streaming_str]) + assert cursor.fetchone()[0] == streaming_str + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_large(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + large_str = "Z" * 100_000 + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [large_str]) + db_connection.commit() + cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [large_str]) + assert cursor.fetchone()[0] == large_str + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_empty_string(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [""]) + db_connection.commit() + 
cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [""]) + assert cursor.fetchone()[0] == "" + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_null(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [None]) + db_connection.commit() + cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col IS NULL") + assert cursor.fetchone()[0] is None + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_transaction_rollback(cursor, db_connection): + try: + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + + db_connection.autocommit = False + rollback_str = "ROLLBACK" * 2000 + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [rollback_str]) + db_connection.rollback() + cursor.execute("SELECT COUNT(*) FROM #pytest_nvarcharmax WHERE col = ?", [rollback_str]) + assert cursor.fetchone()[0] == 0 + finally: + db_connection.autocommit = True + cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + db_connection.commit() + + def test_close(db_connection): """Test closing the cursor""" try: From 82db978e176d44823f243378d9bfa1f1daa8214b Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Wed, 3 Sep 2025 22:59:18 +0530 Subject: [PATCH 3/6] added streaming support in fetchall and fetchmany --- mssql_python/pybind/ddbc_bindings.cpp | 100 ++-- tests/test_004_cursor.py | 683 +++++++------------------- 2 files changed, 262 insertions(+), 521 deletions(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 0a978aae..1e80b1a6 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -2405,7 +2405,7 @@ SQLRETURN SQLBindColums(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& column // Fetch rows in batches // TODO: Move to anonymous namespace, since it is not used outside this file SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& columnNames, - py::list& rows, SQLUSMALLINT numCols, SQLULEN& numRowsFetched) { + py::list& rows, SQLUSMALLINT numCols, SQLULEN& numRowsFetched, const std::vector& lobColumns) { LOG("Fetching data in batches"); SQLRETURN ret = SQLFetchScroll_ptr(hStmt, SQL_FETCH_NEXT, 0); if (ret == SQL_NO_DATA) { @@ -2465,25 +2465,20 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: { - // TODO: variable length data needs special handling, this logic wont suffice + // TODO: variable length data needs special handling, this logic wont suffice SQLULEN columnSize = columnMeta["ColumnSize"].cast(); HandleZeroColumnSizeAtFetch(columnSize); uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; uint64_t numCharsInData = dataLen / sizeof(SQLCHAR); + bool isLob = std::find(lobColumns.begin(), lobColumns.end(), col) != lobColumns.end(); // fetchBufferSize includes null-terminator, numCharsInData doesn't. 
Hence '<' - if (numCharsInData < fetchBufferSize) { + if (!isLob && numCharsInData < fetchBufferSize) { // SQLFetch will nullterminate the data row.append(std::string( reinterpret_cast(&buffers.charBuffers[col - 1][i * fetchBufferSize]), numCharsInData)); } else { - // In this case, buffer size is smaller, and data to be retrieved is longer - // TODO: Revisit - std::ostringstream oss; - oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data " - << "to be retrieved is longer (" << numCharsInData << "). ColumnID - " - << col << ", datatype - " << dataType; - ThrowStdException(oss.str()); + row.append(FetchLobColumnData(hStmt, col, SQL_C_CHAR, false, false)); } break; } @@ -2495,8 +2490,9 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum HandleZeroColumnSizeAtFetch(columnSize); uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; uint64_t numCharsInData = dataLen / sizeof(SQLWCHAR); + bool isLob = std::find(lobColumns.begin(), lobColumns.end(), col) != lobColumns.end(); // fetchBufferSize includes null-terminator, numCharsInData doesn't. Hence '<' - if (numCharsInData < fetchBufferSize) { + if (!isLob && numCharsInData < fetchBufferSize) { // SQLFetch will nullterminate the data #if defined(__APPLE__) || defined(__linux__) // Use unix-specific conversion to handle the wchar_t/SQLWCHAR size difference @@ -2510,13 +2506,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum numCharsInData)); #endif } else { - // In this case, buffer size is smaller, and data to be retrieved is longer - // TODO: Revisit - std::ostringstream oss; - oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data " - << "to be retrieved is longer (" << numCharsInData << "). ColumnID - " - << col << ", datatype - " << dataType; - ThrowStdException(oss.str()); + row.append(FetchLobColumnData(hStmt, col, SQL_C_WCHAR, true, false)); } break; } @@ -2602,21 +2592,15 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_BINARY: case SQL_VARBINARY: case SQL_LONGVARBINARY: { - // TODO: variable length data needs special handling, this logic wont suffice SQLULEN columnSize = columnMeta["ColumnSize"].cast(); HandleZeroColumnSizeAtFetch(columnSize); - if (static_cast(dataLen) <= columnSize) { + bool isLob = std::find(lobColumns.begin(), lobColumns.end(), col) != lobColumns.end(); + if (!isLob && static_cast(dataLen) <= columnSize) { row.append(py::bytes(reinterpret_cast( &buffers.charBuffers[col - 1][i * columnSize]), dataLen)); } else { - // In this case, buffer size is smaller, and data to be retrieved is longer - // TODO: Revisit - std::ostringstream oss; - oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data " - << "to be retrieved is longer (" << dataLen << "). 
ColumnID - " - << col << ", datatype - " << dataType; - ThrowStdException(oss.str()); + row.append(FetchLobColumnData(hStmt, col, SQL_C_BINARY, false, true)); } break; } @@ -2745,6 +2729,35 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch return ret; } + std::vector lobColumns; + for (SQLSMALLINT i = 0; i < numCols; i++) { + auto colMeta = columnNames[i].cast(); + SQLSMALLINT dataType = colMeta["DataType"].cast(); + SQLULEN columnSize = colMeta["ColumnSize"].cast(); + + if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR || + dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || + dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) && + (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > 8000)) { + lobColumns.push_back(i + 1); // 1-based + } + } + + // If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap + if (!lobColumns.empty()) { + LOG("LOB columns detected → using per-row SQLGetData path"); + while (true) { + ret = SQLFetch_ptr(hStmt); + if (ret == SQL_NO_DATA) break; + if (!SQL_SUCCEEDED(ret)) return ret; + + py::list row; + SQLGetData_wrap(StatementHandle, numCols, row); // <-- streams LOBs correctly + rows.append(row); + } + return SQL_SUCCESS; + } + // Initialize column buffers ColumnBuffers buffers(numCols, fetchSize); @@ -2759,7 +2772,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_ROW_ARRAY_SIZE, (SQLPOINTER)(intptr_t)fetchSize, 0); SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_ROWS_FETCHED_PTR, &numRowsFetched, 0); - ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched); + ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched, lobColumns); if (!SQL_SUCCEEDED(ret) && ret != SQL_NO_DATA) { LOG("Error when fetching data"); return ret; @@ -2838,6 +2851,35 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) { } LOG("Fetching data in batch sizes of {}", fetchSize); + std::vector lobColumns; + for (SQLSMALLINT i = 0; i < numCols; i++) { + auto colMeta = columnNames[i].cast(); + SQLSMALLINT dataType = colMeta["DataType"].cast(); + SQLULEN columnSize = colMeta["ColumnSize"].cast(); + + if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR || + dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || + dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) && + (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > 8000)) { + lobColumns.push_back(i + 1); // 1-based + } + } + + // If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap + if (!lobColumns.empty()) { + LOG("LOB columns detected → using per-row SQLGetData path"); + while (true) { + ret = SQLFetch_ptr(hStmt); + if (ret == SQL_NO_DATA) break; + if (!SQL_SUCCEEDED(ret)) return ret; + + py::list row; + SQLGetData_wrap(StatementHandle, numCols, row); // <-- streams LOBs correctly + rows.append(row); + } + return SQL_SUCCESS; + } + ColumnBuffers buffers(numCols, fetchSize); // Bind columns @@ -2852,7 +2894,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) { SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_ROWS_FETCHED_PTR, &numRowsFetched, 0); while (ret != SQL_NO_DATA) { - ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched); + ret = FetchBatchData(hStmt, buffers, columnNames, rows, numCols, numRowsFetched, lobColumns); if (!SQL_SUCCEEDED(ret) && ret != SQL_NO_DATA) { LOG("Error when fetching data"); return ret; diff --git 
a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 5f149492..df102064 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -523,60 +523,6 @@ def test_varbinary_full_capacity(cursor, db_connection): cursor.execute("DROP TABLE #pytest_varbinary_test") db_connection.commit() -def test_varchar_max(cursor, db_connection): - """Test SQL_VARCHAR with MAX length""" - try: - cursor.execute("CREATE TABLE #pytest_varchar_test (varchar_column VARCHAR(MAX))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_varchar_test (varchar_column) VALUES (?), (?)", ["ABCDEFGHI", None]) - db_connection.commit() - expectedRows = 2 - # fetchone test - cursor.execute("SELECT varchar_column FROM #pytest_varchar_test") - rows = [] - for i in range(0, expectedRows): - rows.append(cursor.fetchone()) - assert cursor.fetchone() == None, "varchar_column is expected to have only {} rows".format(expectedRows) - assert rows[0] == ["ABCDEFGHI"], "SQL_VARCHAR parsing failed for fetchone - row 0" - assert rows[1] == [None], "SQL_VARCHAR parsing failed for fetchone - row 1" - # fetchall test - cursor.execute("SELECT varchar_column FROM #pytest_varchar_test") - rows = cursor.fetchall() - assert rows[0] == ["ABCDEFGHI"], "SQL_VARCHAR parsing failed for fetchall - row 0" - assert rows[1] == [None], "SQL_VARCHAR parsing failed for fetchall - row 1" - except Exception as e: - pytest.fail(f"SQL_VARCHAR parsing test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_varchar_test") - db_connection.commit() - -def test_wvarchar_max(cursor, db_connection): - """Test SQL_WVARCHAR with MAX length""" - try: - cursor.execute("CREATE TABLE #pytest_wvarchar_test (wvarchar_column NVARCHAR(MAX))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_wvarchar_test (wvarchar_column) VALUES (?), (?)", ["!@#$%^&*()_+", None]) - db_connection.commit() - expectedRows = 2 - # fetchone test - cursor.execute("SELECT wvarchar_column FROM #pytest_wvarchar_test") - rows = [] - for i in range(0, expectedRows): - rows.append(cursor.fetchone()) - assert cursor.fetchone() == None, "wvarchar_column is expected to have only {} rows".format(expectedRows) - assert rows[0] == ["!@#$%^&*()_+"], "SQL_WVARCHAR parsing failed for fetchone - row 0" - assert rows[1] == [None], "SQL_WVARCHAR parsing failed for fetchone - row 1" - # fetchall test - cursor.execute("SELECT wvarchar_column FROM #pytest_wvarchar_test") - rows = cursor.fetchall() - assert rows[0] == ["!@#$%^&*()_+"], "SQL_WVARCHAR parsing failed for fetchall - row 0" - assert rows[1] == [None], "SQL_WVARCHAR parsing failed for fetchall - row 1" - except Exception as e: - pytest.fail(f"SQL_WVARCHAR parsing test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_wvarchar_test") - db_connection.commit() - def test_varbinary_max(cursor, db_connection): """Test SQL_VARBINARY with MAX length""" try: @@ -5680,294 +5626,6 @@ def test_emoji_round_trip(cursor, db_connection): except Exception as e: pytest.fail(f"Error for input {repr(text)}: {e}") -def test_varchar_max_insert_non_lob(cursor, db_connection): - """Test small VARCHAR(MAX) insert (non-LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_varchar_nonlob (col VARCHAR(MAX))") - db_connection.commit() - - small_str = "Hello, world!" 
# small, non-LOB - cursor.execute( - "INSERT INTO #pytest_varchar_nonlob (col) VALUES (?)", - [small_str] - ) - db_connection.commit() - - empty_str = "" - cursor.execute( - "INSERT INTO #pytest_varchar_nonlob (col) VALUES (?)", - [empty_str] - ) - db_connection.commit() - - # None value - cursor.execute( - "INSERT INTO #pytest_varchar_nonlob (col) VALUES (?)", - [None] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_varchar_nonlob") - # rows = cursor.fetchall() - # assert rows == [[small_str], [empty_str], [None]] - - finally: - pass - - -def test_varchar_max_insert_lob(cursor, db_connection): - """Test large VARCHAR(MAX) insert (LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_varchar_lob (col VARCHAR(MAX))") - db_connection.commit() - - large_str = "A" * 100_000 # > 8k to trigger LOB - cursor.execute( - "INSERT INTO #pytest_varchar_lob (col) VALUES (?)", - [large_str] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_varchar_lob") - # rows = cursor.fetchall() - # assert rows == [[large_str]] - - finally: - pass - - -def test_nvarchar_max_insert_non_lob(cursor, db_connection): - """Test small NVARCHAR(MAX) insert (non-LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_nvarchar_nonlob (col NVARCHAR(MAX))") - db_connection.commit() - - small_str = "Unicode ✨ test" - cursor.execute( - "INSERT INTO #pytest_nvarchar_nonlob (col) VALUES (?)", - [small_str] - ) - db_connection.commit() - - empty_str = "" - cursor.execute( - "INSERT INTO #pytest_nvarchar_nonlob (col) VALUES (?)", - [empty_str] - ) - db_connection.commit() - - cursor.execute( - "INSERT INTO #pytest_nvarchar_nonlob (col) VALUES (?)", - [None] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_nonlob") - # rows = cursor.fetchall() - # assert rows == [[small_str], [empty_str], [None]] - - finally: - pass - - -def test_nvarchar_max_insert_lob(cursor, db_connection): - """Test large NVARCHAR(MAX) insert (LOB path).""" - try: - cursor.execute("CREATE TABLE #pytest_nvarchar_lob (col NVARCHAR(MAX))") - db_connection.commit() - - large_str = "📝" * 50_000 # each emoji = 2 UTF-16 code units, total > 100k bytes - cursor.execute( - "INSERT INTO #pytest_nvarchar_lob (col) VALUES (?)", - [large_str] - ) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_lob") - # rows = cursor.fetchall() - # assert rows == [[large_str]] - - finally: - pass - -def test_nvarchar_max_boundary(cursor, db_connection): - """Test NVARCHAR(MAX) at LOB boundary sizes.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarchar_boundary") - cursor.execute("CREATE TABLE #pytest_nvarchar_boundary (col NVARCHAR(MAX))") - db_connection.commit() - - # 4k BMP chars = 8k bytes - cursor.execute("INSERT INTO #pytest_nvarchar_boundary (col) VALUES (?)", ["A" * 4096]) - # 4k emojis = 8k UTF-16 code units (16k bytes) - cursor.execute("INSERT INTO #pytest_nvarchar_boundary (col) VALUES (?)", ["📝" * 4096]) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_boundary") - # rows = cursor.fetchall() - # assert rows == [["A" * 4096], ["📝" * 4096]] - finally: - pass - - -def test_nvarchar_max_chunk_edge(cursor, db_connection): - """Test NVARCHAR(MAX) insert slightly larger than a chunk.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarchar_chunk") - cursor.execute("CREATE 
TABLE #pytest_nvarchar_chunk (col NVARCHAR(MAX))") - db_connection.commit() - - chunk_size = 8192 # bytes - test_str = "📝" * ((chunk_size // 4) + 3) # slightly > 1 chunk - cursor.execute("INSERT INTO #pytest_nvarchar_chunk (col) VALUES (?)", [test_str]) - db_connection.commit() - - # Fetch commented for now - # cursor.execute("SELECT col FROM #pytest_nvarchar_chunk") - # row = cursor.fetchone() - # assert row[0] == test_str - finally: - pass - -def test_empty_string_chunk(cursor, db_connection): - """Test inserting empty strings into VARCHAR(MAX) and NVARCHAR(MAX).""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_empty_string") - cursor.execute(""" - CREATE TABLE #pytest_empty_string ( - varchar_col VARCHAR(MAX), - nvarchar_col NVARCHAR(MAX) - ) - """) - db_connection.commit() - - empty_varchar = "" - empty_nvarchar = "" - cursor.execute( - "INSERT INTO #pytest_empty_string (varchar_col, nvarchar_col) VALUES (?, ?)", - [empty_varchar, empty_nvarchar] - ) - db_connection.commit() - - cursor.execute("SELECT LEN(varchar_col), LEN(nvarchar_col) FROM #pytest_empty_string") - row = tuple(int(x) for x in cursor.fetchone()) - assert row == (0, 0), f"Expected lengths (0,0), got {row}" - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_empty_string") - db_connection.commit() - - -def test_varcharmax_short(cursor, db_connection): - """Test inserting and retrieving a small string well below any size thresholds. - # Verifies basic functionality for VARCHAR(MAX) with typical input size.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - short_str = "hello" - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [short_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [short_str]) - assert cursor.fetchone()[0] == short_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_boundary(cursor, db_connection): - """Test inserting and retrieving a string at the boundary size (8000 characters), - which is the largest size supported without switching to streaming or large object handling. - Ensures proper handling at this edge case.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - boundary_str = "X" * 8000 - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [boundary_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [boundary_str]) - assert cursor.fetchone()[0] == boundary_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_streaming(cursor, db_connection): - """Test inserting and retrieving a string just above the boundary size (8100 characters), - which requires streaming mechanisms to handle data efficiently. 
- Validates that larger data triggers correct processing without truncation.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - streaming_str = "Y" * 8100 - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [streaming_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [streaming_str]) - assert cursor.fetchone()[0] == streaming_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_large(cursor, db_connection): - """Test inserting and retrieving a very large string (100,000 characters), - which is well beyond typical sizes and ensures that the system can handle large VARCHAR(MAX) values.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - large_str = "Z" * 100_000 - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [large_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [large_str]) - assert cursor.fetchone()[0] == large_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_empty_string(cursor, db_connection): - """Test inserting and retrieving an empty string to verify correct handling of zero-length data.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [""]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col = ?", [""]) - assert cursor.fetchone()[0] == "" - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - -def test_varcharmax_null(cursor, db_connection): - """Test inserting and retrieving a NULL value to ensure proper handling of SQL NULLs.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [None]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_varcharmax WHERE col IS NULL") - assert cursor.fetchone()[0] is None - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") - db_connection.commit() - - def test_varcharmax_transaction_rollback(cursor, db_connection): """Test that inserting a large VARCHAR(MAX) within a transaction that is rolled back does not persist the data, ensuring transactional integrity.""" @@ -5987,106 +5645,6 @@ def test_varcharmax_transaction_rollback(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") db_connection.commit() -def test_nvarcharmax_short(cursor, db_connection): - """Test inserting and retrieving a small string well below any size thresholds.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - short_str = "hello" - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [short_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [short_str]) - assert cursor.fetchone()[0] == short_str - finally: - 
cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_boundary(cursor, db_connection): - """Test NVARCHAR(MAX) at the boundary size of 4000 characters (8000 bytes).""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - boundary_str = "X" * 4000 # NVARCHAR inline limit - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [boundary_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [boundary_str]) - assert cursor.fetchone()[0] == boundary_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_streaming(cursor, db_connection): - """Test NVARCHAR(MAX) just above the boundary size (4100 characters) to trigger streaming.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - streaming_str = "Y" * 4100 # Exceeds inline threshold → triggers streaming - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [streaming_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [streaming_str]) - assert cursor.fetchone()[0] == streaming_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_large(cursor, db_connection): - """Test inserting and retrieving a very large NVARCHAR(MAX) string (100,000 characters).""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - large_str = "Z" * 100_000 - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [large_str]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [large_str]) - assert cursor.fetchone()[0] == large_str - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_empty_string(cursor, db_connection): - """Test inserting and retrieving an empty NVARCHAR(MAX) string.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [""]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [""]) - assert cursor.fetchone()[0] == "" - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - -def test_nvarcharmax_null(cursor, db_connection): - """Test inserting and retrieving a NULL NVARCHAR(MAX) value.""" - try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") - db_connection.commit() - - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [None]) - db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col IS NULL") - assert cursor.fetchone()[0] is None - finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - db_connection.commit() - - def test_nvarcharmax_transaction_rollback(cursor, db_connection): """Test that inserting a large NVARCHAR(MAX) within a transaction that is rolled back does not persist 
the data, ensuring transactional integrity.""" @@ -6794,118 +6352,259 @@ def test_only_null_and_empty_binary(cursor, db_connection): drop_table_if_exists(cursor, "#pytest_null_empty_binary") db_connection.commit() -def test_nvarcharmax_short(cursor, db_connection): +# ---------------------- VARCHAR(MAX) ---------------------- + +def test_varcharmax_short_fetch(cursor, db_connection): + """Small VARCHAR(MAX), fetchone/fetchall/fetchmany.""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") db_connection.commit() - short_str = "hello" - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [short_str]) + values = ["hello", "world"] + for val in values: + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [val]) db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [short_str]) - assert cursor.fetchone()[0] == short_str + + # fetchone + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY col") + row1 = cursor.fetchone()[0] + row2 = cursor.fetchone()[0] + assert {row1, row2} == set(values) + assert cursor.fetchone() is None + + # fetchall + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY col") + all_rows = [r[0] for r in cursor.fetchall()] + assert set(all_rows) == set(values) + + # fetchmany + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY col") + many = [r[0] for r in cursor.fetchmany(1)] + assert many[0] in values finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("DROP TABLE IF EXISTS #pytest_varcharmax") db_connection.commit() -def test_nvarcharmax_boundary(cursor, db_connection): +def test_varcharmax_empty_string(cursor, db_connection): + """Empty string in VARCHAR(MAX).""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [""]) db_connection.commit() - boundary_str = "X" * 4000 # NVARCHAR inline limit - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [boundary_str]) + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] == "" + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [boundary_str]) + + +def test_varcharmax_null(cursor, db_connection): + """NULL in VARCHAR(MAX).""" + try: + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [None]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] is None + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_boundary(cursor, db_connection): + """Boundary at 8000 (inline limit).""" + try: + boundary_str = "X" * 8000 + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [boundary_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") assert cursor.fetchone()[0] == 
boundary_str finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("DROP TABLE #pytest_varcharmax") db_connection.commit() -def test_nvarcharmax_streaming(cursor, db_connection): +def test_varcharmax_streaming(cursor, db_connection): + """Streaming fetch > 8k with all fetch modes.""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") - cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + values = ["Y" * 8100, "Z" * 10000] + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") + db_connection.commit() + for v in values: + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [v]) db_connection.commit() - streaming_str = "Y" * 4100 # Exceeds inline threshold → triggers streaming - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [streaming_str]) + # --- fetchall --- + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)") + rows = [r[0] for r in cursor.fetchall()] + assert rows == sorted(values, key=len) + + # --- fetchone --- + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)") + r1 = cursor.fetchone()[0] + r2 = cursor.fetchone()[0] + assert {r1, r2} == set(values) + assert cursor.fetchone() is None + + # --- fetchmany --- + cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)") + batch = [r[0] for r in cursor.fetchmany(1)] + assert batch[0] in values + finally: + cursor.execute("DROP TABLE #pytest_varcharmax") + db_connection.commit() + + +def test_varcharmax_large(cursor, db_connection): + """Very large VARCHAR(MAX).""" + try: + large_str = "L" * 100_000 + cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))") db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [streaming_str]) - assert cursor.fetchone()[0] == streaming_str + cursor.execute("INSERT INTO #pytest_varcharmax VALUES (?)", [large_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_varcharmax") + assert cursor.fetchone()[0] == large_str finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("DROP TABLE #pytest_varcharmax") db_connection.commit() -def test_nvarcharmax_large(cursor, db_connection): +# ---------------------- NVARCHAR(MAX) ---------------------- + +def test_nvarcharmax_short_fetch(cursor, db_connection): + """Small NVARCHAR(MAX), unicode, fetch modes.""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + values = ["hello", "world_ß"] cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") db_connection.commit() - - large_str = "Z" * 100_000 - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [large_str]) + for v in values: + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [v]) db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [large_str]) - assert cursor.fetchone()[0] == large_str + + # fetchone + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY col") + r1 = cursor.fetchone()[0] + r2 = cursor.fetchone()[0] + assert {r1, r2} == set(values) + assert cursor.fetchone() is None + + # fetchall + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY col") + all_rows = [r[0] for r in cursor.fetchall()] + assert set(all_rows) == set(values) + + # fetchmany + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY col") + many = [r[0] for r in cursor.fetchmany(1)] + assert many[0] in values finally: - cursor.execute("DROP TABLE IF EXISTS 
#pytest_nvarcharmax") + cursor.execute("DROP TABLE #pytest_nvarcharmax") db_connection.commit() def test_nvarcharmax_empty_string(cursor, db_connection): + """Empty string in NVARCHAR(MAX).""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") db_connection.commit() - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [""]) db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col = ?", [""]) + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") assert cursor.fetchone()[0] == "" finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("DROP TABLE #pytest_nvarcharmax") db_connection.commit() def test_nvarcharmax_null(cursor, db_connection): + """NULL in NVARCHAR(MAX).""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") db_connection.commit() - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [None]) db_connection.commit() - cursor.execute("SELECT col FROM #pytest_nvarcharmax WHERE col IS NULL") + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") assert cursor.fetchone()[0] is None finally: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("DROP TABLE #pytest_nvarcharmax") db_connection.commit() -def test_nvarcharmax_transaction_rollback(cursor, db_connection): +def test_nvarcharmax_boundary(cursor, db_connection): + """Boundary at 4000 characters (inline limit).""" try: - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + boundary_str = "X" * 4000 cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") db_connection.commit() + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [boundary_str]) + db_connection.commit() - db_connection.autocommit = False - rollback_str = "ROLLBACK" * 2000 - cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [rollback_str]) - db_connection.rollback() - cursor.execute("SELECT COUNT(*) FROM #pytest_nvarcharmax WHERE col = ?", [rollback_str]) - assert cursor.fetchone()[0] == 0 + cursor.execute("SELECT col FROM #pytest_nvarcharmax") + assert cursor.fetchone()[0] == boundary_str finally: - db_connection.autocommit = True - cursor.execute("DROP TABLE IF EXISTS #pytest_nvarcharmax") + cursor.execute("DROP TABLE #pytest_nvarcharmax") db_connection.commit() +def test_nvarcharmax_streaming(cursor, db_connection): + """Streaming fetch > 4k unicode with all fetch modes.""" + try: + values = ["Ω" * 4100, "漢" * 5000] + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + for v in values: + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [v]) + db_connection.commit() + + # --- fetchall --- + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)") + rows = [r[0] for r in cursor.fetchall()] + assert rows == sorted(values, key=len) + + # --- fetchone --- + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)") + r1 = cursor.fetchone()[0] + r2 = cursor.fetchone()[0] + assert {r1, r2} == set(values) + assert cursor.fetchone() is None + + # --- fetchmany --- + cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)") + batch = [r[0] for r in cursor.fetchmany(1)] + assert batch[0] in values + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + + +def test_nvarcharmax_large(cursor, db_connection): + """Very large 
NVARCHAR(MAX).""" + try: + large_str = "漢" * 50_000 + cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_nvarcharmax VALUES (?)", [large_str]) + db_connection.commit() + + cursor.execute("SELECT col FROM #pytest_nvarcharmax") + assert cursor.fetchone()[0] == large_str + finally: + cursor.execute("DROP TABLE #pytest_nvarcharmax") + db_connection.commit() + def test_close(db_connection): """Test closing the cursor""" try: @@ -6915,4 +6614,4 @@ def test_close(db_connection): except Exception as e: pytest.fail(f"Cursor close test failed: {e}") finally: - cursor = db_connection.cursor() \ No newline at end of file + cursor = db_connection.cursor() From 1b45c78cb3eb12f4357d1b6c0b0ffb9e17e19fd6 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Wed, 3 Sep 2025 23:06:11 +0530 Subject: [PATCH 4/6] removing comment --- mssql_python/pybind/ddbc_bindings.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 1e80b1a6..36c1cd60 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -2465,7 +2465,6 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: { - // TODO: variable length data needs special handling, this logic wont suffice SQLULEN columnSize = columnMeta["ColumnSize"].cast(); HandleZeroColumnSizeAtFetch(columnSize); uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; From a6e0cac8c0d0da0fb6444fa0a982197d50010bfc Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Thu, 11 Sep 2025 13:00:48 +0530 Subject: [PATCH 5/6] resolving merge conflicts --- mssql_python/cursor.py | 28 +++++++++++++-------------- mssql_python/pybind/ddbc_bindings.cpp | 7 ++++--- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 7b130ec8..7e8c58f8 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -753,20 +753,20 @@ def execute( # Executing a new statement. 
Reset is_stmt_prepared to false
         self.is_stmt_prepared = [False]
 
-        # log('debug', "Executing query: %s", operation)
-        # for i, param in enumerate(parameters):
-        #     log('debug',
-        #         """Parameter number: %s, Parameter: %s,
-        #         Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""",
-        #         i + 1,
-        #         param,
-        #         str(type(param)),
-        #         parameters_type[i].paramSQLType,
-        #         parameters_type[i].paramCType,
-        #         parameters_type[i].columnSize,
-        #         parameters_type[i].decimalDigits,
-        #         parameters_type[i].inputOutputType,
-        #     )
+        log('debug', "Executing query: %s", operation)
+        for i, param in enumerate(parameters):
+            log('debug',
+                """Parameter number: %s, Parameter: %s,
+                Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""",
+                i + 1,
+                param,
+                str(type(param)),
+                parameters_type[i].paramSQLType,
+                parameters_type[i].paramCType,
+                parameters_type[i].columnSize,
+                parameters_type[i].decimalDigits,
+                parameters_type[i].inputOutputType,
+            )
 
         ret = ddbc_bindings.DDBCSQLExecute(
             self.hstmt,
diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp
index 36c1cd60..e36b2cc3 100644
--- a/mssql_python/pybind/ddbc_bindings.cpp
+++ b/mssql_python/pybind/ddbc_bindings.cpp
@@ -1862,12 +1862,10 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
         case SQL_CHAR:
         case SQL_VARCHAR:
         case SQL_LONGVARCHAR: {
-            // Use streaming for large VARCHAR / CHAR
             if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > 8000) {
                 LOG("Streaming LOB for column {}", i);
                 row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false));
             } else {
-                // Small VARCHAR, fetch directly
                 uint64_t fetchBufferSize = columnSize + 1 /* null-termination */;
                 std::vector<SQLCHAR> dataBuffer(fetchBufferSize);
                 SQLLEN dataLen;
@@ -1880,7 +1878,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
                 if (numCharsInData < dataBuffer.size()) {
                     // SQLGetData will null-terminate the data
 #if defined(__APPLE__) || defined(__linux__)
-                    std::string fullStr(reinterpret_cast<char*>(dataBuffer.data()), dataLen);
+                    std::string fullStr(reinterpret_cast<char*>(dataBuffer.data()));
                     row.append(fullStr);
                     LOG("macOS/Linux: Appended CHAR string of length {} to result row", fullStr.length());
 #else
@@ -1891,6 +1889,9 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
                     LOG("CHAR column {} data truncated, using streaming LOB", i);
                     row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false));
                 }
+            } else if (dataLen == SQL_NULL_DATA) {
+                LOG("Column {} is NULL (CHAR)", i);
+                row.append(py::none());
             } else if (dataLen == 0) {
                 row.append(py::str(""));
             } else if (dataLen == SQL_NO_TOTAL) {

From 3bd4a1a55fcee96d0de30150b7939fed87c34d4a Mon Sep 17 00:00:00 2001
From: gargsaumya
Date: Thu, 11 Sep 2025 13:12:28 +0530
Subject: [PATCH 6/6] addressed review comments

---
 mssql_python/pybind/ddbc_bindings.cpp | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp
index e36b2cc3..8314d255 100644
--- a/mssql_python/pybind/ddbc_bindings.cpp
+++ b/mssql_python/pybind/ddbc_bindings.cpp
@@ -31,6 +31,7 @@
 #define ARCHITECTURE "win64" // Default to win64 if not defined during compilation
 #endif
 #define DAE_CHUNK_SIZE 8192
+#define SQL_MAX_LOB_SIZE 8000
 //-------------------------------------------------------------------------------------------------
 // Class definitions
 //-------------------------------------------------------------------------------------------------
@@ -1747,8 +1748,13 @@ static py::object FetchLobColumnData(SQLHSTMT hStmt,
                                      &actualRead);
         if (ret == SQL_ERROR || !SQL_SUCCEEDED(ret) && ret != SQL_SUCCESS_WITH_INFO) {
-            LOG("Loop {}: Error fetching column {} with cType={}", loopCount, colIndex, cType);
-            ThrowStdException("Error fetching column data");
+            std::ostringstream oss;
+            oss << "Error fetching LOB for column " << colIndex
+                << ", cType=" << cType
+                << ", loop=" << loopCount
+                << ", SQLGetData return=" << ret;
+            LOG(oss.str());
+            ThrowStdException(oss.str());
         }
         if (actualRead == SQL_NULL_DATA) {
             LOG("Loop {}: Column {} is NULL", loopCount, colIndex);
@@ -1862,7 +1868,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
         case SQL_CHAR:
         case SQL_VARCHAR:
         case SQL_LONGVARCHAR: {
-            if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > 8000) {
+            if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > SQL_MAX_LOB_SIZE) {
                 LOG("Streaming LOB for column {}", i);
                 row.append(FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false));
             } else {
@@ -2738,7 +2744,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch
         if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR ||
              dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR ||
              dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) &&
-            (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > 8000)) {
+            (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) {
             lobColumns.push_back(i + 1); // 1-based
         }
     }
@@ -2860,7 +2866,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) {
         if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR ||
              dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR ||
              dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY) &&
-            (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > 8000)) {
+            (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) {
             lobColumns.push_back(i + 1); // 1-based
         }
     }
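
Note on the duplicated LOB check: PATCH 3/6 and PATCH 6/6 leave the same six-type/size test inlined in both FetchMany_wrap and FetchAll_wrap (with a columnSize-only variant in SQLGetData_wrap). Below is a minimal sketch of how that predicate could be factored into a single helper; it is not part of this series — the helper name IsLobColumn is hypothetical, and the constant simply mirrors the SQL_MAX_LOB_SIZE introduced in PATCH 6/6.

// Sketch only (not part of the series): one helper for the LOB test that
// PATCH 3/6 and PATCH 6/6 duplicate in FetchMany_wrap and FetchAll_wrap.
// The name IsLobColumn is hypothetical.
#include <sql.h>
#include <sqlext.h>
#include <sqlucode.h>

#define SQL_MAX_LOB_SIZE 8000  // mirrors the constant introduced in PATCH 6/6

static bool IsLobColumn(SQLSMALLINT dataType, SQLULEN columnSize) {
    // Only the variable-length character/binary types can carry (MAX) data.
    const bool variableLength =
        dataType == SQL_WVARCHAR  || dataType == SQL_WLONGVARCHAR ||
        dataType == SQL_VARCHAR   || dataType == SQL_LONGVARCHAR  ||
        dataType == SQL_VARBINARY || dataType == SQL_LONGVARBINARY;
    // (MAX) columns report a declared size of 0 or SQL_NO_TOTAL; anything
    // beyond the 8000-byte inline limit also needs the chunked SQLGetData path.
    return variableLength &&
           (columnSize == 0 ||
            columnSize == static_cast<SQLULEN>(SQL_NO_TOTAL) ||
            columnSize > SQL_MAX_LOB_SIZE);
}

Routing all three call sites through one predicate like this would keep the thresholds from drifting apart if SQL_MAX_LOB_SIZE ever changes again.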