Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate __exit__ call to underlying filestream #786

Merged
Merged 6 commits into the base branch from the feature branch
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _get_version():
def read(fname):
    """Return the UTF-8 text of *fname*, resolved relative to this file's directory.

    Uses a context manager so the file handle is closed deterministically;
    the original `io.open(...).read()` left the handle to the garbage
    collector.
    """
    path = os.path.join(os.path.dirname(__file__), fname)
    with io.open(path, encoding='utf-8') as fin:
        return fin.read()


base_deps = ['wrapt']
aws_deps = ['boto3']
gcs_deps = ['google-cloud-storage>=2.6.0']
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
Expand Down Expand Up @@ -70,6 +70,7 @@ def read(fname):
license='MIT',
platforms='any',

install_requires=base_deps,
tests_require=tests_require,
extras_require={
'test': tests_require,
Expand Down
5 changes: 3 additions & 2 deletions smart_open/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def register_compressor(ext, callback):
def tweak_close(outer, inner):
"""Ensure that closing the `outer` stream closes the `inner` stream as well.

Deprecated: smart_open.open().__exit__ now always calls __exit__ on the
underlying filestream.

Use this when your compression library's `close` method does not
automatically close the underlying filestream. See
https://github.com/RaRe-Technologies/smart_open/issues/630 for an
Expand All @@ -93,14 +96,12 @@ def close_both(*args):
def _handle_bz2(file_obj, mode):
from bz2 import BZ2File
result = BZ2File(file_obj, mode)
tweak_close(result, file_obj)
return result


def _handle_gzip(file_obj, mode):
import gzip
result = gzip.GzipFile(fileobj=file_obj, mode=mode)
tweak_close(result, file_obj)
return result


Expand Down
7 changes: 5 additions & 2 deletions smart_open/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ def close(self):
if self._buf.tell():
self._upload_next_part()

logger.debug('%s: completing multipart upload', self)
if self._total_bytes and self._upload_id:
partial = functools.partial(
self._client.complete_multipart_upload,
Expand All @@ -844,7 +845,6 @@ def close(self):
#
# We work around this by creating an empty file explicitly.
#
assert self._upload_id, "no multipart upload in progress"
self._client.abort_multipart_upload(
Bucket=self._bucket,
Key=self._key,
Expand Down Expand Up @@ -909,13 +909,16 @@ def write(self, b):

def terminate(self):
    """Cancel the in-flight multipart upload, if any.

    No-op when no upload is in progress (e.g. the writer failed before the
    first part was uploaded, such as a ``UnicodeEncodeError`` raised inside
    the ``with`` block). The previous hard ``assert self._upload_id`` crashed
    exactly those teardown paths — and ``assert`` is stripped under ``-O``,
    so it was never a reliable guard anyway.
    """
    if self._upload_id is None:
        return

    logger.debug('%s: terminating multipart upload', self)
    self._client.abort_multipart_upload(
        Bucket=self._bucket,
        Key=self._key,
        UploadId=self._upload_id,
    )
    # Mark the upload as gone so repeated terminate() calls stay idempotent.
    self._upload_id = None
    logger.debug('%s: terminated multipart upload', self)

def to_boto3(self, resource):
"""Create an **independent** `boto3.s3.Object` instance that points to
Expand Down
3 changes: 2 additions & 1 deletion smart_open/smart_open_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#
import smart_open.local_file as so_file
import smart_open.compression as so_compression
import smart_open.utils as so_utils

from smart_open import doctools
from smart_open import transport
Expand Down Expand Up @@ -248,7 +249,7 @@ def open(
except AttributeError:
pass

return decoded
return so_utils.FileLikeProxy(decoded, binary)


def _get_binary_mode(mode_str):
Expand Down
24 changes: 24 additions & 0 deletions smart_open/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,30 @@ def test_writebuffer(self):

assert actual == contents

def test_write_gz_with_error(self):
    """Aborting a failed compressed write must abort the S3 multipart upload.

    Regression test: ``smart_open.open().__exit__`` now propagates to the
    underlying filestream, so an exception inside the ``with`` block
    terminates (rather than commits) the multipart upload.
    """
    with self.assertRaises(ValueError):
        with smart_open.open(
            f's3://{BUCKET_NAME}/{WRITE_KEY_NAME}',
            mode="wb",
            compression='.gz',
            transport_params={
                "multipart_upload": True,
                "min_part_size": 10,
            }
        ) as fout:
            fout.write(b"test12345678test12345678")
            fout.write(b"test\n")

            # Fail mid-write so the context managers unwind with an error.
            raise ValueError("some error")

    # no multipart upload was committed:
    # smart_open.s3.MultipartWriter.__exit__ was called
    with self.assertRaises(OSError) as cm:
        smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb')

    assert 'The specified key does not exist.' in cm.exception.args[0]


@moto.mock_s3
class SinglepartWriterTest(unittest.TestCase):
Expand Down
15 changes: 15 additions & 0 deletions smart_open/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import logging
import urllib.parse

import wrapt

logger = logging.getLogger(__name__)

WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']
Expand Down Expand Up @@ -189,3 +191,16 @@ def safe_urlsplit(url):

path = sr.path.replace(placeholder, '?')
return urllib.parse.SplitResult(sr.scheme, sr.netloc, path, '', '')


class FileLikeProxy(wrapt.ObjectProxy):
    """Proxy over a decoded/wrapped stream (*outer*) that also finalizes the
    underlying raw filestream (*inner*) on context-manager exit.

    This is what makes ``smart_open.open(...).__exit__`` propagate to the
    transport-level stream even when the outer codec/compression layer does
    not close it itself.
    """

    def __init__(self, outer, inner):
        super().__init__(outer)
        # wrapt.ObjectProxy forwards ordinary attribute assignment to the
        # wrapped object; per wrapt's documented convention, only names with
        # the `_self_` prefix are stored on the proxy itself. Using the
        # prefix keeps this bookkeeping attribute off the wrapped stream
        # (which may reject or be polluted by foreign attributes).
        self._self_inner = inner

    def __exit__(self, *args, **kwargs):
        """Exit inner after exiting outer, even if the outer exit raises.

        Returns the outer stream's ``__exit__`` result so an exception
        suppressed by the outer layer stays suppressed (the original
        discarded it and always returned ``None``).
        """
        try:
            return super().__exit__(*args, **kwargs)
        finally:
            self._self_inner.__exit__(*args, **kwargs)
Loading