[AIRFLOW-2224] Add support for CSV files in MySqlToGoogleCloudStorageOperator

MySqlToGoogleCloudStorageOperator previously supported export from MySQL in
newline-delimited JSON format only. This change adds support for export in
CSV format, with the option of specifying a field delimiter.
Thanks to Bernardo Najlis (@bnajlis) for the original PR (#3139);
I made some changes to the core logic of the original PR.
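
To give a sense of how the new options are used, here is a minimal, hypothetical DAG snippet. The dag id, task id, SQL, bucket, and filenames below are assumptions for illustration, and connection IDs are left at their defaults; only export_format and field_delimiter come from this commit.

    # Hypothetical usage of the new export_format / field_delimiter options.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

    with DAG(dag_id='example_mysql_to_gcs_csv',
             start_date=datetime(2019, 2, 1),
             schedule_interval=None) as dag:

        export_orders = MySqlToGoogleCloudStorageOperator(
            task_id='export_orders_csv',
            sql='SELECT * FROM orders',             # assumed source query
            bucket='my-gcs-bucket',                 # assumed target GCS bucket
            filename='exports/orders_{}.csv',       # '{}' is filled with a file counter
            schema_filename='schemas/orders.json',  # optional BigQuery schema object
            export_format='csv',                    # new: defaults to 'json'
            field_delimiter='|',                    # new: defaults to ','
        )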
ttanay committed Feb 19, 2019
1 parent c48e83e commit e6ef05e
Showing 2 changed files with 289 additions and 54 deletions.
102 changes: 78 additions & 24 deletions airflow/contrib/operators/mysql_to_gcs.py
@@ -31,13 +31,18 @@
from MySQLdb.constants import FIELD_TYPE
from tempfile import NamedTemporaryFile
from six import string_types
import unicodecsv as csv

PY3 = sys.version_info[0] == 3


class MySqlToGoogleCloudStorageOperator(BaseOperator):
"""
Copy data from MySQL to Google cloud storage in JSON format.
"""Copy data from MySQL to Google cloud storage in JSON or CSV format.
The JSON data files generated are newline-delimited to enable them to be
loaded into BigQuery.
Reference: https://cloud.google.com/bigquery/docs/
loading-data-cloud-storage-json#limitations
:param sql: The SQL to execute on the MySQL table.
:type sql: str
@@ -72,6 +77,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: str
:param export_format: Desired format of files to be exported.
:type export_format: str
:param field_delimiter: The delimiter to be used for CSV files.
:type field_delimiter: str
"""
template_fields = ('sql', 'bucket', 'filename', 'schema_filename', 'schema')
template_ext = ('.sql',)
@@ -88,6 +97,8 @@ def __init__(self,
google_cloud_storage_conn_id='google_cloud_default',
schema=None,
delegate_to=None,
export_format='json',
field_delimiter=',',
*args,
**kwargs):
super(MySqlToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
@@ -100,24 +111,28 @@ def __init__(self,
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.schema = schema
self.delegate_to = delegate_to
self.export_format = export_format.lower()
self.field_delimiter = field_delimiter

def execute(self, context):
cursor = self._query_mysql()
files_to_upload = self._write_local_data_files(cursor)

# If a schema is set, create a BQ schema JSON file.
if self.schema_filename:
files_to_upload.update(self._write_local_schema_file(cursor))
files_to_upload.append(self._write_local_schema_file(cursor))

# Flush all files before uploading.
for file_handle in files_to_upload.values():
file_handle.flush()
for tmp_file in files_to_upload:
tmp_file_handle = tmp_file.get('file_handle')
tmp_file_handle.flush()

self._upload_to_gcs(files_to_upload)

# Close all temp file handles.
for file_handle in files_to_upload.values():
file_handle.close()
for tmp_file in files_to_upload:
tmp_file_handle = tmp_file.get('file_handle')
tmp_file_handle.close()

def _query_mysql(self):
"""
@@ -141,41 +156,73 @@ def _write_local_data_files(self, cursor):
col_type_dict = self._get_col_type_dict()
file_no = 0
tmp_file_handle = NamedTemporaryFile(delete=True)
tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
if self.export_format == 'csv':
file_mime_type = 'text/csv'
else:
file_mime_type = 'application/json'
files_to_upload = [{
'file_name': self.filename.format(file_no),
'file_handle': tmp_file_handle,
'file_mime_type': file_mime_type
}]

if self.export_format == 'csv':
csv_writer = self._configure_csv_file(tmp_file_handle, schema)

for row in cursor:
# Convert datetime objects to utc seconds, and decimals to floats.
# Convert binary type object to string encoded with base64.
row = self._convert_types(schema, col_type_dict, row)
row_dict = dict(zip(schema, row))

# TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
s = json.dumps(row_dict)
if PY3:
s = s.encode('utf-8')
tmp_file_handle.write(s)
if self.export_format == 'csv':
csv_writer.writerow(row)
else:
row_dict = dict(zip(schema, row))

# TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
s = json.dumps(row_dict, sort_keys=True)
if PY3:
s = s.encode('utf-8')
tmp_file_handle.write(s)

# Append newline to make dumps BigQuery compatible.
tmp_file_handle.write(b'\n')
# Append newline to make dumps BigQuery compatible.
tmp_file_handle.write(b'\n')

# Stop if the file exceeds the file size limit.
if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
file_no += 1
tmp_file_handle = NamedTemporaryFile(delete=True)
tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
files_to_upload.append({
'file_name': self.filename.format(file_no),
'file_handle': tmp_file_handle,
'file_mime_type': file_mime_type
})

if self.export_format == 'csv':
csv_writer = self._configure_csv_file(tmp_file_handle, schema)

return tmp_file_handles
return files_to_upload

def _configure_csv_file(self, file_handle, schema):
"""Configure a csv writer with the file_handle and write schema
as headers for the new file.
"""
csv_writer = csv.writer(file_handle, encoding='utf-8',
delimiter=self.field_delimiter)
csv_writer.writerow(schema)
return csv_writer

def _write_local_schema_file(self, cursor):
"""
Takes a cursor, and writes the BigQuery schema for the results to a
local file system.
Takes a cursor, and writes the BigQuery schema in .json format for the
results to a local file system.
:return: A dictionary where key is a filename to be used as an object
name in GCS, and values are file handles to local files that
contains the BigQuery schema fields in .json format.
"""
schema_str = None
schema_file_mime_type = 'application/json'
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
if self.schema is not None and isinstance(self.schema, string_types):
schema_str = self.schema
@@ -199,13 +246,18 @@ def _write_local_schema_file(self, cursor):
'type': field_type,
'mode': field_mode,
})
schema_str = json.dumps(schema)
schema_str = json.dumps(schema, sort_keys=True)
if PY3:
schema_str = schema_str.encode('utf-8')
tmp_schema_file_handle.write(schema_str)

self.log.info('Using schema for %s: %s', self.schema_filename, schema_str)
return {self.schema_filename: tmp_schema_file_handle}
schema_file_to_upload = {
'file_name': self.schema_filename,
'file_handle': tmp_schema_file_handle,
'file_mime_type': schema_file_mime_type
}
return schema_file_to_upload

def _upload_to_gcs(self, files_to_upload):
"""
@@ -215,8 +267,10 @@
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
for object, tmp_file_handle in files_to_upload.items():
hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
for tmp_file in files_to_upload:
hook.upload(self.bucket, tmp_file.get('file_name'),
tmp_file.get('file_handle').name,
mime_type=tmp_file.get('file_mime_type'))

@staticmethod
def _convert_types(schema, col_type_dict, row):
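
For readers skimming the diff, a condensed standalone sketch of the new CSV path follows: each local temp file is tracked as a dict of file name, file handle, and MIME type, and the csv writer (including its schema header row) is re-created whenever the current file reaches the size limit. The function and parameter names below are illustrative stand-ins, not the operator's actual API.

    # Illustrative sketch of the rollover + CSV logic added in this commit;
    # write_csv_files() and its parameters are invented names, not Airflow API.
    from tempfile import NamedTemporaryFile

    import unicodecsv as csv


    def _new_csv_writer(file_handle, schema, field_delimiter):
        # Mirror _configure_csv_file: one writer per temp file, header row first.
        writer = csv.writer(file_handle, encoding='utf-8', delimiter=field_delimiter)
        writer.writerow(schema)
        return writer


    def write_csv_files(rows, schema, filename_template,
                        field_delimiter=',', max_file_size_bytes=1000000):
        file_no = 0
        tmp_file_handle = NamedTemporaryFile(delete=True)
        files_to_upload = [{
            'file_name': filename_template.format(file_no),
            'file_handle': tmp_file_handle,
            'file_mime_type': 'text/csv',
        }]
        csv_writer = _new_csv_writer(tmp_file_handle, schema, field_delimiter)

        for row in rows:
            csv_writer.writerow(row)
            # Roll over to a new temp file once the current one is big enough.
            if tmp_file_handle.tell() >= max_file_size_bytes:
                file_no += 1
                tmp_file_handle = NamedTemporaryFile(delete=True)
                files_to_upload.append({
                    'file_name': filename_template.format(file_no),
                    'file_handle': tmp_file_handle,
                    'file_mime_type': 'text/csv',
                })
                csv_writer = _new_csv_writer(tmp_file_handle, schema, field_delimiter)

        for tmp_file in files_to_upload:
            tmp_file['file_handle'].flush()
        return files_to_upload


    # Example: two small rows, pipe-delimited, one output file expected.
    files = write_csv_files(rows=[(1, 'alice'), (2, 'bob')],
                            schema=['id', 'name'],
                            filename_template='orders_{}.csv',
                            field_delimiter='|')

The returned list has the same shape that _upload_to_gcs now consumes, which is why the uploader can pass a per-object mime_type instead of hard-coding 'application/json'.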