[AIRFLOW-2224] Add support CSV files in MySqlToGoogleCloudStorageOperator #4738

Merged · 1 commit · Mar 6, 2019
102 changes: 78 additions & 24 deletions airflow/contrib/operators/mysql_to_gcs.py
@@ -31,13 +31,18 @@
 from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
 from six import string_types
+import unicodecsv as csv

 PY3 = sys.version_info[0] == 3


 class MySqlToGoogleCloudStorageOperator(BaseOperator):
-    """
-    Copy data from MySQL to Google cloud storage in JSON format.
+    """Copy data from MySQL to Google cloud storage in JSON or CSV format.
+
+    The JSON data files generated are newline-delimited to enable them to be
+    loaded into BigQuery.
+    Reference: https://cloud.google.com/bigquery/docs/
+    loading-data-cloud-storage-json#limitations

     :param sql: The SQL to execute on the MySQL table.
     :type sql: str
@@ -72,6 +77,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         work, the service account making the request must have domain-wide
         delegation enabled.
     :type delegate_to: str
+    :param export_format: Desired format of files to be exported.
+    :type export_format: str
+    :param field_delimiter: The delimiter to be used for CSV files.
+    :type field_delimiter: str
     """
     template_fields = ('sql', 'bucket', 'filename', 'schema_filename', 'schema')
     template_ext = ('.sql',)
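
For context while reviewing: a minimal usage sketch of the two new parameters. The DAG, query, bucket, and object names below are hypothetical, not taken from this PR.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

    dag = DAG('example_mysql_to_gcs', start_date=datetime(2019, 3, 1), schedule_interval=None)

    export_orders = MySqlToGoogleCloudStorageOperator(
        task_id='export_orders_csv',
        sql='SELECT * FROM orders',        # hypothetical query
        bucket='example-bucket',           # hypothetical bucket
        filename='orders/export_{}.csv',   # '{}' receives a file counter on size rollover
        schema_filename='orders/schema.json',
        export_format='csv',               # new in this PR; defaults to 'json'
        field_delimiter='|',               # new in this PR; defaults to ','
        dag=dag)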
@@ -88,6 +97,8 @@ def __init__(self,
                  google_cloud_storage_conn_id='google_cloud_default',
                  schema=None,
                  delegate_to=None,
+                 export_format='json',
+                 field_delimiter=',',
                  *args,
                  **kwargs):
         super(MySqlToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
@@ -100,24 +111,28 @@ def __init__(self,
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.schema = schema
         self.delegate_to = delegate_to
+        self.export_format = export_format.lower()
+        self.field_delimiter = field_delimiter

     def execute(self, context):
         cursor = self._query_mysql()
         files_to_upload = self._write_local_data_files(cursor)

         # If a schema is set, create a BQ schema JSON file.
         if self.schema_filename:
-            files_to_upload.update(self._write_local_schema_file(cursor))
+            files_to_upload.append(self._write_local_schema_file(cursor))

         # Flush all files before uploading.
-        for file_handle in files_to_upload.values():
-            file_handle.flush()
+        for tmp_file in files_to_upload:
+            tmp_file_handle = tmp_file.get('file_handle')
+            tmp_file_handle.flush()

         self._upload_to_gcs(files_to_upload)

         # Close all temp file handles.
-        for file_handle in files_to_upload.values():
-            file_handle.close()
+        for tmp_file in files_to_upload:
+            tmp_file_handle = tmp_file.get('file_handle')
+            tmp_file_handle.close()

     def _query_mysql(self):
         """
@@ -141,41 +156,73 @@ def _write_local_data_files(self, cursor):
         col_type_dict = self._get_col_type_dict()
         file_no = 0
         tmp_file_handle = NamedTemporaryFile(delete=True)
-        tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
+        if self.export_format == 'csv':
+            file_mime_type = 'text/csv'
+        else:
+            file_mime_type = 'application/json'
+        files_to_upload = [{
+            'file_name': self.filename.format(file_no),
+            'file_handle': tmp_file_handle,
+            'file_mime_type': file_mime_type
+        }]
+
+        if self.export_format == 'csv':
+            csv_writer = self._configure_csv_file(tmp_file_handle, schema)

         for row in cursor:
             # Convert datetime objects to utc seconds, and decimals to floats.
             # Convert binary type object to string encoded with base64.
             row = self._convert_types(schema, col_type_dict, row)
-            row_dict = dict(zip(schema, row))
-
-            # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
-            s = json.dumps(row_dict)
-            if PY3:
-                s = s.encode('utf-8')
-            tmp_file_handle.write(s)
+            if self.export_format == 'csv':
+                csv_writer.writerow(row)
+            else:
+                row_dict = dict(zip(schema, row))
+
+                # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
+                s = json.dumps(row_dict, sort_keys=True)
+                if PY3:
+                    s = s.encode('utf-8')
+                tmp_file_handle.write(s)

-            # Append newline to make dumps BigQuery compatible.
-            tmp_file_handle.write(b'\n')
+                # Append newline to make dumps BigQuery compatible.
+                tmp_file_handle.write(b'\n')

             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
                 file_no += 1
                 tmp_file_handle = NamedTemporaryFile(delete=True)
-                tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
+                files_to_upload.append({
+                    'file_name': self.filename.format(file_no),
+                    'file_handle': tmp_file_handle,
+                    'file_mime_type': file_mime_type
+                })
+
+                if self.export_format == 'csv':
+                    csv_writer = self._configure_csv_file(tmp_file_handle, schema)

-        return tmp_file_handles
+        return files_to_upload
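
The size-based rollover above is easiest to see in isolation; a runnable sketch with an artificially small limit (all values invented):

    from tempfile import NamedTemporaryFile

    approx_max_file_size_bytes = 64    # deliberately tiny to force splits
    filename = 'export_{}.json'

    file_no = 0
    tmp_file_handle = NamedTemporaryFile(delete=True)
    file_names = [filename.format(file_no)]
    for row in [b'{"id": 1}\n'] * 20:  # twenty 10-byte rows
        tmp_file_handle.write(row)
        # Roll over to a fresh temp file once the current one passes the limit.
        if tmp_file_handle.tell() >= approx_max_file_size_bytes:
            file_no += 1
            tmp_file_handle = NamedTemporaryFile(delete=True)
            file_names.append(filename.format(file_no))
    print(file_names)  # ['export_0.json', 'export_1.json', 'export_2.json']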

+    def _configure_csv_file(self, file_handle, schema):
+        """Configure a csv writer with the file_handle and write schema
+        as headers for the new file.
+        """
+        csv_writer = csv.writer(file_handle, encoding='utf-8',
+                                delimiter=self.field_delimiter)
+        csv_writer.writerow(schema)
+        return csv_writer
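
unicodecsv mirrors the stdlib csv API but writes encoded bytes, which is why the same binary NamedTemporaryFile handles serve both the JSON and CSV paths on Python 2 and 3. A standalone sketch:

    from tempfile import NamedTemporaryFile

    import unicodecsv as csv

    file_handle = NamedTemporaryFile(delete=True)
    csv_writer = csv.writer(file_handle, encoding='utf-8', delimiter=',')
    csv_writer.writerow(['id', 'name'])  # header row, as _configure_csv_file writes the schema
    csv_writer.writerow([1, u'Zoë'])     # unicode values are encoded to utf-8 on write
    file_handle.flush()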

     def _write_local_schema_file(self, cursor):
         """
-        Takes a cursor, and writes the BigQuery schema for the results to a
-        local file system.
+        Takes a cursor, and writes the BigQuery schema in .json format for the
+        results to a local file system.

         :return: A dictionary where key is a filename to be used as an object
             name in GCS, and values are file handles to local files that
             contains the BigQuery schema fields in .json format.
         """
         schema_str = None
+        schema_file_mime_type = 'application/json'
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         if self.schema is not None and isinstance(self.schema, string_types):
             schema_str = self.schema
@@ -199,13 +246,18 @@ def _write_local_schema_file(self, cursor):
                     'type': field_type,
                     'mode': field_mode,
                 })
-            schema_str = json.dumps(schema)
+            schema_str = json.dumps(schema, sort_keys=True)
         if PY3:
             schema_str = schema_str.encode('utf-8')
         tmp_schema_file_handle.write(schema_str)

         self.log.info('Using schema for %s: %s', self.schema_filename, schema_str)
-        return {self.schema_filename: tmp_schema_file_handle}
+        schema_file_to_upload = {
+            'file_name': self.schema_filename,
+            'file_handle': tmp_schema_file_handle,
+            'file_mime_type': schema_file_mime_type
+        }
+        return schema_file_to_upload

     def _upload_to_gcs(self, files_to_upload):
         """
@@ -215,8 +267,10 @@ def _upload_to_gcs(self, files_to_upload):
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
             delegate_to=self.delegate_to)
-        for object, tmp_file_handle in files_to_upload.items():
-            hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
+        for tmp_file in files_to_upload:
+            hook.upload(self.bucket, tmp_file.get('file_name'),
+                        tmp_file.get('file_handle').name,
+                        mime_type=tmp_file.get('file_mime_type'))

     @staticmethod
     def _convert_types(schema, col_type_dict, row):