Add logic for handling large files in MultipartWriter uploads to s3 #796

Merged: 11 commits, Feb 22, 2024
smart_open/s3.py (102 changes: 84 additions & 18 deletions)
@@ -6,12 +6,14 @@
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing from/to AWS S3."""
from __future__ import annotations
Review comment:

@mpenkov this breaks compatibility for py3.6

@mpenkov (Collaborator) replied on Mar 7, 2024:

we no longer support Py3.6

python_requires=">=3.7,<4.0",


import io
import functools
import logging
import time
import warnings
from typing import TYPE_CHECKING

try:
import boto3
@@ -27,13 +29,33 @@

from smart_open import constants

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from typing_extensions import Buffer

logger = logging.getLogger(__name__)

DEFAULT_MIN_PART_SIZE = 50 * 1024**2
"""Default minimum part size for S3 multipart uploads"""
MIN_MIN_PART_SIZE = 5 * 1024 ** 2
#
# AWS puts restrictions on the part size for multipart uploads.
# Each part must be more than 5MB, and less than 5GB.
#
# On top of that, our MultipartWriter has a min_part_size option.
# In retrospect, it's an unfortunate name, because it conflicts with the
# minimum allowable part size (5MB), but it's too late to change it, because
# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
# It really just means "part size": as soon as you have this many bytes,
# write a part to S3 (see the MultipartWriter.write method).
#

MIN_PART_SIZE = 5 * 1024 ** 2
"""The absolute minimum permitted by Amazon."""

DEFAULT_PART_SIZE = 50 * 1024**2
"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""

MAX_PART_SIZE = 5 * 1024 ** 3
"""The absolute maximum permitted by Amazon."""

SCHEMES = ("s3", "s3n", 's3u', "s3a")
DEFAULT_PORT = 443
DEFAULT_HOST = 's3.amazonaws.com'
@@ -241,12 +263,13 @@ def open(
mode,
version_id=None,
buffer_size=DEFAULT_BUFFER_SIZE,
min_part_size=DEFAULT_MIN_PART_SIZE,
min_part_size=DEFAULT_PART_SIZE,
multipart_upload=True,
defer_seek=False,
client=None,
client_kwargs=None,
writebuffer=None,
max_part_size=MAX_PART_SIZE,
):
"""Open an S3 object for reading or writing.

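Not part of the diff: a usage sketch of the extended open() signature. The bucket, key, and sizes below are made up; the point is that a part is flushed once min_part_size bytes have accumulated, while max_part_size caps how large any single uploaded part may grow.

import io
import smart_open.s3

source = io.BytesIO(b"\0" * (200 * 1024**2))  # stand-in for a large local stream

with smart_open.s3.open(
    "my-bucket",                  # hypothetical bucket
    "backups/large.bin",          # hypothetical key
    "wb",
    min_part_size=64 * 1024**2,   # flush a part once 64 MiB have accumulated
    max_part_size=256 * 1024**2,  # cap any single part at 256 MiB
) as fout:
    for chunk in iter(lambda: source.read(8 * 1024**2), b""):
        fout.write(chunk)
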
@@ -317,6 +340,7 @@
client=client,
client_kwargs=client_kwargs,
writebuffer=writebuffer,
max_part_size=max_part_size,
)
else:
fileobj = SinglepartWriter(
@@ -776,17 +800,39 @@ def __init__(
self,
bucket,
key,
min_part_size=DEFAULT_MIN_PART_SIZE,
min_part_size=DEFAULT_PART_SIZE,
client=None,
client_kwargs=None,
writebuffer=None,
writebuffer: io.BytesIO | None = None,
max_part_size=MAX_PART_SIZE,
):
if min_part_size < MIN_MIN_PART_SIZE:
logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")
#
# As part of parameter checking, we need to ensure:
#
# 1) We're within the limits set by AWS
# 2) The specified minimum is less than the specified maximum (sanity)
#
# Adjust the parameters as needed, because otherwise, writes _will_ fail.
#
min_ps = smart_open.utils.clamp(min_part_size, MIN_PART_SIZE, MAX_PART_SIZE)
max_ps = smart_open.utils.clamp(max_part_size, MIN_PART_SIZE, MAX_PART_SIZE)
min_ps = min(min_ps, max_ps)
max_ps = max(min_ps, max_ps)

if min_ps != min_part_size:
logger.warning(f"adjusting min_part_size from {min_part_size} to {min_ps}")
min_part_size = min_ps
if max_ps != max_part_size:
logger.warning(f"adjusting max_part_size from {max_part_size} to {max_ps}")
max_part_size = max_ps

self._min_part_size = min_part_size
self._max_part_size = max_part_size
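# Worked example (illustrative numbers, not from this diff): min_part_size=1 MiB
# is clamped up to MIN_PART_SIZE (5 MiB); min_part_size=1 GiB combined with
# max_part_size=100 MiB collapses min down to 100 MiB. Either way the pair ends
# up satisfying MIN_PART_SIZE <= min <= max <= MAX_PART_SIZE, and a warning is
# logged for every value that had to be adjusted.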

_initialize_boto3(self, client, client_kwargs, bucket, key)
self._client: S3Client
self._bucket: str
self._key: str

try:
partial = functools.partial(
@@ -809,12 +855,12 @@ def __init__(

self._total_bytes = 0
self._total_parts = 0
self._parts = []
self._parts: list[dict[str, object]] = []

#
# This member is part of the io.BufferedIOBase interface.
#
self.raw = None
self.raw = None # type: ignore[assignment]

def flush(self):
pass
@@ -890,22 +936,42 @@ def tell(self):
def detach(self):
raise io.UnsupportedOperation("detach() not supported")

def write(self, b):
def write(self, b: Buffer) -> int:
"""Write the given buffer (bytes, bytearray, memoryview or any buffer
interface implementation) to the S3 file.

For more information about buffers, see https://docs.python.org/3/c-api/buffer.html

There's buffering happening under the covers, so this may not actually
do any HTTP transfer right away."""
#
# Part size: 5 MiB to 5 GiB. There is no minimum size limit on the last
# part of your multipart upload.

length = self._buf.write(b)
self._total_bytes += length
# botocore does not accept memoryview, otherwise we could've gotten
# away with not needing to write a copy to the buffer aside from cases
# where b is smaller than min_part_size
#

if self._buf.tell() >= self._min_part_size:
self._upload_next_part()
offset = 0
mv = memoryview(b)
self._total_bytes += len(mv)

return length
while offset < len(mv):
start = offset
end = offset + self._max_part_size - self._buf.tell()
self._buf.write(mv[start:end])
if self._buf.tell() < self._min_part_size:
#
# Not enough data to write a new part just yet. The assert
# ensures that we've consumed all of the input buffer.
#
assert end >= len(mv)
return len(mv)

self._upload_next_part()
offset = end
return len(mv)

def terminate(self):
"""Cancel the underlying multipart upload."""
@@ -928,7 +994,7 @@ def to_boto3(self, resource):
#
# Internal methods.
#
def _upload_next_part(self):
def _upload_next_part(self) -> None:
part_num = self._total_parts + 1
logger.info(
"%s: uploading part_num: %i, %i bytes (total %.3fGB)",
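Not part of the diff: to see what the new write() loop does with concrete numbers, here is a minimal standalone model of its chunking arithmetic (no S3, no boto3; simulate and MIB are illustrative names, not library API). It reproduces the part counts and buffer positions asserted by the new tests in test_s3.py below.

MIB = 2**20

def simulate(writes, min_part_size=5 * MIB, max_part_size=7 * MIB):
    """Return (parts uploaded, bytes left in the buffer) after the given writes."""
    buffered = 0  # stands in for self._buf.tell()
    parts = 0     # stands in for self._total_parts
    for n in writes:  # n = len(b) for one write() call
        offset = 0
        while offset < n:
            end = min(n, offset + max_part_size - buffered)
            buffered += end - offset      # the slice copied into the buffer
            if buffered < min_part_size:
                break                     # keep buffering until more data arrives
            parts += 1                    # stands in for _upload_next_part()
            buffered = 0
            offset = end
    return parts, buffered

print(simulate([15 * MIB], min_part_size=5 * MIB, max_part_size=5 * MIB))  # (3, 0), like test_max_part_size_2
print(simulate([5 * MIB - 1] * 3))  # (2, 1048573), i.e. 1 MiB - 3 left, like test_max_part_size_1
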
smart_open/tests/test_s3.py (92 changes: 76 additions & 16 deletions)
@@ -193,8 +193,8 @@ def test_every_second_read_fails(self):
class ReaderTest(BaseTest):
def setUp(self):
# lower the multipart upload size, to speed up these tests
self.old_min_part_size = smart_open.s3.DEFAULT_MIN_PART_SIZE
smart_open.s3.DEFAULT_MIN_PART_SIZE = 5 * 1024**2
self.old_min_part_size = smart_open.s3.DEFAULT_PART_SIZE
smart_open.s3.DEFAULT_PART_SIZE = 5 * 1024**2

ignore_resource_warnings()

Expand All @@ -207,7 +207,7 @@ def setUp(self):
s3.Object(BUCKET_NAME, KEY_NAME).put(Body=self.body)

def tearDown(self):
smart_open.s3.DEFAULT_MIN_PART_SIZE = self.old_min_part_size
smart_open.s3.DEFAULT_PART_SIZE = self.old_min_part_size

def test_iter(self):
"""Are S3 files iterated over correctly?"""
@@ -456,21 +456,30 @@ def test_write_02(self):

def test_write_03(self):
"""Does s3 multipart chunking work correctly?"""
# write
smart_open_write = smart_open.s3.MultipartWriter(
BUCKET_NAME, WRITE_KEY_NAME, min_part_size=10
)
with smart_open_write as fout:
fout.write(b"test")
self.assertEqual(fout._buf.tell(), 4)
min_ps = smart_open.s3.MIN_PART_SIZE
max_ps = smart_open.s3.MAX_PART_SIZE

try:
smart_open.s3.MIN_PART_SIZE = 1
smart_open.s3.MAX_PART_SIZE = 100

smart_open_write = smart_open.s3.MultipartWriter(
BUCKET_NAME, WRITE_KEY_NAME, min_part_size=10
)
with smart_open_write as fout:
fout.write(b"test")
self.assertEqual(fout._buf.tell(), 4)

fout.write(b"test\n")
self.assertEqual(fout._buf.tell(), 9)
self.assertEqual(fout._total_parts, 0)
fout.write(b"test\n")
self.assertEqual(fout._buf.tell(), 9)
self.assertEqual(fout._total_parts, 0)

fout.write(b"test")
self.assertEqual(fout._buf.tell(), 0)
self.assertEqual(fout._total_parts, 1)
fout.write(b"test")
self.assertEqual(fout._buf.tell(), 0)
self.assertEqual(fout._total_parts, 1)
finally:
smart_open.s3.MIN_PART_SIZE = min_ps
smart_open.s3.MAX_PART_SIZE = max_ps

# read back the same key and check its content
output = list(smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb'))
@@ -565,6 +574,57 @@ def test_writebuffer(self):

assert actual == contents

def test_max_part_size_1(self) -> None:
"""write successive chunks of size 5MiB-1 with a min_part_size of 5MiB and max_part_size=7MiB

There are no minimum size limits of the last part of a multipart upload, which
is why test_write03 can get away with small test data. But since we need to get
multiple parts we cannot avoid that.
"""
contents = bytes(5 * 2**20 - 1)

with smart_open.s3.open(
BUCKET_NAME,
WRITE_KEY_NAME,
"wb",
min_part_size=5 * 2**20,
max_part_size=7 * 2**20,
) as fout:
fout.write(contents)
assert fout._total_parts == 0
assert fout._buf.tell() == 5 * 2**20 - 1

fout.write(contents)
assert fout._total_parts == 1
assert fout._buf.tell() == 3 * 2**20 - 2

fout.write(contents)
assert fout._total_parts == 2
assert fout._buf.tell() == 1 * 2**20 - 3
contents = b""

output = list(smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, "rb"))
assert len(output[0]) == 3 * (5 * 2**20 - 1)
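# Why these positions: the second write first tops the buffer up from 5 MiB - 1
# to the 7 MiB cap (consuming 2 MiB + 1 bytes), uploads that part, then buffers
# the remaining 3 MiB - 2; the third write tops up to 7 MiB again (4 MiB + 2
# bytes), uploads part two, and leaves 1 MiB - 3 buffered for the final part
# written at close.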

def test_max_part_size_2(self) -> None:
"""Do a single big write of 15MiB with a max_part_size of 5MiB"""
contents = bytes(15 * 2**20)

with smart_open.s3.open(
BUCKET_NAME,
WRITE_KEY_NAME,
"wb",
min_part_size=5 * 2**20,
max_part_size=5 * 2**20,
) as fout:
fout.write(contents)
assert fout._total_parts == 3
assert fout._buf.tell() == 0
contents = b""

output = list(smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, "rb"))
assert len(output[0]) == 15 * 2**20


@moto.mock_s3
class SinglepartWriterTest(unittest.TestCase):