Skip to content

Commit

Permalink
[AIRFLOW-2932] GoogleCloudStorageHook - allow compression of file (apache#3893)
Browse files Browse the repository at this point in the history

- Add gzip functionality to GoogleCloudStorageHook.upload
- Resolve docstring mistype, added additional information to
  tell user that there is option to compress
- Add test case for file_to_gcs
  • Loading branch information
neil90 authored and Fokko Driesprong committed Oct 13, 2018
1 parent 764c3bf commit f478ab5
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
24 changes: 22 additions & 2 deletions airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.exceptions import AirflowException

import gzip as gz
import shutil
import re
import os


class GoogleCloudStorageHook(GoogleCloudBaseHook):
Expand Down Expand Up @@ -171,7 +174,8 @@ def download(self, bucket, object, filename=None):
return downloaded_file_bytes

# pylint:disable=redefined-builtin
def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
def upload(self, bucket, object, filename,
mime_type='application/octet-stream', gzip=False):
"""
Uploads a local file to Google Cloud Storage.
Expand All @@ -182,15 +186,31 @@ def upload(self, bucket, object, filename, mime_type='application/octet-stream')
:param filename: The local file path to the file to be uploaded.
:type filename: string
:param mime_type: The MIME type to set when uploading the file.
:type mime_type: string
:type mime_type: str
:param gzip: Option to compress file for upload
:type gzip: bool
"""
service = self.get_conn()

if gzip:
filename_gz = filename + '.gz'

with open(filename, 'rb') as f_in:
with gz.open(filename_gz, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
filename = filename_gz

media = MediaFileUpload(filename, mime_type)

try:
service \
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()

# Clean up gzip file
if gzip:
os.remove(filename)
return True
except errors.HttpError as ex:
if ex.resp['status'] == '404':
Expand Down
13 changes: 10 additions & 3 deletions airflow/contrib/operators/file_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

class FileToGoogleCloudStorageOperator(BaseOperator):
"""
Uploads a file to Google Cloud Storage
Uploads a file to Google Cloud Storage.
Optionally can compress the file for upload.
:param src: Path to the local file. (templated)
:type src: string
Expand All @@ -38,7 +39,9 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
:param mime_type: The mime-type string
:type mime_type: string
:param delegate_to: The account to impersonate, if any
:type delegate_to: string
:type delegate_to: str
:param gzip: Allows for file to be compressed and uploaded as gzip
:type gzip: bool
"""
template_fields = ('src', 'dst', 'bucket')

Expand All @@ -50,6 +53,7 @@ def __init__(self,
google_cloud_storage_conn_id='google_cloud_default',
mime_type='application/octet-stream',
delegate_to=None,
gzip=False,
*args,
**kwargs):
super(FileToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
Expand All @@ -59,6 +63,7 @@ def __init__(self,
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.mime_type = mime_type
self.delegate_to = delegate_to
self.gzip = gzip

def execute(self, context):
"""
Expand All @@ -72,4 +77,6 @@ def execute(self, context):
bucket=self.bucket,
object=self.dst,
mime_type=self.mime_type,
filename=self.src)
filename=self.src,
gzip=self.gzip,
)
66 changes: 66 additions & 0 deletions tests/contrib/operators/test_file_to_gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import datetime
import unittest

from airflow import DAG, configuration
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

# Prefer the stdlib mock (Python 3); fall back to the standalone
# ``mock`` backport (Python 2).
# NOTE(review): if neither import succeeds, ``mock`` is left as None and
# the ``@mock.patch`` decorator used later in this module would raise
# AttributeError at import time — confirm the test runner always has a
# mock implementation available.
try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None


class TestFileToGcsOperator(unittest.TestCase):
    """Unit tests for ``FileToGoogleCloudStorageOperator``."""

    # Canonical keyword arguments handed to the operator in every test.
    _config = {
        'src': '/tmp/fake.csv',
        'dst': 'fake.csv',
        'bucket': 'dummy',
        'mime_type': 'application/octet-stream',
        'gzip': False
    }

    def setUp(self):
        # Build a minimal DAG to attach the operator under test to.
        configuration.load_test_config()
        self.dag = DAG(
            'test_dag_id',
            default_args={
                'owner': 'airflow',
                'start_date': datetime.datetime(2017, 1, 1),
            },
        )

    def test_init(self):
        # The constructor should copy every config value verbatim onto
        # the corresponding operator attribute.
        op = FileToGoogleCloudStorageOperator(
            task_id='file_to_gcs_operator',
            dag=self.dag,
            **self._config
        )
        for attr in ('src', 'dst', 'bucket', 'mime_type', 'gzip'):
            self.assertEqual(getattr(op, attr), self._config[attr])

    @mock.patch('airflow.contrib.operators.file_to_gcs.GoogleCloudStorageHook',
                autospec=True)
    def test_execute(self, hook_cls):
        # execute() must forward every configured value — including the
        # new ``gzip`` flag — to the hook's upload() call.
        hook = hook_cls.return_value
        op = FileToGoogleCloudStorageOperator(
            task_id='gcs_to_file_sensor',
            dag=self.dag,
            **self._config
        )
        op.execute(None)
        hook.upload.assert_called_once_with(
            bucket=self._config['bucket'],
            filename=self._config['src'],
            gzip=self._config['gzip'],
            mime_type=self._config['mime_type'],
            object=self._config['dst']
        )


# Allow running this test module directly: ``python test_file_to_gcs.py``.
if __name__ == '__main__':
    unittest.main()

0 comments on commit f478ab5

Please sign in to comment.