Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):

ui_color = "#e0a98c"

type_map = {2: "BOOLEAN", 3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
type_map = {2: "BOOL", 3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
SQL = "select 1"
BUCKET = "gs://test"
JSON_FILENAME = "test_{}.ndjson"
PARQUET_FILENAME = "test_{}.parquet"
GZIP = False

ROWS = [
Expand All @@ -57,14 +58,14 @@
SCHEMA_JSON = [
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOLEAN"}]',
b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOL"}, ',
b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOL"}]',
]

SCHEMA_JSON_BIT_FIELDS = [
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOL"}, ',
b'{"mode": "NULLABLE", "name": "some_bit", "type": "INTEGER"}]',
]

Expand Down Expand Up @@ -253,3 +254,53 @@ def db_hook(self):
assert len(lineage.job_facets) == 1
assert lineage.job_facets["sql"].query == sql
assert lineage.run_facets == {}

@mock.patch("airflow.providers.google.cloud.transfers.mssql_to_gcs.MsSqlHook")
@mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook")
def test_bit_to_boolean_field_conversion(self, gcs_hook_mock_class, mssql_hook_mock_class):
"""Test successful run of execute function for Parquet format with boolean fields.

This test verifies that MSSQL tables with columns of type "BIT" can exported
using the bit_fields parameter, resulting in boolean fields in the Parquet file.
"""
import pyarrow

op = MSSQLToGCSOperator(
task_id=TASK_ID,
mssql_conn_id=MSSQL_CONN_ID,
sql=SQL,
bucket=BUCKET,
filename=PARQUET_FILENAME,
export_format="parquet",
bit_fields=["some_binary", "some_bit"],
)

mssql_hook_mock = mssql_hook_mock_class.return_value
mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
mssql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

gcs_hook_mock = gcs_hook_mock_class.return_value

upload_called = False

def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
nonlocal upload_called
upload_called = True
assert bucket == BUCKET
assert obj == PARQUET_FILENAME.format(0)
assert mime_type == "application/octet-stream"
assert gzip == GZIP

parquet_file = pyarrow.parquet.ParquetFile(tmp_filename)
schema = parquet_file.schema_arrow
# Verify that bit fields are mapped to boolean type in parquet schema
assert schema.field("some_binary").type.equals(pyarrow.bool_())
assert schema.field("some_bit").type.equals(pyarrow.bool_())

gcs_hook_mock.upload.side_effect = _assert_upload

op.execute(None)

assert upload_called, "Expected upload to be called"
mssql_hook_mock_class.assert_called_once_with(mssql_conn_id=MSSQL_CONN_ID)
mssql_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)