
Merge branch 's3-configuration' into develop
* s3-configuration:
  Support IEC suffix for sizes
  Fix typo in doc string
  Add test for --force option that was moved
  Fix s3 streaming test for memory utilization
  Plumb through S3 runtime config
  Add class for merging/converting s3 runtime config
  Move constants to transferconfig
  Move part size constants out to their respective modules
  Remove trailing/leading whitespace
  Remove session dep from CommandParameters
  Remove unused method
  Cleanup code formatting
jamesls committed Feb 6, 2015
2 parents 7af0f46 + 4efade1 commit 9911da3
Showing 13 changed files with 475 additions and 162 deletions.
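The substance of the merge is that S3Handler now pulls its tuning values (multipart threshold, part size, thread count, queue size) from a runtime config mapping rather than from module-level constants, so they can be plumbed through from user configuration. Below is a minimal sketch of how a caller might build such a config, based on the RuntimeConfig calls visible in the diff; the multipart_* keyword overrides and the commented-out session/params arguments are illustrative assumptions, and acceptance of human-readable sizes such as '16MB' is assumed only from the "Support IEC suffix for sizes" commit.

from awscli.customizations.s3.transferconfig import RuntimeConfig

# Defaults, as used when S3Handler is constructed without a runtime_config.
config = RuntimeConfig.defaults()

# Overriding individual values.  The keyword names follow the keys the handler
# reads below ('multipart_threshold', 'multipart_chunksize',
# 'max_concurrent_requests', 'max_queue_size'); values here are plain byte counts.
config = RuntimeConfig().build_config(
    multipart_threshold=64 * 1024 * 1024,    # switch to multipart uploads above 64 MiB
    multipart_chunksize=16 * 1024 * 1024,    # 16 MiB parts
    max_concurrent_requests=20,              # Executor thread pool size
    max_queue_size=2000,                     # bound on queued tasks
)

# handler = S3Handler(session, params, runtime_config=config)  # session/params elided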
20 changes: 0 additions & 20 deletions awscli/customizations/s3/constants.py

This file was deleted.

3 changes: 3 additions & 0 deletions awscli/customizations/s3/executor.py
@@ -44,6 +44,8 @@ class Executor(object):
     def __init__(self, num_threads, result_queue, quiet,
                  only_show_errors, max_queue_size, write_queue):
         self._max_queue_size = max_queue_size
+        LOGGER.debug("Using max queue size for s3 tasks of: %s",
+                     self._max_queue_size)
         self.queue = StablePriorityQueue(maxsize=self._max_queue_size,
                                          max_priority=20)
         self.num_threads = num_threads
@@ -78,6 +80,7 @@ def start(self):
         # explicit about it rather than relying on the threads_list order.
         # See .join() for more info.
         self.print_thread.start()
+        LOGGER.debug("Using a threadpool size of: %s", self.num_threads)
         for i in range(self.num_threads):
             worker = Worker(queue=self.queue)
             worker.setDaemon(True)
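The two debug statements added in executor.py record the effective queue size and thread pool size. They only appear when debug logging is enabled, for example via the CLI's --debug flag; programmatically, a sketch along the following lines should surface them, assuming the module defines its logger with the usual logging.getLogger(__name__).

import logging

logging.basicConfig()  # attach a stderr handler to the root logger
# The logger name is an assumption based on the module path,
# i.e. logging.getLogger(__name__) inside awscli/customizations/s3/executor.py.
logging.getLogger('awscli.customizations.s3.executor').setLevel(logging.DEBUG)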
42 changes: 29 additions & 13 deletions awscli/customizations/s3/s3handler.py
@@ -16,20 +16,22 @@
 import os
 import sys
 
-from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \
-    NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE
 from awscli.customizations.s3.utils import find_chunksize, \
     operate, find_bucket_key, relative_path, PrintTask, create_warning
 from awscli.customizations.s3.executor import Executor
 from awscli.customizations.s3 import tasks
+from awscli.customizations.s3.transferconfig import RuntimeConfig
 from awscli.compat import six
 from awscli.compat import queue
 
 
 LOGGER = logging.getLogger(__name__)
+# Maximum object size allowed in S3.
+# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+MAX_UPLOAD_SIZE = 5 * (1024 ** 4)
 
 CommandResult = namedtuple('CommandResult',
-                          ['num_tasks_failed', 'num_tasks_warned'])
+                           ['num_tasks_failed', 'num_tasks_warned'])
 
 
 class S3Handler(object):
@@ -39,12 +41,13 @@ class S3Handler(object):
     class pull tasks from to complete.
     """
     MAX_IO_QUEUE_SIZE = 20
-    MAX_EXECUTOR_QUEUE_SIZE = MAX_QUEUE_SIZE
-    EXECUTOR_NUM_THREADS = NUM_THREADS
 
     def __init__(self, session, params, result_queue=None,
-                 multi_threshold=MULTI_THRESHOLD, chunksize=CHUNKSIZE):
+                 runtime_config=None):
         self.session = session
+        if runtime_config is None:
+            runtime_config = RuntimeConfig.defaults()
+        self._runtime_config = runtime_config
         # The write_queue has potential for optimizations, so the constant
         # for maxsize is scoped to this class (as opposed to constants.py)
         # so we have the ability to change this value later.
@@ -65,14 +68,16 @@ def __init__(self, session, params, result_queue=None,
         for key in self.params.keys():
             if key in params:
                 self.params[key] = params[key]
-        self.multi_threshold = multi_threshold
-        self.chunksize = chunksize
+        self.multi_threshold = self._runtime_config['multipart_threshold']
+        self.chunksize = self._runtime_config['multipart_chunksize']
+        LOGGER.debug("Using a multipart threshold of %s and a part size of %s",
+                     self.multi_threshold, self.chunksize)
         self.executor = Executor(
-            num_threads=self.EXECUTOR_NUM_THREADS,
+            num_threads=self._runtime_config['max_concurrent_requests'],
             result_queue=self.result_queue,
             quiet=self.params['quiet'],
             only_show_errors=self.params['only_show_errors'],
-            max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE,
+            max_queue_size=self._runtime_config['max_queue_size'],
             write_queue=self.write_queue
         )
         self._multipart_uploads = []
@@ -111,7 +116,7 @@ def call(self, files):
                 priority=self.executor.IMMEDIATE_PRIORITY)
             self._shutdown()
             self.executor.wait_until_shutdown()
-
+
         return CommandResult(self.executor.num_tasks_failed,
                              self.executor.num_tasks_warned)
 
@@ -350,12 +355,23 @@ class S3StreamHandler(S3Handler):
     involves a stream since the logic is different when uploading and
     downloading streams.
     """
-
     # This ensures that the number of multipart chunks waiting in the
     # executor queue and in the threads is limited.
     MAX_EXECUTOR_QUEUE_SIZE = 2
     EXECUTOR_NUM_THREADS = 6
 
+    def __init__(self, session, params, result_queue=None,
+                 runtime_config=None):
+        if runtime_config is None:
+            # Rather than using the .defaults(), streaming
+            # has different default values so that it does not
+            # consume large amounts of memory.
+            runtime_config = RuntimeConfig().build_config(
+                max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE,
+                max_concurrent_requests=self.EXECUTOR_NUM_THREADS)
+        super(S3StreamHandler, self).__init__(session, params, result_queue,
+                                              runtime_config)
+
     def _enqueue_tasks(self, files):
         total_files = 0
         total_parts = 0
@@ -490,7 +506,7 @@ def _enqueue_upload_tasks(self, num_uploads, chunksize, upload_context,
                 task_class=task_class,
                 payload=payload
             )
-            num_uploads += 1
+            num_uploads += 1
             if not is_remaining:
                 break
         # Once there is no more data left, announce to the context how
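The comment above MAX_EXECUTOR_QUEUE_SIZE and EXECUTOR_NUM_THREADS spells out why streaming keeps its own, smaller defaults: the number of multipart chunks held in memory is bounded by the queue depth plus the number of worker threads. A back-of-the-envelope sketch of that bound, using a hypothetical part size purely for illustration:

# Upper bound on buffered stream data implied by the streaming defaults above.
MAX_EXECUTOR_QUEUE_SIZE = 2       # parts waiting in the executor queue
EXECUTOR_NUM_THREADS = 6          # parts held by worker threads
part_size = 8 * 1024 * 1024       # hypothetical part size, for illustration only

in_flight_parts = MAX_EXECUTOR_QUEUE_SIZE + EXECUTOR_NUM_THREADS
print("at most ~%d MiB buffered" % (in_flight_parts * part_size // 1024 ** 2))
# -> at most ~64 MiB buffered with these example numbers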
