Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The extra_args was not properly relayed within MultipartDownloader #503

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def download_file(self, bucket, key, filename, object_size,
# 1 thread for the future that manages IO writes.
download_parts_handler = functools.partial(
self._download_file_as_future,
bucket, key, filename, object_size, callback)
bucket, key, filename, object_size, extra_args, callback)
parts_future = controller.submit(download_parts_handler)

io_writes_handler = functools.partial(
Expand All @@ -483,13 +483,13 @@ def _process_future_results(self, futures):
future.result()

def _download_file_as_future(self, bucket, key, filename, object_size,
callback):
extra_args, callback):
part_size = self._config.multipart_chunksize
num_parts = int(math.ceil(object_size / float(part_size)))
max_workers = self._config.max_concurrency
download_partial = functools.partial(
self._download_range, bucket, key, filename,
part_size, num_parts, callback)
part_size, num_parts, extra_args, callback)
try:
with self._executor_cls(max_workers=max_workers) as executor:
list(executor.map(download_partial, range(num_parts)))
Expand All @@ -506,7 +506,8 @@ def _calculate_range_param(self, part_size, part_index, num_parts):
return range_param

def _download_range(self, bucket, key, filename,
part_size, num_parts, callback, part_index):
part_size, num_parts,
extra_args, callback, part_index):
try:
range_param = self._calculate_range_param(
part_size, part_index, num_parts)
Expand All @@ -517,7 +518,8 @@ def _download_range(self, bucket, key, filename,
try:
logger.debug("Making get_object call.")
response = self._client.get_object(
Bucket=bucket, Key=key, Range=range_param)
Bucket=bucket, Key=key, Range=range_param,
**extra_args)
streaming_body = StreamReaderProgress(
response['Body'], callback)
buffer_size = 1024 * 16
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/s3/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from contextlib import closing

import mock
from botocore.stub import Stubber
from botocore.session import Session
from botocore.vendored import six
from concurrent import futures

Expand Down Expand Up @@ -391,6 +393,23 @@ def test_multipart_download_with_multiple_parts(self):
mock.call(Range='bytes=4-7', **extra),
mock.call(Range='bytes=8-', **extra)])

def test_multipart_download_with_multiple_parts_and_extra_args(self):
client = Session().create_client('s3')
stubber = Stubber(client)
response_body = b'foobarbaz'
response = {'Body': six.BytesIO(response_body)}
expected_params = {
'Range': mock.ANY, 'Bucket': mock.ANY, 'Key': mock.ANY,
'RequestPayer': 'requester'}
stubber.add_response('get_object', response, expected_params)
stubber.activate()
downloader = MultipartDownloader(
client, TransferConfig(), InMemoryOSLayer({}), SequentialExecutor)
downloader.download_file(
'bucket', 'key', 'filename', len(response_body),
{'RequestPayer': 'requester'})
stubber.assert_no_pending_responses()

def test_retry_on_failures_from_stream_reads(self):
# If we get an exception during a call to the response body's .read()
# method, we should retry the request.
Expand Down