Merge branch 'upstream-master' into feature/s3-remove-batch
* upstream-master:
  Add Python 3.7 compatibility (spotify#2466)
  Refactor s3 copy into sub-methods (spotify#2508)
  Remove s3 bucket validation prior to file upload (spotify#2528)
  Version 2.7.9
  set upper bound of python-daemon
  Update MockTarget mode to accept r* or w* (spotify#2519)
dlstadther committed Sep 28, 2018
2 parents 4fc1f6d + 4389332 commit 953906c
Showing 8 changed files with 130 additions and 95 deletions.
23 changes: 16 additions & 7 deletions .travis.yml
@@ -1,9 +1,12 @@
language: python

dist: xenial

services:
- elasticsearch
- mysql
- docker
- postgresql

env:
global:
@@ -22,7 +25,6 @@ env:
- TOXENV=flake8
- TOXENV=docs
- TOXENV=py27-nonhdfs
- TOXENV=py34-nonhdfs
- TOXENV=py27-unixsocket
# - TOXENV=py27-cdh
# minicluster (cdh) tests disabled as lack of love, #2140.
@@ -33,18 +35,25 @@ env:
# - TOXENV=visualiser
# Disabling this test because of intermittent failures :-/

# Python 3.5 has to go here until Travis adds it to the default build images.
# https://github.com/travis-ci/travis-ci/issues/4794#issuecomment-143758799
sudo: false

# Python 3.7 needs to have its dist/sudo flags set in matrix so they don't
# break the other python builds.
# https://github.com/travis-ci/travis-ci/issues/9815
matrix:
include:
- python: 3.5
env: TOXENV=py35-nonhdfs
- python: 3.6
env: TOXENV=py36-nonhdfs
- python: 3.6
env: TOXENV=py36-unixsocket

sudo: false
- python: 3.7
dist: xenial
sudo: true
env: TOXENV=py37-nonhdfs
- python: 3.7
dist: xenial
sudo: true
env: TOXENV=py37-unixsocket

cache:
directories:
2 changes: 1 addition & 1 deletion README.rst
@@ -17,7 +17,7 @@
.. image:: https://img.shields.io/pypi/l/luigi.svg?style=flat
:target: https://pypi.python.org/pypi/luigi

Luigi is a Python (2.7, 3.3, 3.4, 3.5, 3.6) package that helps you build complex
Luigi is a Python (2.7, 3.6, 3.7 tested) package that helps you build complex
pipelines of batch jobs. It handles dependency resolution, workflow management,
visualization, handling failures, command line integration, and much more.

140 changes: 61 additions & 79 deletions luigi/contrib/s3.py
@@ -29,10 +29,10 @@
import os
import os.path
import warnings

from multiprocessing.pool import ThreadPool

import botocore
from boto3.s3.transfer import TransferConfig

try:
from urlparse import urlsplit
@@ -94,6 +94,8 @@ class S3Client(FileSystem):
"""

_s3 = None
DEFAULT_PART_SIZE = 8388608
DEFAULT_THREADS = 100
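A quick arithmetic check on the new constant, since the raw byte count is easy to misread: DEFAULT_PART_SIZE is 8 MiB.

# 8 MiB expressed in bytes, matching DEFAULT_PART_SIZE above
assert 8 * 1024 * 1024 == 8388608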

def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
**kwargs):
@@ -269,14 +271,11 @@ def put_string(self, content, destination_s3_path, **kwargs):
self._check_deprecated_argument(**kwargs)
(bucket, key) = self._path_to_bucket_and_key(destination_s3_path)

# validate the bucket
self._validate_bucket(bucket)

# put the file
self.s3.meta.client.put_object(
Key=key, Bucket=bucket, Body=content, **kwargs)
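With the pre-upload _validate_bucket call removed, a missing bucket now surfaces from the put_object call itself instead of a separate head_bucket round trip. A minimal sketch of what a caller sees (bucket and key names are hypothetical):

from luigi.contrib.s3 import S3Client

client = S3Client()
try:
    client.put_string("SOMESTRING", 's3://no-such-bucket/some/key')
except client.s3.meta.client.exceptions.NoSuchBucket:
    # previously this case was detected by an extra head_bucket request
    print('destination bucket does not exist')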

def put_multipart(self, local_path, destination_s3_path, part_size=8388608, **kwargs):
def put_multipart(self, local_path, destination_s3_path, part_size=DEFAULT_PART_SIZE, **kwargs):
"""
Put an object stored locally to an S3 path
using S3 multi-part upload (for files > 8Mb).
@@ -294,13 +293,11 @@ def put_multipart(self, local_path, destination_s3_path, part_size=8388608, **kw

(bucket, key) = self._path_to_bucket_and_key(destination_s3_path)

# validate the bucket
self._validate_bucket(bucket)

self.s3.meta.client.upload_fileobj(
Fileobj=open(local_path, 'rb'), Bucket=bucket, Key=key, Config=transfer_config, ExtraArgs=kwargs)
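put_multipart now delegates chunking to boto3's managed transfer: upload_fileobj with a TransferConfig switches to multipart automatically once the file exceeds the configured part size. A usage sketch (paths and part size are illustrative):

client = S3Client()
# upload with 16 MiB parts instead of the 8 MiB default
client.put_multipart('/tmp/big_file.bin', 's3://mybucket/big_file.bin',
                     part_size=16 * 1024 * 1024)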

def copy(self, source_path, destination_path, threads=100, start_time=None, end_time=None, part_size=8388608, **kwargs):
def copy(self, source_path, destination_path, threads=DEFAULT_THREADS, start_time=None, end_time=None,
part_size=DEFAULT_PART_SIZE, **kwargs):
"""
Copy object(s) from one S3 location to another. Works for individual keys or entire directories.
When files are larger than `part_size`, multipart uploading will be used.
@@ -314,71 +311,69 @@ def copy(self, source_path, destination_path, threads=100, start_time=None, end_
:returns tuple (number_of_files_copied, total_size_copied_in_bytes)
"""

start = datetime.datetime.now()

(src_bucket, src_key) = self._path_to_bucket_and_key(source_path)
(dst_bucket, dst_key) = self._path_to_bucket_and_key(destination_path)

# don't allow threads to be less than 3
threads = 3 if threads < 3 else threads
from boto3.s3.transfer import TransferConfig

transfer_config = TransferConfig(max_concurrency=threads, multipart_chunksize=part_size)
total_keys = 0
total_size_bytes = 0

if self.isdir(source_path):
copy_jobs = []
management_pool = ThreadPool(processes=threads)

(bucket, key) = self._path_to_bucket_and_key(source_path)
key_path = self._add_path_delimiter(key)
key_path_len = len(key_path)
src_prefix = self._add_path_delimiter(src_key)
dst_prefix = self._add_path_delimiter(dst_key)

for item in self.list(source_path, start_time=start_time, end_time=end_time, return_key=True):
path = item.key[key_path_len:]
# prevents copy attempt of empty key in folder
if path != '' and path != '/':
total_keys += 1
total_size_bytes += item.size
copy_source = {
'Bucket': src_bucket,
'Key': src_prefix + path
}

the_kwargs = {'Config': transfer_config, 'ExtraArgs': kwargs}
job = management_pool.apply_async(self.s3.meta.client.copy,
args=(copy_source, dst_bucket, dst_prefix + path),
kwds=the_kwargs)
copy_jobs.append(job)

# Wait for the pools to finish scheduling all the copies
management_pool.close()
management_pool.join()

# Raise any errors encountered in any of the copy processes
for result in copy_jobs:
result.get()

end = datetime.datetime.now()
duration = end - start
logger.info('%s : Complete : %s total keys copied in %s' %
(datetime.datetime.now(), total_keys, duration))
return self._copy_dir(source_path, destination_path, threads=threads,
start_time=start_time, end_time=end_time, part_size=part_size, **kwargs)

# If the file isn't a directory just perform a simple copy
else:
total_keys += 1
copy_source = {
'Bucket': src_bucket,
'Key': src_key
}
item = self.get_key(source_path)
total_size_bytes += item.size
self.s3.meta.client.copy(
copy_source, dst_bucket, dst_key, Config=transfer_config, ExtraArgs=kwargs)
return self._copy_file(source_path, destination_path, threads=threads, part_size=part_size, **kwargs)
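After the refactor, copy() is a thin dispatcher: directory-style sources go through _copy_dir, single keys through _copy_file, and both return the same (number_of_keys, bytes_copied) tuple. A usage sketch (bucket names hypothetical):

client = S3Client()
n_keys, n_bytes = client.copy('s3://src-bucket/report.csv',
                              's3://dst-bucket/report.csv')
print('copied %d key(s), %d bytes' % (n_keys, n_bytes))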

def _copy_file(self, source_path, destination_path, threads=DEFAULT_THREADS, part_size=DEFAULT_PART_SIZE, **kwargs):
src_bucket, src_key = self._path_to_bucket_and_key(source_path)
dst_bucket, dst_key = self._path_to_bucket_and_key(destination_path)
transfer_config = TransferConfig(max_concurrency=threads, multipart_chunksize=part_size)
item = self.get_key(source_path)
copy_source = {
'Bucket': src_bucket,
'Key': src_key
}

self.s3.meta.client.copy(copy_source, dst_bucket, dst_key, Config=transfer_config, ExtraArgs=kwargs)

return 1, item.size

def _copy_dir(self, source_path, destination_path, threads=DEFAULT_THREADS,
start_time=None, end_time=None, part_size=DEFAULT_PART_SIZE, **kwargs):
start = datetime.datetime.now()
copy_jobs = []
management_pool = ThreadPool(processes=threads)
transfer_config = TransferConfig(max_concurrency=threads, multipart_chunksize=part_size)
src_bucket, src_key = self._path_to_bucket_and_key(source_path)
dst_bucket, dst_key = self._path_to_bucket_and_key(destination_path)
src_prefix = self._add_path_delimiter(src_key)
dst_prefix = self._add_path_delimiter(dst_key)
key_path_len = len(src_prefix)
total_size_bytes = 0
total_keys = 0
for item in self.list(source_path, start_time=start_time, end_time=end_time, return_key=True):
path = item.key[key_path_len:]
# prevents copy attempt of empty key in folder
if path != '' and path != '/':
total_keys += 1
total_size_bytes += item.size
copy_source = {
'Bucket': src_bucket,
'Key': src_prefix + path
}
the_kwargs = {'Config': transfer_config, 'ExtraArgs': kwargs}
job = management_pool.apply_async(self.s3.meta.client.copy,
args=(copy_source, dst_bucket, dst_prefix + path),
kwds=the_kwargs)
copy_jobs.append(job)
# Wait for the pools to finish scheduling all the copies
management_pool.close()
management_pool.join()
# Raise any errors encountered in any of the copy processes
for result in copy_jobs:
result.get()
end = datetime.datetime.now()
duration = end - start
logger.info('%s : Complete : %s total keys copied in %s' %
(datetime.datetime.now(), total_keys, duration))
return total_keys, total_size_bytes
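_copy_dir fans the per-key copies out to a ThreadPool and calls result.get() on every job, so a failure in any worker still propagates to the caller. A sketch of a prefix copy with a larger pool (prefixes are hypothetical):

client = S3Client()
# copy everything under the prefix with 20 concurrent copy jobs
n_keys, n_bytes = client.copy('s3://src-bucket/logs/2018-09/',
                              's3://dst-bucket/backup/2018-09/',
                              threads=20)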

def get(self, s3_path, destination_local_path):
@@ -540,19 +535,6 @@ def _check_deprecated_argument(**kwargs):
'For region names, refer to the amazon S3 region documentation\n'
'https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region')

def _validate_bucket(self, bucket_name):
exists = True

try:
self.s3.meta.client.head_bucket(Bucket=bucket_name)
except botocore.exceptions.ClientError as e:
error_code = e.response['Error']['Code']
if error_code in ('404', 'NoSuchBucket'):
exists = False
else:
raise
return exists

def _exists(self, bucket, key):
try:
self.s3.Object(bucket, key).load()
10 changes: 5 additions & 5 deletions luigi/mock.py
@@ -131,7 +131,7 @@ def rename(self, *args, **kwargs):
"""
self.move(*args, **kwargs)

def open(self, mode):
def open(self, mode='r'):
fn = self.path
mock_target = self

@@ -158,7 +158,7 @@ def write(self, data):
super(Buffer, self).write(data)

def close(self):
if mode == 'w':
if mode[0] == 'w':
try:
mock_target.wrapper.flush()
except AttributeError:
@@ -174,15 +174,15 @@ def __enter__(self):
return self

def readable(self):
return mode == 'r'
return mode[0] == 'r'

def writeable(self):
return mode == 'w'
return mode[0] == 'w'

def seekable(self):
return False

if mode == 'w':
if mode[0] == 'w':
wrapper = self.format.pipe_writer(Buffer())
wrapper.set_wrapper(wrapper)
return wrapper
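The mode[0] checks mean MockTarget.open now treats variants such as 'rb' and 'wb' the same as plain 'r' and 'w'. A small sketch against the mock filesystem (file name and contents are illustrative, assuming the default text format):

from luigi.mock import MockTarget

target = MockTarget('mock/output.txt')
with target.open('w') as f:    # 'wb' now takes the same writer path
    f.write('hello\n')
with target.open('r') as f:    # 'rb' likewise maps to the reader path
    print(f.read())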
7 changes: 5 additions & 2 deletions setup.py
@@ -39,7 +39,8 @@ def get_static_files(path):

install_requires = [
'tornado>=4.0,<5',
'python-daemon<3.0',
# https://pagure.io/python-daemon/issue/18
'python-daemon<2.2.0',
]

if os.environ.get('READTHEDOCS', None) == 'True':
@@ -54,7 +55,7 @@ def get_static_files(path):

setup(
name='luigi',
version='2.7.8',
version='2.7.9',
description='Workflow mgmgt + task scheduling + dependency resolution',
long_description=long_description,
author='The Luigi Authors',
@@ -94,6 +95,8 @@ def get_static_files(path):
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Topic :: System :: Monitoring',
],
)
18 changes: 18 additions & 0 deletions test/contrib/s3_test.py
@@ -177,6 +177,12 @@ def test_put(self):
s3_client.put(self.tempFilePath, 's3://mybucket/putMe')
self.assertTrue(s3_client.exists('s3://mybucket/putMe'))

def test_put_no_such_bucket(self):
# intentionally don't create bucket
s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
with self.assertRaises(s3_client.s3.meta.client.exceptions.NoSuchBucket):
s3_client.put(self.tempFilePath, 's3://mybucket/putMe')

def test_put_sse_deprecated(self):
create_bucket()
s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
@@ -197,6 +203,12 @@ def test_put_string(self):
s3_client.put_string("SOMESTRING", 's3://mybucket/putString')
self.assertTrue(s3_client.exists('s3://mybucket/putString'))

def test_put_string_no_such_bucket(self):
# intentionally don't create bucket
s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
with self.assertRaises(s3_client.s3.meta.client.exceptions.NoSuchBucket):
s3_client.put_string("SOMESTRING", 's3://mybucket/putString')

def test_put_string_sse_deprecated(self):
create_bucket()
s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
@@ -259,6 +271,12 @@ def test_put_multipart_less_than_split_size(self):
file_size = 5000
return self._run_multipart_test(part_size, file_size)

def test_put_multipart_no_such_bucket(self):
# intentionally don't create bucket
s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
with self.assertRaises(s3_client.s3.meta.client.exceptions.NoSuchBucket):
s3_client.put_multipart(self.tempFilePath, 's3://mybucket/putMe')

def test_exists(self):
create_bucket()
s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)