diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp
index c4d0c49a..bbc3a2f5 100644
--- a/mssql_python/pybind/ddbc_bindings.cpp
+++ b/mssql_python/pybind/ddbc_bindings.cpp
@@ -1352,11 +1352,24 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt,
                             std::memset(wcharArray + i * (info.columnSize + 1), 0, (info.columnSize + 1) * sizeof(SQLWCHAR));
                         } else {
                             std::wstring wstr = columnValues[i].cast<std::wstring>();
+#if defined(__APPLE__) || defined(__linux__)
+                            // Convert to UTF-16 first, then check the actual UTF-16 length
+                            auto utf16Buf = WStringToSQLWCHAR(wstr);
+                            // Check UTF-16 length (excluding null terminator) against column size
+                            if (utf16Buf.size() > 0 && (utf16Buf.size() - 1) > info.columnSize) {
+                                ThrowStdException("Input string UTF-16 length exceeds allowed column size at parameter index " + std::to_string(paramIndex) +
+                                    ". UTF-16 length: " + std::to_string(utf16Buf.size() - 1) + ", Column size: " + std::to_string(info.columnSize));
+                            }
+                            // The check above guarantees utf16Buf.size() <= info.columnSize + 1, so the copy stays inside this row's slot
+                            std::memcpy(wcharArray + i * (info.columnSize + 1), utf16Buf.data(), utf16Buf.size() * sizeof(SQLWCHAR));
+#else
+                            // On Windows, wchar_t is already UTF-16, so the original check is sufficient
                             if (wstr.length() > info.columnSize) {
                                 std::string offending = WideToUTF8(wstr);
                                 ThrowStdException("Input string exceeds allowed column size at parameter index " + std::to_string(paramIndex));
                             }
                             std::memcpy(wcharArray + i * (info.columnSize + 1), wstr.c_str(), (wstr.length() + 1) * sizeof(SQLWCHAR));
+#endif
                             strLenOrIndArray[i] = SQL_NTS;
                         }
                     }
diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py
index f6c5ee3e..a45b288b 100644
--- a/tests/test_004_cursor.py
+++ b/tests/test_004_cursor.py
@@ -887,6 +887,384 @@ def test_execute_many(cursor, db_connection):
     count = cursor.fetchone()[0]
     assert count == 11, "Executemany failed"
 
