Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-2932] GoogleCloudStorageHook - allow compression of file #3893

Merged
merged 8 commits into from
Oct 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.exceptions import AirflowException

import gzip as gz
import shutil
import re
import os


class GoogleCloudStorageHook(GoogleCloudBaseHook):
Expand Down Expand Up @@ -172,7 +175,8 @@ def download(self, bucket, object, filename=None):
return downloaded_file_bytes

# pylint:disable=redefined-builtin
def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
def upload(self, bucket, object, filename,
mime_type='application/octet-stream', gzip=False):
"""
Uploads a local file to Google Cloud Storage.

Expand All @@ -184,14 +188,30 @@ def upload(self, bucket, object, filename, mime_type='application/octet-stream')
:type filename: str
:param mime_type: The MIME type to set when uploading the file.
:type mime_type: str
:param gzip: Option to compress file for upload
:type gzip: bool
"""
service = self.get_conn()

if gzip:
filename_gz = filename + '.gz'

with open(filename, 'rb') as f_in:
with gz.open(filename_gz, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
filename = filename_gz

media = MediaFileUpload(filename, mime_type)

try:
service \
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()

# Clean up gzip file
if gzip:
os.remove(filename)
return True
except errors.HttpError as ex:
if ex.resp['status'] == '404':
Expand Down
11 changes: 9 additions & 2 deletions airflow/contrib/operators/file_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

class FileToGoogleCloudStorageOperator(BaseOperator):
"""
Uploads a file to Google Cloud Storage
Uploads a file to Google Cloud Storage.
Optionally can compress the file for upload.

:param src: Path to the local file. (templated)
:type src: str
Expand All @@ -39,6 +40,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
:type mime_type: str
:param delegate_to: The account to impersonate, if any
:type delegate_to: str
:param gzip: Allows for file to be compressed and uploaded as gzip
:type gzip: bool
"""
template_fields = ('src', 'dst', 'bucket')

Expand All @@ -50,6 +53,7 @@ def __init__(self,
google_cloud_storage_conn_id='google_cloud_default',
mime_type='application/octet-stream',
delegate_to=None,
gzip=False,
*args,
**kwargs):
super(FileToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
Expand All @@ -59,6 +63,7 @@ def __init__(self,
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.mime_type = mime_type
self.delegate_to = delegate_to
self.gzip = gzip

def execute(self, context):
"""
Expand All @@ -72,4 +77,6 @@ def execute(self, context):
bucket=self.bucket,
object=self.dst,
mime_type=self.mime_type,
filename=self.src)
filename=self.src,
gzip=self.gzip,
)
66 changes: 66 additions & 0 deletions tests/contrib/operators/test_file_to_gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import datetime
import unittest

from airflow import DAG, configuration
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

try:
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None


class TestFileToGcsOperator(unittest.TestCase):
    """Unit tests for ``FileToGoogleCloudStorageOperator``.

    Verifies that constructor arguments are stored on the operator and
    that ``execute`` delegates to ``GoogleCloudStorageHook.upload`` with
    the expected keyword arguments (including the new ``gzip`` flag).
    """

    # Shared keyword arguments passed to the operator under test.
    _config = {
        'src': '/tmp/fake.csv',
        'dst': 'fake.csv',
        'bucket': 'dummy',
        'mime_type': 'application/octet-stream',
        'gzip': False
    }

    def setUp(self):
        configuration.load_test_config()
        args = {
            'owner': 'airflow',
            'start_date': datetime.datetime(2017, 1, 1)
        }
        self.dag = DAG('test_dag_id', default_args=args)

    def test_init(self):
        """Constructor should store every supplied parameter unchanged."""
        operator = FileToGoogleCloudStorageOperator(
            task_id='file_to_gcs_operator',
            dag=self.dag,
            **self._config
        )
        self.assertEqual(operator.src, self._config['src'])
        self.assertEqual(operator.dst, self._config['dst'])
        self.assertEqual(operator.bucket, self._config['bucket'])
        self.assertEqual(operator.mime_type, self._config['mime_type'])
        self.assertEqual(operator.gzip, self._config['gzip'])

    # Guard with skipIf instead of decorating with ``@mock.patch``:
    # when neither ``unittest.mock`` nor the ``mock`` backport is
    # importable, ``mock`` is ``None`` and ``@mock.patch(...)`` would
    # raise AttributeError at class-definition time rather than skip.
    @unittest.skipIf(mock is None, 'mock package not available')
    def test_execute(self):
        """``execute`` should call ``GoogleCloudStorageHook.upload`` once
        with the configured bucket/object/filename/mime_type/gzip."""
        with mock.patch(
                'airflow.contrib.operators.file_to_gcs.GoogleCloudStorageHook',
                autospec=True) as mock_hook:
            mock_instance = mock_hook.return_value
            # task_id renamed from the misleading copy-paste value
            # 'gcs_to_file_sensor' (this operator is file->GCS, not a sensor).
            operator = FileToGoogleCloudStorageOperator(
                task_id='file_to_gcs_operator',
                dag=self.dag,
                **self._config
            )
            operator.execute(None)
            mock_instance.upload.assert_called_once_with(
                bucket=self._config['bucket'],
                filename=self._config['src'],
                gzip=self._config['gzip'],
                mime_type=self._config['mime_type'],
                object=self._config['dst']
            )


# Allow running this test module directly (e.g. ``python test_file_to_gcs.py``)
# in addition to discovery via a test runner.
if __name__ == '__main__':
    unittest.main()