
Merge pull request #503 from rayluo/fix-bug-when-downloading-large-file-from-requester-paid-bucket

The extra_args argument was not properly relayed within MultipartDownloader.
rayluo committed Feb 22, 2016
2 parents 4d5888b + 9e7f58e commit a8afc9f
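
In practice the bug surfaced only for objects large enough to cross the multipart threshold: the single-request download path forwarded extra_args, but the per-part ranged get_object calls made by MultipartDownloader did not, so requester-pays downloads failed once the object grew past the threshold. A minimal sketch of the failing call, assuming a hypothetical requester-pays bucket and key:

import boto3
from boto3.s3.transfer import S3Transfer, TransferConfig

client = boto3.client('s3')
transfer = S3Transfer(client, TransferConfig())

# Objects above multipart_threshold (8 MB by default) take the
# MultipartDownloader path; before this fix that path dropped
# extra_args, so the ranged GETs lacked RequestPayer and S3
# rejected them with 403 Access Denied.
transfer.download_file(
    'my-requester-pays-bucket', 'large-object.bin', 'large-object.bin',
    extra_args={'RequestPayer': 'requester'})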
Showing 2 changed files with 26 additions and 5 deletions.
12 changes: 7 additions & 5 deletions boto3/s3/transfer.py
@@ -467,7 +467,7 @@ def download_file(self, bucket, key, filename, object_size,
             # 1 thread for the future that manages IO writes.
             download_parts_handler = functools.partial(
                 self._download_file_as_future,
-                bucket, key, filename, object_size, callback)
+                bucket, key, filename, object_size, extra_args, callback)
             parts_future = controller.submit(download_parts_handler)

             io_writes_handler = functools.partial(
@@ -483,13 +483,13 @@ def _process_future_results(self, futures):
             future.result()

     def _download_file_as_future(self, bucket, key, filename, object_size,
-                                 callback):
+                                 extra_args, callback):
         part_size = self._config.multipart_chunksize
         num_parts = int(math.ceil(object_size / float(part_size)))
         max_workers = self._config.max_concurrency
         download_partial = functools.partial(
             self._download_range, bucket, key, filename,
-            part_size, num_parts, callback)
+            part_size, num_parts, extra_args, callback)
         try:
             with self._executor_cls(max_workers=max_workers) as executor:
                 list(executor.map(download_partial, range(num_parts)))
@@ -506,7 +506,8 @@ def _calculate_range_param(self, part_size, part_index, num_parts):
         return range_param

     def _download_range(self, bucket, key, filename,
-                        part_size, num_parts, callback, part_index):
+                        part_size, num_parts,
+                        extra_args, callback, part_index):
         try:
             range_param = self._calculate_range_param(
                 part_size, part_index, num_parts)
@@ -517,7 +518,8 @@ def _download_range(self, bucket, key, filename,
             try:
                 logger.debug("Making get_object call.")
                 response = self._client.get_object(
-                    Bucket=bucket, Key=key, Range=range_param)
+                    Bucket=bucket, Key=key, Range=range_param,
+                    **extra_args)
                 streaming_body = StreamReaderProgress(
                     response['Body'], callback)
                 buffer_size = 1024 * 16
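Taken together, the transfer.py changes thread extra_args through every layer of the multipart path. A condensed, standalone sketch of the resulting flow, not the library source (concurrency and the IO-writer queue are omitted):

import functools
import math

def _calculate_range_param(part_size, part_index, num_parts):
    # The last part is open-ended; the others are inclusive byte ranges.
    start = part_index * part_size
    if part_index == num_parts - 1:
        return 'bytes=%s-' % start
    return 'bytes=%s-%s' % (start, start + part_size - 1)

def _download_range(client, bucket, key, part_size, num_parts,
                    extra_args, part_index):
    range_param = _calculate_range_param(part_size, part_index, num_parts)
    # The fix: extra_args (e.g. RequestPayer='requester') now reaches
    # every ranged get_object call.
    response = client.get_object(
        Bucket=bucket, Key=key, Range=range_param, **extra_args)
    return response['Body'].read()

def download_file(client, bucket, key, object_size, part_size, extra_args):
    num_parts = int(math.ceil(object_size / float(part_size)))
    # extra_args is baked into the partial handed to each worker,
    # mirroring the functools.partial plumbing in the diff above.
    download_partial = functools.partial(
        _download_range, client, bucket, key, part_size, num_parts,
        extra_args)
    return b''.join(download_partial(i) for i in range(num_parts))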
19 changes: 19 additions & 0 deletions tests/unit/s3/test_transfer.py
@@ -18,6 +18,8 @@
 from contextlib import closing

 import mock
+from botocore.stub import Stubber
+from botocore.session import Session
 from botocore.vendored import six
 from concurrent import futures

@@ -391,6 +393,23 @@ def test_multipart_download_with_multiple_parts(self):
              mock.call(Range='bytes=4-7', **extra),
              mock.call(Range='bytes=8-', **extra)])

+    def test_multipart_download_with_multiple_parts_and_extra_args(self):
+        client = Session().create_client('s3')
+        stubber = Stubber(client)
+        response_body = b'foobarbaz'
+        response = {'Body': six.BytesIO(response_body)}
+        expected_params = {
+            'Range': mock.ANY, 'Bucket': mock.ANY, 'Key': mock.ANY,
+            'RequestPayer': 'requester'}
+        stubber.add_response('get_object', response, expected_params)
+        stubber.activate()
+        downloader = MultipartDownloader(
+            client, TransferConfig(), InMemoryOSLayer({}), SequentialExecutor)
+        downloader.download_file(
+            'bucket', 'key', 'filename', len(response_body),
+            {'RequestPayer': 'requester'})
+        stubber.assert_no_pending_responses()
+
     def test_retry_on_failures_from_stream_reads(self):
         # If we get an exception during a call to the response body's .read()
         # method, we should retry the request.
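The new test stubs a real low-level client rather than a MagicMock so the stub can pin the request parameters: Stubber raises if get_object is invoked with parameters that do not match expected_params, which is exactly how a missing RequestPayer would be caught. A standalone sketch of that pattern, with hypothetical bucket and key names:

import io

from botocore.session import Session
from botocore.stub import Stubber

# region_name only satisfies client construction; no request is sent.
client = Session().create_client('s3', region_name='us-east-1')
stubber = Stubber(client)
stubber.add_response(
    'get_object',
    {'Body': io.BytesIO(b'payload')},
    expected_params={
        'Bucket': 'my-bucket', 'Key': 'my-key',
        'RequestPayer': 'requester'})
stubber.activate()

# Succeeds only because the call matches expected_params; dropping
# RequestPayer here would raise a stub assertion error instead.
data = client.get_object(
    Bucket='my-bucket', Key='my-key',
    RequestPayer='requester')['Body'].read()
stubber.assert_no_pending_responses()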