+def test_executemany_empty_strings(cursor, db_connection):
+    """Test executemany with empty strings - regression test for Unix UTF-16 conversion issue"""
+    try:
+        # Create test table for empty string testing
+        cursor.execute("""
+            CREATE TABLE #pytest_empty_batch (
+                id INT,
+                data NVARCHAR(50)
+            )
+        """)
+
+        # Clear any existing data
+        cursor.execute("DELETE FROM #pytest_empty_batch")
+        db_connection.commit()
+
+        # Test data with mix of empty strings and regular strings
+        test_data = [
+            (1, ''),
+            (2, 'non-empty'),
+            (3, ''),
+            (4, 'another'),
+            (5, '')
+        ]
+
+        # Execute the batch insert
+        cursor.executemany("INSERT INTO #pytest_empty_batch VALUES (?, ?)", test_data)
+        db_connection.commit()
+
+        # Verify the data was inserted correctly
+        cursor.execute("SELECT id, data FROM #pytest_empty_batch ORDER BY id")
+        results = cursor.fetchall()
+
+        # Check that we got the right number of rows
+        assert len(results) == 5, f"Expected 5 rows, got {len(results)}"
+
+        # Check each row individually
+        expected = [
+            (1, ''),
+            (2, 'non-empty'),
+            (3, ''),
+            (4, 'another'),
+            (5, '')
+        ]
+
+        for i, (actual, expected_row) in enumerate(zip(results, expected)):
+            assert actual[0] == expected_row[0], f"Row {i}: ID mismatch - expected {expected_row[0]}, got {actual[0]}"
+            assert actual[1] == expected_row[1], f"Row {i}: Data mismatch - expected '{expected_row[1]}', got '{actual[1]}'"
+    except Exception as e:
+        pytest.fail(f"Executemany with empty strings failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_empty_batch")
+        db_connection.commit()
+
+def test_executemany_empty_strings_various_types(cursor, db_connection):
+    """Test executemany with empty strings in different column types"""
+    try:
+        # Create test table with different string types
+        cursor.execute("""
+            CREATE TABLE #pytest_string_types (
+                id INT,
+                varchar_col VARCHAR(50),
+                nvarchar_col NVARCHAR(50),
+                text_col TEXT,
+                ntext_col NTEXT
+            )
+        """)
+
+        # Clear any existing data
+        cursor.execute("DELETE FROM #pytest_string_types")
+        db_connection.commit()
+
+        # Test data with empty strings for different column types
+        test_data = [
+            (1, '', '', '', ''),
+            (2, 'varchar', 'nvarchar', 'text', 'ntext'),
+            (3, '', '', '', ''),
+        ]
+
+        # Execute the batch insert
+        cursor.executemany(
+            "INSERT INTO #pytest_string_types VALUES (?, ?, ?, ?, ?)",
+            test_data
+        )
+        db_connection.commit()
+
+        # Verify the data was inserted correctly
+        cursor.execute("SELECT * FROM #pytest_string_types ORDER BY id")
+        results = cursor.fetchall()
+
+        # Check that we got the right number of rows
+        assert len(results) == 3, f"Expected 3 rows, got {len(results)}"
+
+        # Check each row
+        for i, (actual, expected_row) in enumerate(zip(results, test_data)):
+            for j, (actual_val, expected_val) in enumerate(zip(actual, expected_row)):
+                assert actual_val == expected_val, f"Row {i}, Col {j}: expected '{expected_val}', got '{actual_val}'"
+    except Exception as e:
+        pytest.fail(f"Executemany with empty strings in various types failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_string_types")
+        db_connection.commit()
+
+def test_executemany_unicode_and_empty_strings(cursor, db_connection):
+    """Test executemany with mix of Unicode characters and empty strings"""
+    try:
+        # Create test table
+        cursor.execute("""
+            CREATE TABLE #pytest_unicode_test (
+                id INT,
+                data NVARCHAR(100)
+            )
+        """)
+
+        # Clear any existing data
+        cursor.execute("DELETE FROM #pytest_unicode_test")
+        db_connection.commit()
+
+        # Test data with Unicode and empty strings
+        test_data = [
+            (1, ''),
+            (2, 'Hello 😄'),
+            (3, ''),
+            (4, '中文'),
+            (5, ''),
+            (6, 'Ñice tëxt'),
+            (7, ''),
+        ]
+
+        # Execute the batch insert
+        cursor.executemany("INSERT INTO #pytest_unicode_test VALUES (?, ?)", test_data)
+        db_connection.commit()
+
+        # Verify the data was inserted correctly
+        cursor.execute("SELECT id, data FROM #pytest_unicode_test ORDER BY id")
+        results = cursor.fetchall()
+
+        # Check that we got the right number of rows
+        assert len(results) == 7, f"Expected 7 rows, got {len(results)}"
+
+        # Check each row
+        for i, (actual, expected_row) in enumerate(zip(results, test_data)):
+            assert actual[0] == expected_row[0], f"Row {i}: ID mismatch"
+            assert actual[1] == expected_row[1], f"Row {i}: Data mismatch - expected '{expected_row[1]}', got '{actual[1]}'"
+    except Exception as e:
+        pytest.fail(f"Executemany with Unicode and empty strings failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_unicode_test")
+        db_connection.commit()
+
+def test_executemany_large_batch_with_empty_strings(cursor, db_connection):
+    """Test executemany with large batch containing empty strings"""
+    try:
+        # Create test table
+        cursor.execute("""
+            CREATE TABLE #pytest_large_batch (
+                id INT,
+                data NVARCHAR(50)
+            )
+        """)
+
+        # Clear any existing data
+        cursor.execute("DELETE FROM #pytest_large_batch")
+        db_connection.commit()
+
+        # Create large test data with alternating empty and non-empty strings
+        test_data = []
+        for i in range(100):
+            if i % 3 == 0:
+                test_data.append((i, ''))  # Every 3rd row is empty
+            else:
+                test_data.append((i, f'data_{i}'))
+
+        # Execute the batch insert
+        cursor.executemany("INSERT INTO #pytest_large_batch VALUES (?, ?)", test_data)
+        db_connection.commit()
+
+        # Verify the data was inserted correctly
+        cursor.execute("SELECT COUNT(*) FROM #pytest_large_batch")
+        count = cursor.fetchone()[0]
+        assert count == 100, f"Expected 100 rows, got {count}"
+
+        # Check a few specific rows
+        cursor.execute("SELECT id, data FROM #pytest_large_batch WHERE id IN (0, 1, 3, 6, 9) ORDER BY id")
+        results = cursor.fetchall()
+
+        expected_subset = [
+            (0, ''),  # 0 % 3 == 0, should be empty
+            (1, 'data_1'),  # 1 % 3 != 0, should have data
+            (3, ''),  # 3 % 3 == 0, should be empty
+            (6, ''),  # 6 % 3 == 0, should be empty
+            (9, ''),  # 9 % 3 == 0, should be empty
+        ]
+
+        for actual, expected in zip(results, expected_subset):
+            assert actual[0] == expected[0], f"ID mismatch: expected {expected[0]}, got {actual[0]}"
+            assert actual[1] == expected[1], f"Data mismatch for ID {actual[0]}: expected '{expected[1]}', got '{actual[1]}'"
+    except Exception as e:
+        pytest.fail(f"Executemany with large batch and empty strings failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_large_batch")
+        db_connection.commit()
+
+def test_executemany_compare_with_execute(cursor, db_connection):
+    """Test that executemany produces same results as individual execute calls"""
+    try:
+        # Create test table
+        cursor.execute("""
+            CREATE TABLE #pytest_compare_test (
+                id INT,
+                data NVARCHAR(50)
+            )
+        """)
+
+        # Test data with empty strings
+        test_data = [
+            (1, ''),
+            (2, 'test'),
+            (3, ''),
+            (4, 'another'),
+            (5, ''),
+        ]
+
+        # First, insert using individual execute calls
+        cursor.execute("DELETE FROM #pytest_compare_test")
+        for row_data in test_data:
+            cursor.execute("INSERT INTO #pytest_compare_test VALUES (?, ?)", row_data)
+        db_connection.commit()
+
+        # Get results from individual inserts
+        cursor.execute("SELECT id, data FROM #pytest_compare_test ORDER BY id")
+        execute_results = cursor.fetchall()
+
+        # Clear and insert using executemany
+        cursor.execute("DELETE FROM #pytest_compare_test")
+        cursor.executemany("INSERT INTO #pytest_compare_test VALUES (?, ?)", test_data)
+        db_connection.commit()
+
+        # Get results from batch insert
+        cursor.execute("SELECT id, data FROM #pytest_compare_test ORDER BY id")
+        executemany_results = cursor.fetchall()
+
+        # Compare results
+        assert len(execute_results) == len(executemany_results), "Row count mismatch between execute and executemany"
+
+        for i, (exec_row, batch_row) in enumerate(zip(execute_results, executemany_results)):
+            assert exec_row[0] == batch_row[0], f"Row {i}: ID mismatch between execute and executemany"
+            assert exec_row[1] == batch_row[1], f"Row {i}: Data mismatch between execute and executemany - execute: '{exec_row[1]}', executemany: '{batch_row[1]}'"
+    except Exception as e:
+        pytest.fail(f"Executemany vs execute comparison failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_compare_test")
+        db_connection.commit()
+
+def test_executemany_edge_cases_empty_strings(cursor, db_connection):
+    """Test executemany edge cases with empty strings and special characters"""
+    try:
+        # Create test table
+        cursor.execute("""
+            CREATE TABLE #pytest_edge_cases (
+                id INT,
+                varchar_data VARCHAR(100),
+                nvarchar_data NVARCHAR(100)
+            )
+        """)
+
+        # Clear any existing data
+        cursor.execute("DELETE FROM #pytest_edge_cases")
+        db_connection.commit()
+
+        # Edge case test data
+        test_data = [
+            # All empty strings
+            (1, '', ''),
+            # One empty, one not
+            (2, '', 'not empty'),
+            (3, 'not empty', ''),
+            # Special whitespace cases
+            (4, ' ', '  '),  # Single and double space
+            (5, '\t', '\n'),  # Tab and newline
+            # Mixed Unicode and empty
+            # (6, '', '🚀'), #TODO: Uncomment once nvarcharmax, varcharmax and unicode support is implemented for executemany
+            (7, 'ASCII', ''),
+            # Boundary cases
+            (8, '', ''),  # Another all empty
+        ]
+
+        # Execute the batch insert
+        cursor.executemany(
+            "INSERT INTO #pytest_edge_cases VALUES (?, ?, ?)",
+            test_data
+        )
+        db_connection.commit()
+
+        # Verify the data was inserted correctly
+        cursor.execute("SELECT id, varchar_data, nvarchar_data FROM #pytest_edge_cases ORDER BY id")
+        results = cursor.fetchall()
+
+        # Check that we got the right number of rows
+        assert len(results) == len(test_data), f"Expected {len(test_data)} rows, got {len(results)}"
+
+        # Check each row
+        for i, (actual, expected_row) in enumerate(zip(results, test_data)):
+            assert actual[0] == expected_row[0], f"Row {i}: ID mismatch"
+            assert actual[1] == expected_row[1], f"Row {i}: VARCHAR mismatch - expected '{repr(expected_row[1])}', got '{repr(actual[1])}'"
+            assert actual[2] == expected_row[2], f"Row {i}: NVARCHAR mismatch - expected '{repr(expected_row[2])}', got '{repr(actual[2])}'"
+    except Exception as e:
+        pytest.fail(f"Executemany edge cases with empty strings failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_edge_cases")
+        db_connection.commit()
+
+def test_executemany_null_vs_empty_string(cursor, db_connection):
+    """Test that executemany correctly distinguishes between NULL and empty string"""
+    try:
+        # Create test table
+        cursor.execute("""
+            CREATE TABLE #pytest_null_vs_empty (
+                id INT,
+                data NVARCHAR(50)
+            )
+        """)
+
+        # Clear any existing data
+        cursor.execute("DELETE FROM #pytest_null_vs_empty")
+        db_connection.commit()
+
+        # Test data with NULLs and empty strings
+        test_data = [
+            (1, None),  # NULL
+            (2, ''),  # Empty string
+            (3, None),  # NULL
+            (4, 'data'),  # Regular string
+            (5, ''),  # Empty string
+            (6, None),  # NULL
+        ]
+
+        # Execute the batch insert
+        cursor.executemany("INSERT INTO #pytest_null_vs_empty VALUES (?, ?)", test_data)
+        db_connection.commit()
+
+        # Verify the data was inserted correctly
+        cursor.execute("SELECT id, data FROM #pytest_null_vs_empty ORDER BY id")
+        results = cursor.fetchall()
+
+        # Check that we got the right number of rows
+        assert len(results) == 6, f"Expected 6 rows, got {len(results)}"
+
+        # Check each row, paying attention to NULL vs empty string
+        expected_results = [
+            (1, None),  # NULL should remain NULL
+            (2, ''),  # Empty string should remain empty string
+            (3, None),  # NULL should remain NULL
+            (4, 'data'),  # Regular string
+            (5, ''),  # Empty string should remain empty string
+            (6, None),  # NULL should remain NULL
+        ]
+
+        for i, (actual, expected) in enumerate(zip(results, expected_results)):
+            assert actual[0] == expected[0], f"Row {i}: ID mismatch"
+            if expected[1] is None:
+                assert actual[1] is None, f"Row {i}: Expected NULL, got '{actual[1]}'"
+            else:
+                assert actual[1] == expected[1], f"Row {i}: Expected '{expected[1]}', got '{actual[1]}'"
+
+        # Also test with explicit queries for NULL vs empty
+        cursor.execute("SELECT COUNT(*) FROM #pytest_null_vs_empty WHERE data IS NULL")
+        null_count = cursor.fetchone()[0]
+        assert null_count == 3, f"Expected 3 NULL values, got {null_count}"
+
+        cursor.execute("SELECT COUNT(*) FROM #pytest_null_vs_empty WHERE data = ''")
+        empty_count = cursor.fetchone()[0]
+        assert empty_count == 2, f"Expected 2 empty strings, got {empty_count}"
+    except Exception as e:
+        pytest.fail(f"Executemany NULL vs empty string test failed: {e}")
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_null_vs_empty")
+        db_connection.commit()
+
 def test_nextset(cursor):
     """Test nextset"""
     cursor.execute("SELECT * FROM #pytest_all_data_types WHERE id = 1;")
