From d0400f468e271fc17cd84befadd7dc1027f10b7d Mon Sep 17 00:00:00 2001
From: Thomas Elvey
Date: Wed, 10 Apr 2019 16:06:48 +0100
Subject: [PATCH] [AIRFLOW-4268] Add MsSqlToGoogleCloudStorageOperator

* Add operator MsSqlToGoogleCloudStorageOperator
* Add unit test for MsSqlToGoogleCloudStorageOperator
* Update integration.rst documentation
---
 airflow/contrib/operators/mssql_to_gcs.py        | 235 ++++++++++++++++++
 docs/integration.rst                             |   3 +
 .../operators/test_mssql_to_gcs_operator.py      | 157 ++++++++++++
 3 files changed, 395 insertions(+)
 create mode 100644 airflow/contrib/operators/mssql_to_gcs.py
 create mode 100644 tests/contrib/operators/test_mssql_to_gcs_operator.py

diff --git a/airflow/contrib/operators/mssql_to_gcs.py b/airflow/contrib/operators/mssql_to_gcs.py
new file mode 100644
index 0000000000000..1097d481675e8
--- /dev/null
+++ b/airflow/contrib/operators/mssql_to_gcs.py
@@ -0,0 +1,235 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import decimal
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.hooks.mssql_hook import MsSqlHook
+from tempfile import NamedTemporaryFile
+
+
+class MsSqlToGoogleCloudStorageOperator(BaseOperator):
+    """
+    Copy data from Microsoft SQL Server to Google Cloud Storage
+    in JSON format.
+
+    :param sql: The SQL to execute on the MSSQL table.
+    :type sql: str
+    :param bucket: The bucket to upload to.
+    :type bucket: str
+    :param filename: The filename to use as the object name when uploading
+        to Google Cloud Storage. A {} should be specified in the filename
+        to allow the operator to inject file numbers in cases where the
+        file is split due to size, e.g. filename='data/customers/export_{}.json'.
+    :type filename: str
+    :param schema_filename: If set, the filename to use as the object name
+        when uploading a .json file containing the BigQuery schema fields
+        for the table that was dumped from MSSQL.
+    :type schema_filename: str
+    :param approx_max_file_size_bytes: This operator supports the ability
+        to split large table dumps into multiple files. This parameter sets
+        the approximate maximum size, in bytes, of each split file.
+    :type approx_max_file_size_bytes: long
+    :param gzip: Option to compress file for upload (does not apply to schemas).
+    :type gzip: bool
+    :param mssql_conn_id: Reference to a specific MSSQL hook.
+    :type mssql_conn_id: str
+    :param google_cloud_storage_conn_id: Reference to a specific Google
+        Cloud Storage hook.
+    :type google_cloud_storage_conn_id: str
+    :param delegate_to: The account to impersonate, if any. For this to
+        work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+
+    **Example**:
+        The following operator will export data from the Customers table
+        within the given MSSQL Database and then upload it to the
+        'mssql-export' GCS bucket (along with a schema file). ::
+
+            export_customers = MsSqlToGoogleCloudStorageOperator(
+                task_id='export_customers',
+                sql='SELECT * FROM dbo.Customers;',
+                bucket='mssql-export',
+                filename='data/customers/export.json',
+                schema_filename='schemas/export.json',
+                mssql_conn_id='mssql_default',
+                google_cloud_storage_conn_id='google_cloud_default',
+                dag=dag
+            )
+    """
+
+    template_fields = ('sql', 'bucket', 'filename', 'schema_filename')
+    template_ext = ('.sql',)
+    ui_color = '#e0a98c'
+
+    @apply_defaults
+    def __init__(self,
+                 sql,
+                 bucket,
+                 filename,
+                 schema_filename=None,
+                 approx_max_file_size_bytes=1900000000,
+                 gzip=False,
+                 mssql_conn_id='mssql_default',
+                 google_cloud_storage_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+
+        super(MsSqlToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.bucket = bucket
+        self.filename = filename
+        self.schema_filename = schema_filename
+        self.approx_max_file_size_bytes = approx_max_file_size_bytes
+        self.gzip = gzip
+        self.mssql_conn_id = mssql_conn_id
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        cursor = self._query_mssql()
+        files_to_upload = self._write_local_data_files(cursor)
+
+        # If a schema is set, create a BQ schema JSON file.
+        if self.schema_filename:
+            files_to_upload.update(self._write_local_schema_file(cursor))
+
+        # Flush all files before uploading
+        for file_handle in files_to_upload.values():
+            file_handle.flush()
+
+        self._upload_to_gcs(files_to_upload)
+
+        # Close all temp file handles
+        for file_handle in files_to_upload.values():
+            file_handle.close()
+
+    def _query_mssql(self):
+        """
+        Queries MSSQL and returns a cursor of results.
+
+        :return: mssql cursor
+        """
+        mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
+        conn = mssql.get_conn()
+        cursor = conn.cursor()
+        cursor.execute(self.sql)
+        return cursor
+
+    def _write_local_data_files(self, cursor):
+        """
+        Takes a cursor and writes the results to local files.
+
+        :return: A dictionary where keys are filenames to be used as object
+            names in GCS, and values are file handles to local files that
+            contain the data for the GCS objects.
+        """
+        schema = list(map(lambda schema_tuple: schema_tuple[0].replace(' ', '_'), cursor.description))
+        file_no = 0
+        tmp_file_handle = NamedTemporaryFile(delete=True)
+        tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
+
+        for row in cursor:
+            # Convert if needed
+            row = map(self.convert_types, row)
+            row_dict = dict(zip(schema, row))
+
+            s = json.dumps(row_dict, sort_keys=True)
+            s = s.encode('utf-8')
+            tmp_file_handle.write(s)
+
+            # Append newline to make dumps BQ compatible
+            tmp_file_handle.write(b'\n')
+
+            # Start a new file if the current one exceeds the size limit
+            if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+                file_no += 1
+                tmp_file_handle = NamedTemporaryFile(delete=True)
+                tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
+
+        return tmp_file_handles
+
+    def _write_local_schema_file(self, cursor):
+        """
+        Takes a cursor and writes the BigQuery schema for the results to a
+        local file.
+
+        :return: A dictionary where the key is the filename to be used as an
+            object name in GCS, and the value is a file handle to a local file
+            containing the BigQuery schema fields in .json format.
+        """
+        schema = []
+        for field in cursor.description:
+            # See PEP 249 for details about the description tuple.
+            field_name = field[0].replace(' ', '_')  # Clean spaces
+            field_type = self.type_map(field[1])
+            field_mode = 'NULLABLE'  # pymssql doesn't support field_mode
+
+            schema.append({
+                'name': field_name,
+                'type': field_type,
+                'mode': field_mode,
+            })
+
+        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+        s = json.dumps(schema, sort_keys=True)
+        s = s.encode('utf-8')
+        tmp_schema_file_handle.write(s)
+        return {self.schema_filename: tmp_schema_file_handle}
+
+    def _upload_to_gcs(self, files_to_upload):
+        """
+        Upload all of the file splits (and optionally the schema .json file) to
+        Google Cloud Storage.
+        """
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to)
+        for object_name, tmp_file_handle in files_to_upload.items():
+            hook.upload(self.bucket, object_name, tmp_file_handle.name, 'application/json',
+                        (self.gzip if object_name != self.schema_filename else False))
+
+    @classmethod
+    def convert_types(cls, value):
+        """
+        Takes a value from MSSQL, and converts it to a value that's safe for
+        JSON/Google Cloud Storage/BigQuery.
+        """
+        if isinstance(value, decimal.Decimal):
+            return float(value)
+        else:
+            return value
+
+    @classmethod
+    def type_map(cls, mssql_type):
+        """
+        Helper function that maps from MSSQL fields to BigQuery fields. Used
+        when a schema_filename is set.
+        """
+        d = {
+            3: 'INTEGER',
+            4: 'TIMESTAMP',
+            5: 'NUMERIC'
+        }
+        return d[mssql_type] if mssql_type in d else 'STRING'
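
Purely illustrative, and not part of the patch itself: assuming the new module above is importable, the two classmethods it defines can be exercised directly. convert_types() makes row values JSON-serialisable and type_map() translates pymssql/DB-API type codes into BigQuery column types, falling back to STRING for unknown codes.

    import decimal

    from airflow.contrib.operators.mssql_to_gcs import MsSqlToGoogleCloudStorageOperator

    # Decimals become floats so json.dumps() can serialise the row.
    assert MsSqlToGoogleCloudStorageOperator.convert_types(decimal.Decimal('3.14')) == 3.14
    # Other values pass through unchanged.
    assert MsSqlToGoogleCloudStorageOperator.convert_types('abc') == 'abc'

    # Known pymssql type codes map to BigQuery types; anything else is STRING.
    assert MsSqlToGoogleCloudStorageOperator.type_map(3) == 'INTEGER'
    assert MsSqlToGoogleCloudStorageOperator.type_map(0) == 'STRING'
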
diff --git a/docs/integration.rst b/docs/integration.rst
index 419a36631d8e2..a8b7d2dff0e55 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -579,6 +579,9 @@ Cloud Storage
 :class:`airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator`
     Copy data from any MySQL Database to Google cloud storage in JSON format.
 
+:class:`airflow.contrib.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator`
+    Copy data from any Microsoft SQL Server Database to Google Cloud Storage in JSON format.
+
 They also use
 :class:`airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook`
 to communicate with Google Cloud Platform.
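
A usage sketch to complement the documentation entry above (again, not part of the patch): the exported NDJSON splits and schema file pair naturally with a downstream BigQuery load. The DAG below is a minimal, hedged example; it assumes GoogleCloudStorageToBigQueryOperator from airflow.contrib.operators.gcs_to_bq is available in your Airflow installation, and the connections, bucket, and destination table named here are illustrative placeholders.

    from airflow import DAG
    from airflow.contrib.operators.mssql_to_gcs import MsSqlToGoogleCloudStorageOperator
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
    from airflow.utils.dates import days_ago

    dag = DAG('mssql_to_bq_example', schedule_interval=None, start_date=days_ago(1))

    # Dump dbo.Customers to NDJSON splits plus a BigQuery schema file in GCS.
    export_customers = MsSqlToGoogleCloudStorageOperator(
        task_id='export_customers',
        sql='SELECT * FROM dbo.Customers;',
        bucket='mssql-export',
        filename='data/customers/export_{}.json',  # '{}' receives the split number
        schema_filename='schemas/customers.json',
        mssql_conn_id='mssql_default',
        google_cloud_storage_conn_id='google_cloud_default',
        dag=dag)

    # Load every split into BigQuery, reusing the schema file written above.
    load_customers = GoogleCloudStorageToBigQueryOperator(
        task_id='load_customers',
        bucket='mssql-export',
        source_objects=['data/customers/export_*.json'],
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table='my_project.staging.customers',
        schema_object='schemas/customers.json',
        write_disposition='WRITE_TRUNCATE',
        dag=dag)

    export_customers >> load_customers
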
diff --git a/tests/contrib/operators/test_mssql_to_gcs_operator.py b/tests/contrib/operators/test_mssql_to_gcs_operator.py
new file mode 100644
index 0000000000000..81a5cb21769ab
--- /dev/null
+++ b/tests/contrib/operators/test_mssql_to_gcs_operator.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import unittest
+
+from airflow.contrib.operators.mssql_to_gcs import \
+    MsSqlToGoogleCloudStorageOperator
+from tests.compat import mock
+
+PY3 = sys.version_info[0] == 3
+
+TASK_ID = 'test-mssql-to-gcs'
+MSSQL_CONN_ID = 'mssql_conn_test'
+SQL = 'select 1'
+BUCKET = 'gs://test'
+JSON_FILENAME = 'test_{}.ndjson'
+GZIP = False
+
+ROWS = [
+    ('mock_row_content_1', 42),
+    ('mock_row_content_2', 43),
+    ('mock_row_content_3', 44)
+]
+CURSOR_DESCRIPTION = (
+    ('some_str', 0, None, None, None, None, None),
+    ('some_num', 3, None, None, None, None, None)
+)
+NDJSON_LINES = [
+    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
+    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n'
+]
+SCHEMA_FILENAME = 'schema_test.json'
+SCHEMA_JSON = [
+    b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
+    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
+]
+
+
+class MsSqlToGoogleCloudStorageOperatorTest(unittest.TestCase):
+
+    def test_init(self):
+        """Test MsSqlToGoogleCloudStorageOperator instance is properly initialized."""
+        op = MsSqlToGoogleCloudStorageOperator(
+            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME)
+        self.assertEqual(op.task_id, TASK_ID)
+        self.assertEqual(op.sql, SQL)
+        self.assertEqual(op.bucket, BUCKET)
+        self.assertEqual(op.filename, JSON_FILENAME)
+
+    @mock.patch('airflow.contrib.operators.mssql_to_gcs.MsSqlHook')
+    @mock.patch('airflow.contrib.operators.mssql_to_gcs.GoogleCloudStorageHook')
+    def test_exec_success_json(self, gcs_hook_mock_class, mssql_hook_mock_class):
+        """Test successful run of execute function for JSON"""
+        op = MsSqlToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            mssql_conn_id=MSSQL_CONN_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=JSON_FILENAME)
+
+        mssql_hook_mock = mssql_hook_mock_class.return_value
+        mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        mssql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual(JSON_FILENAME.format(0), obj)
+            self.assertEqual('application/json', mime_type)
+            self.assertEqual(GZIP, gzip)
+            with open(tmp_filename, 'rb') as f:
+                self.assertEqual(b''.join(NDJSON_LINES), f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op.execute(None)
+
+        mssql_hook_mock_class.assert_called_once_with(mssql_conn_id=MSSQL_CONN_ID)
+        mssql_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)
+
+    @mock.patch('airflow.contrib.operators.mssql_to_gcs.MsSqlHook')
+    @mock.patch('airflow.contrib.operators.mssql_to_gcs.GoogleCloudStorageHook')
+    def test_file_splitting(self, gcs_hook_mock_class, mssql_hook_mock_class):
+        """Test that ndjson is split by approx_max_file_size_bytes param."""
+        mssql_hook_mock = mssql_hook_mock_class.return_value
+        mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        mssql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+        expected_upload = {
+            JSON_FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
+            JSON_FILENAME.format(1): NDJSON_LINES[2],
+        }
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual('application/json', mime_type)
+            self.assertEqual(GZIP, gzip)
+            with open(tmp_filename, 'rb') as f:
+                self.assertEqual(expected_upload[obj], f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = MsSqlToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+            approx_max_file_size_bytes=len(expected_upload[JSON_FILENAME.format(0)]))
+        op.execute(None)
+
+    @mock.patch('airflow.contrib.operators.mssql_to_gcs.MsSqlHook')
+    @mock.patch('airflow.contrib.operators.mssql_to_gcs.GoogleCloudStorageHook')
+    def test_schema_file(self, gcs_hook_mock_class, mssql_hook_mock_class):
+        """Test writing schema files."""
+        mssql_hook_mock = mssql_hook_mock_class.return_value
+        mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        mssql_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+            if obj == SCHEMA_FILENAME:
+                with open(tmp_filename, 'rb') as f:
+                    self.assertEqual(b''.join(SCHEMA_JSON), f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = MsSqlToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+            schema_filename=SCHEMA_FILENAME)
+        op.execute(None)
+
+        # once for the file and once for the schema
+        self.assertEqual(2, gcs_hook_mock.upload.call_count)
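
One closing, hedged note that goes beyond what the patch covers: pymssql can also return datetime.datetime (and bytes) values, which json.dumps() cannot serialise directly. If the source tables contain such columns, one option is a small subclass that extends convert_types. The sketch below is purely illustrative and the class name is hypothetical; it is not part of this change.

    import datetime
    import decimal

    from airflow.contrib.operators.mssql_to_gcs import MsSqlToGoogleCloudStorageOperator


    class JsonSafeMsSqlToGcsOperator(MsSqlToGoogleCloudStorageOperator):
        """Hypothetical subclass that also makes datetime and binary values JSON-safe."""

        @classmethod
        def convert_types(cls, value):
            if isinstance(value, decimal.Decimal):
                return float(value)
            if isinstance(value, (datetime.datetime, datetime.date)):
                # Serialise as ISO 8601 strings rather than raising TypeError in json.dumps().
                return value.isoformat()
            if isinstance(value, bytes):
                return value.decode('utf-8', errors='replace')
            return value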