allow use of S3 single part uploads #400

Merged: 7 commits, Mar 15, 2020
4 changes: 2 additions & 2 deletions .travis.yml
@@ -52,7 +52,7 @@ script:
# The creds won't be available unless this is a branch belonging to the
# RaRe-Technologies organization.
#
- if [[ ${TRAVIS_SECURE_ENV_VARS} && ${RUN_BENCHMARKS} ]]; then
- if [[ "${TRAVIS_SECURE_ENV_VARS}" = "true" && "${RUN_BENCHMARKS}" = "true" ]]; then
export SO_S3_URL=$SO_S3_URL/$(python -c 'from uuid import uuid4;print(uuid4())');
pip install pytest_benchmark awscli;
set -e;
@@ -68,7 +68,7 @@ script:
#
# These integration tests require AWS creds and an initialized S3 bucket
#
- if [[ ${TRAVIS_SECURE_ENV_VARS} ]]; then
- if [[ "${TRAVIS_SECURE_ENV_VARS}" = "true" ]]; then
pytest integration-tests/test_s3_ported.py;
fi

10 changes: 10 additions & 0 deletions help.txt
@@ -101,6 +101,16 @@ FUNCTIONS
multipart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's initiate_multipart_upload function.
For writing only.
singlepart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's S3.Object.put function when using
single-part upload.
For writing only.
multipart_upload: bool, optional
Default: `True`
If `True`, use S3's multipart upload API when writing. If `False`, upload
with a single S3.Object.put call (one PutObject request), which is better
suited to small files.
For writing only.
version_id: str, optional
Version of the object, used when reading object. If None, will fetch the most recent version.
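
A minimal usage sketch of the new option (the bucket and key names are
placeholders, and the positional bucket/key/mode signature of
smart_open.s3.open is assumed):

    from smart_open import s3

    # Upload a small object with one PutObject request instead of a
    # multipart upload; the payload is buffered in memory until close().
    with s3.open('my-bucket', 'small-file.txt', 'wb', multipart_upload=False) as fout:
        fout.write(b'hello world')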

203 changes: 170 additions & 33 deletions smart_open/s3.py
@@ -88,6 +88,8 @@ def open(
session=None,
resource_kwargs=None,
multipart_upload_kwargs=None,
multipart_upload=True,
singlepart_upload_kwargs=None,
object_kwargs=None,
):
"""Open an S3 object for reading or writing.
@@ -111,6 +113,16 @@
multipart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's initiate_multipart_upload function.
For writing only.
singlepart_upload_kwargs: dict, optional
Additional parameters to pass to boto3's S3.Object.put function when using
single-part upload.
For writing only.
multipart_upload: bool, optional
Default: `True`
If `True`, use S3's multipart upload API when writing. If `False`, upload
with a single S3.Object.put call (one PutObject request), which is better
suited to small files.
For writing only.
version_id: str, optional
Version of the object, used when reading object.
If None, will fetch the most recent version.
@@ -123,13 +135,6 @@
if mode not in MODES:
raise NotImplementedError('bad mode: %r expected one of %r' % (mode, MODES))

if resource_kwargs is None:
resource_kwargs = {}
if multipart_upload_kwargs is None:
multipart_upload_kwargs = {}
if object_kwargs is None:
object_kwargs = {}

if (mode == WRITE_BINARY) and (version_id is not None):
raise ValueError("version_id must be None when writing")

@@ -144,14 +149,23 @@
object_kwargs=object_kwargs,
)
elif mode == WRITE_BINARY:
fileobj = MultipartWriter(
bucket_id,
key_id,
min_part_size=min_part_size,
session=session,
multipart_upload_kwargs=multipart_upload_kwargs,
resource_kwargs=resource_kwargs,
)
if multipart_upload:
fileobj = MultipartWriter(
bucket_id,
key_id,
min_part_size=min_part_size,
session=session,
upload_kwargs=multipart_upload_kwargs,
resource_kwargs=resource_kwargs,
)
else:
fileobj = SinglepartWriter(
bucket_id,
key_id,
session=session,
upload_kwargs=singlepart_upload_kwargs,
resource_kwargs=resource_kwargs,
)
else:
assert False, 'unexpected mode: %r' % mode
return fileobj
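
The multipart_upload flag above is what selects between the two writers. A
hedged sketch of reaching this path through the top-level API, assuming
smart_open.open forwards transport_params keyword arguments to s3.open as in
recent releases (the URL is a placeholder):

    import smart_open

    # multipart_upload=False routes the write through SinglepartWriter.
    with smart_open.open('s3://my-bucket/small-file.txt', 'wb',
                         transport_params={'multipart_upload': False}) as fout:
        fout.write(b'hello world')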
@@ -468,7 +482,7 @@ def __repr__(self):


class MultipartWriter(io.BufferedIOBase):
"""Writes bytes to S3.
"""Writes bytes to S3 using the multi part API.

Implements the io.BufferedIOBase interface of the standard library."""

@@ -479,11 +493,8 @@ def __init__(
min_part_size=DEFAULT_MIN_PART_SIZE,
session=None,
resource_kwargs=None,
multipart_upload_kwargs=None,
upload_kwargs=None,
):

self._multipart_upload_kwargs = multipart_upload_kwargs

if min_part_size < MIN_MIN_PART_SIZE:
logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")
@@ -492,19 +503,24 @@ def __init__(
session = boto3.Session()
if resource_kwargs is None:
resource_kwargs = {}
if multipart_upload_kwargs is None:
multipart_upload_kwargs = {}
if upload_kwargs is None:
upload_kwargs = {}

self._session = session
self._resource_kwargs = resource_kwargs
self._upload_kwargs = upload_kwargs

s3 = session.resource('s3', **resource_kwargs)
try:
self._object = s3.Object(bucket, key)
self._min_part_size = min_part_size
self._mp = self._object.initiate_multipart_upload(**multipart_upload_kwargs)
except botocore.client.ClientError:
raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket)
self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs)
except botocore.client.ClientError as error:
raise ValueError(
'the bucket %r does not exist, or is forbidden for access (%r)' % (
bucket, error
)
)

self._buf = io.BytesIO()
self._total_bytes = 0
@@ -631,20 +647,141 @@ def __str__(self):

def __repr__(self):
return (
"smart_open.s3.MultipartWriter("
"bucket=%r, "
"key=%r, "
"min_part_size=%r, "
"session=%r, "
"resource_kwargs=%r, "
"multipart_upload_kwargs=%r)"
"smart_open.s3.MultipartWriter(bucket=%r, key=%r, "
"min_part_size=%r, session=%r, resource_kwargs=%r, upload_kwargs=%r)"
) % (
self._object.bucket_name,
self._object.key,
self._min_part_size,
self._session,
self._resource_kwargs,
self._multipart_upload_kwargs,
self._upload_kwargs,
)


class SinglepartWriter(io.BufferedIOBase):
"""Writes bytes to S3 using the single part API.

Implements the io.BufferedIOBase interface of the standard library.

This class buffers all of its input in memory until its `close` method is called. Only
then is the data written to S3 and the buffer released."""

def __init__(
self,
bucket,
key,
session=None,
resource_kwargs=None,
upload_kwargs=None,
):

self._session = session
self._resource_kwargs = resource_kwargs

if session is None:
session = boto3.Session()
if resource_kwargs is None:
resource_kwargs = {}
if upload_kwargs is None:
upload_kwargs = {}

self._upload_kwargs = upload_kwargs

s3 = session.resource('s3', **resource_kwargs)
try:
self._object = s3.Object(bucket, key)
s3.meta.client.head_bucket(Bucket=bucket)
except botocore.client.ClientError:
raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket)

self._buf = io.BytesIO()
self._total_bytes = 0

#
# This member is part of the io.BufferedIOBase interface.
#
self.raw = None

def flush(self):
pass

#
# Override some methods from io.IOBase.
#
def close(self):
if self._buf is None:
return

self._buf.seek(0)

try:
self._object.put(Body=self._buf, **self._upload_kwargs)
except botocore.client.ClientError:
raise ValueError(
'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name)

logger.debug("direct upload finished")
self._buf = None

@property
def closed(self):
return self._buf is None

def writable(self):
"""Return True if the stream supports writing."""
return True

def tell(self):
"""Return the current stream position."""
return self._total_bytes

#
# io.BufferedIOBase methods.
#
def detach(self):
raise io.UnsupportedOperation("detach() not supported")

def write(self, b):
"""Write the given buffer (bytes, bytearray, memoryview or any buffer
interface implementation) into the buffer. Content of the buffer will be
written to S3 on close as a single-part upload.

For more information about buffers, see https://docs.python.org/3/c-api/buffer.html"""

length = self._buf.write(b)
self._total_bytes += length
return length

def terminate(self):
"""Nothing to cancel in single-part uploads."""
return

#
# Internal methods.
#
def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.terminate()
else:
self.close()

def __str__(self):
return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._object.bucket_name, self._object.key)

def __repr__(self):
return (
"smart_open.s3.SinglepartWriter(bucket=%r, key=%r, session=%r, "
"resource_kwargs=%r, upload_kwargs=%r)"
) % (
self._object.bucket_name,
self._object.key,
self._session,
self._resource_kwargs,
self._upload_kwargs,
)
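
Because SinglepartWriter implements io.BufferedIOBase and the context-manager
protocol shown above, it can also be used directly. A small sketch (the bucket
and key are placeholders; valid AWS credentials and an existing bucket are
assumed):

    from smart_open.s3 import SinglepartWriter

    # Everything written is buffered in memory; the single PutObject request
    # is issued when the with-block closes the writer.
    with SinglepartWriter('my-bucket', 'small-file.txt') as fout:
        fout.write(b'first chunk\n')
        fout.write(b'second chunk\n')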