@@ -5706,6 +6084,142 @@ def test_batch_fetch_empty_values_no_assertion_failure(cursor, db_connection):
         cursor.execute("DROP TABLE #pytest_batch_empty_assertions")
         db_connection.commit()
 
+def test_executemany_utf16_length_validation(cursor, db_connection):
+    """Test UTF-16 length validation for executemany - prevents data corruption from Unicode expansion"""
+    import platform
+
+    try:
+        # Create test table with small column size to trigger validation
+        drop_table_if_exists(cursor, "#pytest_utf16_validation")
+        cursor.execute("""
+            CREATE TABLE #pytest_utf16_validation (
+                id INT,
+                short_text NVARCHAR(5),  -- Small column to test length validation
+                medium_text NVARCHAR(10)  -- Medium column for edge cases
+            )
+        """)
+        db_connection.commit()
+
+        # Test 1: Valid strings that should work on all platforms
+        valid_data = [
+            (1, "Hi", "Hello"),  # Well within limits
+            (2, "Test", "World"),  # At or near limits
+            (3, "", ""),  # Empty strings
+            (4, "12345", "1234567890")  # Exactly at limits
+        ]
+
+        cursor.executemany("INSERT INTO #pytest_utf16_validation VALUES (?, ?, ?)", valid_data)
+        db_connection.commit()
+
+        # Verify valid data was inserted correctly
+        cursor.execute("SELECT COUNT(*) FROM #pytest_utf16_validation")
+        count = cursor.fetchone()[0]
+        assert count == 4, "All valid UTF-16 strings should be inserted successfully"
+
+        # Test 2: String too long for short_text column (7 characters > 5 limit)
+        with pytest.raises(Exception) as exc_info:
+            cursor.executemany("INSERT INTO #pytest_utf16_validation VALUES (?, ?, ?)",
+                               [(5, "TooLong", "Valid")])
+
+        error_msg = str(exc_info.value)
+        # Accept either our validation error or SQL Server's truncation error
+        assert ("exceeds allowed column size" in error_msg or
+                "String or binary data would be truncated" in error_msg), f"Should get length validation error, got: {error_msg}"
+
+        # Test 3: Unicode characters that specifically test UTF-16 expansion
+        # This is the core test for our fix - emoji that expand from UTF-32 to UTF-16
+
+        # Create a string that's exactly at the UTF-32 limit but exceeds UTF-16 limit
+        # "😀😀😀" = 3 UTF-32 chars, but 6 UTF-16 code units (each emoji = 2 units)
+        # This should fit in UTF-32 length check but fail UTF-16 length check on Unix
+        emoji_overflow_test = [
+            # 3 emoji = 3 UTF-32 chars (might pass initial check) but 6 UTF-16 units > 5 limit
+            (6, "😀😀😀", "Valid")  # Should fail on short_text due to UTF-16 expansion
+        ]
+
+        with pytest.raises(Exception) as exc_info:
+            cursor.executemany("INSERT INTO #pytest_utf16_validation VALUES (?, ?, ?)",
+                               emoji_overflow_test)
+
+        error_msg = str(exc_info.value)
+        # This should trigger either our UTF-16 validation or SQL Server's length validation
+        # Both are correct - the important thing is that it fails instead of silently truncating
+
+        print(f"Emoji overflow test error on {platform.system()}: {error_msg[:100]}...")
+
+        # Accept any of these error types - all indicate proper validation
+        assert ("UTF-16 length exceeds" in error_msg or
+                "exceeds allowed column size" in error_msg or
+                "String or binary data would be truncated" in error_msg or
+                "illegal UTF-16 surrogate" in error_msg or
+                "utf-16" in error_msg.lower()), f"Should catch UTF-16 expansion issue, got: {error_msg}"
+
+        # Test 4: Valid emoji string that should work
+        valid_emoji_test = [
+            # 2 emoji = 2 UTF-32 chars, 4 UTF-16 units (fits in 5 unit limit)
+            (7, "😀😀", "Hello🌟")  # Should work: 4 units, 7 units
+        ]
+
+        cursor.executemany("INSERT INTO #pytest_utf16_validation VALUES (?, ?, ?)",
+                           valid_emoji_test)
+        db_connection.commit()
+
+        # Verify emoji string was inserted correctly
+        cursor.execute("SELECT short_text, medium_text FROM #pytest_utf16_validation WHERE id = 7")
+        result = cursor.fetchone()
+        assert result[0] == "😀😀", "Valid emoji string should be stored correctly"
+        assert result[1] == "Hello🌟", "Valid emoji string should be stored correctly"
+
+        # Test 5: Edge case - string with mixed ASCII and Unicode
+        mixed_cases = [
+            # "A😀B" = 1 + 2 + 1 = 4 UTF-16 units (should fit in 5)
+            (8, "A😀B", "Test"),
+            # "A😀B😀C" = 1 + 2 + 1 + 2 + 1 = 7 UTF-16 units (should fail for short_text)
+            (9, "A😀B😀C", "Test")
+        ]
+
+        # Should work
+        cursor.executemany("INSERT INTO #pytest_utf16_validation VALUES (?, ?, ?)",
+                           [mixed_cases[0]])
+        db_connection.commit()
+
+        # Should fail
+        with pytest.raises(Exception) as exc_info:
+            cursor.executemany("INSERT INTO #pytest_utf16_validation VALUES (?, ?, ?)",
+                               [mixed_cases[1]])
+
+        error_msg = str(exc_info.value)
+        # Accept either our validation error or SQL Server's truncation error or UTF-16 encoding errors
+        assert ("exceeds allowed column size" in error_msg or
+                "String or binary data would be truncated" in error_msg or
+                "illegal UTF-16 surrogate" in error_msg or
+                "utf-16" in error_msg.lower()), f"Mixed Unicode string should trigger length error, got: {error_msg}"
+
+        # Test 6: Verify no silent truncation occurs
+        # Before the fix, oversized strings might get silently truncated
+        cursor.execute("SELECT short_text FROM #pytest_utf16_validation WHERE short_text LIKE '%😀%'")
+        emoji_results = cursor.fetchall()
+
+        # All emoji strings should be complete (no truncation)
+        for result in emoji_results:
+            text = result[0]
+            # Count actual emoji characters - they should all be present
+            emoji_count = text.count('😀')
+            assert emoji_count > 0, f"Emoji should be preserved in result: {text}"
+
+            # String should not end with incomplete surrogate pairs or truncation
+            # This would happen if UTF-16 conversion was truncated mid-character
+            assert len(text) > 0, "String should not be empty due to truncation"
+
+        print(f"UTF-16 length validation test completed successfully on {platform.system()}")
+
+    except Exception as e:
+        pytest.fail(f"UTF-16 length validation test failed: {e}")
+
+    finally:
+        drop_table_if_exists(cursor, "#pytest_utf16_validation")
+        db_connection.commit()
+
 def test_close(db_connection):
     """Test closing the cursor"""
     try: