Add new LocalFilesystemToS3Operator under Amazon provider (#17168) (#…
denimalpaca authored Aug 14, 2021
1 parent 721d4e7 commit 1632c9f
Showing 3 changed files with 245 additions and 0 deletions.
42 changes: 42 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_local_to_s3.py
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


import os
from datetime import datetime

from airflow import models
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

S3_BUCKET = os.environ.get("S3_BUCKET", "test-bucket")
S3_KEY = os.environ.get("S3_KEY", "key")

with models.DAG(
"example_local_to_s3",
schedule_interval=None,
start_date=datetime(2021, 1, 1), # Override to match your needs
) as dag:
# [START howto_local_transfer_data_to_s3]
create_local_to_s3_job = LocalFilesystemToS3Operator(
task_id="create_local_to_s3_job",
filename="relative/path/to/file.csv",
dest_key=S3_KEY,
dest_bucket=S3_BUCKET,
)

create_local_to_s3_job
# [END howto_local_transfer_data_to_s3]
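
A minimal sketch of the alternative form documented by the operator, where dest_key is given as a full s3:// URL and dest_bucket is therefore omitted. The task id below is illustrative and not part of this commit.

# Hypothetical variant, not part of this commit: when dest_key is a full
# s3:// URL, dest_bucket must be left out, otherwise the operator raises
# a TypeError at construction time.
upload_via_full_url = LocalFilesystemToS3Operator(
    task_id="upload_via_full_url",
    filename="relative/path/to/file.csv",
    dest_key=f"s3://{S3_BUCKET}/{S3_KEY}",
)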
110 changes: 110 additions & 0 deletions airflow/providers/amazon/aws/transfers/local_to_s3.py
@@ -0,0 +1,110 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional, Union

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class LocalFilesystemToS3Operator(BaseOperator):
"""
    Uploads a file from a local filesystem to Amazon S3.

    :param filename: Path to the local file. Path can be either absolute
        (e.g. /path/to/file.ext) or relative (e.g. ../../foo/*/*.csv). (templated)
    :type filename: str
    :param dest_key: The key of the object to copy to. (templated)
        It can be either a full s3:// style URL or a relative path from the root level.
        When it is specified as a full s3:// URL, passing ``dest_bucket`` as well results in a TypeError.
    :type dest_key: str
    :param dest_bucket: Name of the S3 bucket to which the object is copied. (templated)
        Providing it when ``dest_key`` is a full s3:// URL results in a TypeError.
    :type dest_bucket: str
    :param aws_conn_id: Connection ID of the S3 connection to use.
    :type aws_conn_id: str
    :param verify: Whether or not to verify SSL certificates for the S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used,
          but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
          You can specify this argument if you want to use a different
          CA cert bundle than the one used by botocore.
    :type verify: bool or str
    :param replace: A flag to decide whether or not to overwrite the key
        if it already exists. If replace is False and the key exists, an
        error will be raised.
    :type replace: bool
    :param encrypt: If True, the file will be encrypted on the server side
        by S3 and will be stored in an encrypted form while at rest in S3.
    :type encrypt: bool
    :param gzip: If True, the file will be compressed locally before upload.
    :type gzip: bool
    :param acl_policy: String specifying the canned ACL policy for the file being
        uploaded to the S3 bucket.
    :type acl_policy: str
"""

template_fields = ('filename', 'dest_key', 'dest_bucket')

def __init__(
self,
*,
filename: str,
dest_key: str,
dest_bucket: Optional[str] = None,
aws_conn_id: str = 'aws_default',
verify: Optional[Union[str, bool]] = None,
replace: bool = False,
encrypt: bool = False,
gzip: bool = False,
acl_policy: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)

self.filename = filename
self.dest_key = dest_key
self.dest_bucket = dest_bucket
self.aws_conn_id = aws_conn_id
self.verify = verify
self.replace = replace
self.encrypt = encrypt
self.gzip = gzip
self.acl_policy = acl_policy

if 's3://' in self.dest_key and self.dest_bucket is not None:
raise TypeError('dest_bucket should be None when dest_key is provided as a full s3:// file path.')

def execute(self, context):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
s3_hook.load_file(
self.filename,
self.dest_key,
self.dest_bucket,
self.replace,
self.encrypt,
self.gzip,
self.acl_policy,
)
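
The execute() method above is a thin wrapper around S3Hook.load_file. A rough standalone equivalent is sketched below, assuming a configured aws_default connection; the filename, key, and bucket names are placeholders for illustration only.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Illustrative sketch of what execute() delegates to; the filename, key, and
# bucket below are placeholders, and aws_default is assumed to be configured.
hook = S3Hook(aws_conn_id="aws_default", verify=None)
hook.load_file(
    filename="/tmp/report.csv",
    key="data/report.csv",
    bucket_name="my-example-bucket",
    replace=False,
    encrypt=False,
    gzip=False,
    acl_policy=None,
)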
93 changes: 93 additions & 0 deletions tests/providers/amazon/aws/transfers/test_local_to_s3.py
@@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import datetime
import os
import unittest

import boto3
from moto import mock_s3

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator


class TestFileToS3Operator(unittest.TestCase):

_config = {'verify': False, 'replace': False, 'encrypt': False, 'gzip': False}

def setUp(self):
args = {'owner': 'airflow', 'start_date': datetime.datetime(2017, 1, 1)}
self.dag = DAG('test_dag_id', default_args=args)
self.dest_key = 'test/test1.csv'
self.dest_bucket = 'dummy'
self.testfile1 = '/tmp/fake1.csv'
with open(self.testfile1, 'wb') as f:
f.write(b"x" * 393216)

def tearDown(self):
os.remove(self.testfile1)

def test_init(self):
operator = LocalFilesystemToS3Operator(
task_id='file_to_s3_operator',
dag=self.dag,
filename=self.testfile1,
dest_key=self.dest_key,
dest_bucket=self.dest_bucket,
**self._config,
)
assert operator.filename == self.testfile1
assert operator.dest_key == self.dest_key
assert operator.dest_bucket == self.dest_bucket
assert operator.verify == self._config['verify']
assert operator.replace == self._config['replace']
assert operator.encrypt == self._config['encrypt']
assert operator.gzip == self._config['gzip']

def test_init_exception(self):
with self.assertRaises(TypeError):
LocalFilesystemToS3Operator(
                task_id='file_to_s3_operator_exception',
dag=self.dag,
filename=self.testfile1,
dest_key=f's3://dummy/{self.dest_key}',
dest_bucket=self.dest_bucket,
**self._config,
)

@mock_s3
def test_execute(self):
conn = boto3.client('s3')
conn.create_bucket(Bucket=self.dest_bucket)
operator = LocalFilesystemToS3Operator(
            task_id='local_to_s3_operator',
dag=self.dag,
filename=self.testfile1,
dest_key=self.dest_key,
dest_bucket=self.dest_bucket,
**self._config,
)
operator.execute(None)

objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
        # exactly one object should be found in the destination bucket
assert len(objects_in_dest_bucket['Contents']) == 1
        # the object found should match the dest_key specified earlier
assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
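
A further check along the same lines could verify that the uploaded object's size matches the local file written in setUp(). This is a sketch only, not part of this commit, and would be added as another method of the test class above.

    @mock_s3
    def test_execute_uploaded_size(self):
        # Sketch of a possible extra test, not included in this commit: the
        # object stored by moto should be the same size as the local test file.
        conn = boto3.client('s3')
        conn.create_bucket(Bucket=self.dest_bucket)
        operator = LocalFilesystemToS3Operator(
            task_id='local_to_s3_size_check',
            dag=self.dag,
            filename=self.testfile1,
            dest_key=self.dest_key,
            dest_bucket=self.dest_bucket,
            **self._config,
        )
        operator.execute(None)

        head = conn.head_object(Bucket=self.dest_bucket, Key=self.dest_key)
        assert head['ContentLength'] == os.path.getsize(self.testfile1)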
