diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/mssql_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
index 8f861aaee690a..a393a45688d4d 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -67,7 +67,7 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
 
     ui_color = "#e0a98c"
 
-    type_map = {2: "BOOLEAN", 3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
+    type_map = {2: "BOOL", 3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
 
     def __init__(
         self,
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_mssql_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_mssql_to_gcs.py
index c04cf2079708d..7878ad156b734 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_mssql_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_mssql_to_gcs.py
@@ -35,6 +35,7 @@
 SQL = "select 1"
 BUCKET = "gs://test"
 JSON_FILENAME = "test_{}.ndjson"
+PARQUET_FILENAME = "test_{}.parquet"
 GZIP = False
 
 ROWS = [
@@ -57,14 +58,14 @@
 SCHEMA_JSON = [
     b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
     b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
-    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
-    b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOLEAN"}]',
+    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOL"}, ',
+    b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOL"}]',
 ]
 
 SCHEMA_JSON_BIT_FIELDS = [
     b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
     b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
-    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
+    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOL"}, ',
     b'{"mode": "NULLABLE", "name": "some_bit", "type": "INTEGER"}]',
 ]
 
@@ -253,3 +254,53 @@ def db_hook(self):
         assert len(lineage.job_facets) == 1
         assert lineage.job_facets["sql"].query == sql
         assert lineage.run_facets == {}
+
+    @mock.patch("airflow.providers.google.cloud.transfers.mssql_to_gcs.MsSqlHook")
+    @mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook")
+    def test_bit_to_boolean_field_conversion(self, gcs_hook_mock_class, mssql_hook_mock_class):
+        """Test successful run of execute function for Parquet format with boolean fields.
+
+        This test verifies that MSSQL tables with columns of type "BIT" can be exported
+        using the bit_fields parameter, resulting in boolean fields in the Parquet file.
+        """
+        import pyarrow.parquet
+
+        op = MSSQLToGCSOperator(
+            task_id=TASK_ID,
+            mssql_conn_id=MSSQL_CONN_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=PARQUET_FILENAME,
+            export_format="parquet",
+            bit_fields=["some_binary", "some_bit"],
+        )
+
+        mssql_hook_mock = mssql_hook_mock_class.return_value
+        mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        mssql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        upload_called = False
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
+            nonlocal upload_called
+            upload_called = True
+            assert bucket == BUCKET
+            assert obj == PARQUET_FILENAME.format(0)
+            assert mime_type == "application/octet-stream"
+            assert gzip == GZIP
+
+            parquet_file = pyarrow.parquet.ParquetFile(tmp_filename)
+            schema = parquet_file.schema_arrow
+            # Verify that bit fields are mapped to boolean type in the Parquet schema
+            assert schema.field("some_binary").type.equals(pyarrow.bool_())
+            assert schema.field("some_bit").type.equals(pyarrow.bool_())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op.execute(None)
+
+        assert upload_called, "Expected upload to be called"
+        mssql_hook_mock_class.assert_called_once_with(mssql_conn_id=MSSQL_CONN_ID)
+        mssql_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)