Propagate __exit__ call to underlying filestream (piskvorky#786)
* Propagate __exit__ call to underlying filestream

* Switch to contextmanager

* Switch to wrapt.ObjectProxy

* Allow terminating when no part was uploaded yet

* Add debug logs like with close

* Unify log statements
ddelange authored Feb 21, 2024
1 parent 1f96c22 commit 192845b
Showing 6 changed files with 51 additions and 6 deletions.
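
The behaviour these changes target, as a minimal sketch (the bucket and key names are hypothetical, and a reachable S3 endpoint or moto mock is assumed): when the body of a `with smart_open.open(...)` block raises, `__exit__` is now propagated through the compression wrapper down to the S3 writer, which aborts the multipart upload instead of committing a partial object.

import smart_open

try:
    with smart_open.open('s3://my-bucket/my-key.gz', 'wb', compression='.gz') as fout:
        fout.write(b'some data that never gets committed')
        raise ValueError('simulated failure')  # triggers __exit__ with an exception set
except ValueError:
    pass

# The multipart upload was aborted on __exit__, so the key was never created;
# trying to read 's3://my-bucket/my-key.gz' afterwards fails with a missing-key error.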
3 changes: 2 additions & 1 deletion setup.py
@@ -35,7 +35,7 @@ def _get_version():
def read(fname):
return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read()


base_deps = ['wrapt']
aws_deps = ['boto3']
gcs_deps = ['google-cloud-storage>=2.6.0']
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
@@ -72,6 +72,7 @@ def read(fname):
license='MIT',
platforms='any',

install_requires=base_deps,
tests_require=tests_require,
extras_require={
'test': tests_require,
5 changes: 3 additions & 2 deletions smart_open/compression.py
@@ -71,6 +71,9 @@ def register_compressor(ext, callback):
def tweak_close(outer, inner):
"""Ensure that closing the `outer` stream closes the `inner` stream as well.
Deprecated: smart_open.open().__exit__ now always calls __exit__ on the
underlying filestream.
Use this when your compression library's `close` method does not
automatically close the underlying filestream. See
https://github.com/RaRe-Technologies/smart_open/issues/630 for an
@@ -93,14 +96,12 @@ def close_both(*args):
def _handle_bz2(file_obj, mode):
from bz2 import BZ2File
result = BZ2File(file_obj, mode)
tweak_close(result, file_obj)
return result


def _handle_gzip(file_obj, mode):
import gzip
result = gzip.GzipFile(fileobj=file_obj, mode=mode)
tweak_close(result, file_obj)
return result


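
With `__exit__` now propagated by the proxy that `smart_open.open()` returns, a compressor callback registered through `register_compressor` no longer needs `tweak_close`. A sketch along the lines of the project's documented `.xz` example (the handler name is illustrative):

import lzma

import smart_open


def _handle_xz(file_obj, mode):
    # lzma.LZMAFile.close() does not close file_obj; the proxy returned by
    # smart_open.open() now closes the underlying filestream on __exit__.
    return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)


smart_open.register_compressor('.xz', _handle_xz)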
7 changes: 5 additions & 2 deletions smart_open/s3.py
@@ -879,6 +879,7 @@ def close(self):
if self._buf.tell():
self._upload_next_part()

logger.debug('%s: completing multipart upload', self)
if self._total_bytes and self._upload_id:
partial = functools.partial(
self._client.complete_multipart_upload,
@@ -897,7 +898,6 @@ def close(self):
#
# We work around this by creating an empty file explicitly.
#
assert self._upload_id, "no multipart upload in progress"
self._client.abort_multipart_upload(
Bucket=self._bucket,
Key=self._key,
@@ -962,13 +962,16 @@ def write(self, b):

def terminate(self):
"""Cancel the underlying multipart upload."""
assert self._upload_id, "no multipart upload in progress"
if self._upload_id is None:
return
logger.debug('%s: terminating multipart upload', self)
self._client.abort_multipart_upload(
Bucket=self._bucket,
Key=self._key,
UploadId=self._upload_id,
)
self._upload_id = None
logger.debug('%s: terminated multipart upload', self)

def to_boto3(self, resource):
"""Create an **independent** `boto3.s3.Object` instance that points to
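
The `terminate()` change makes aborting safe even when nothing has been uploaded yet, and repeated calls become no-ops. A hedged sketch against the writer object that `smart_open.s3.open()` returns for multipart writes (bucket and key names are hypothetical; an S3 endpoint or moto mock is assumed):

import smart_open.s3

fout = smart_open.s3.open('my-bucket', 'my-key', 'wb')
fout.terminate()  # aborts the in-flight multipart upload and resets _upload_id to None
fout.terminate()  # second call returns immediately, since no upload is in progress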
3 changes: 2 additions & 1 deletion smart_open/smart_open_lib.py
@@ -31,6 +31,7 @@
#
import smart_open.local_file as so_file
import smart_open.compression as so_compression
import smart_open.utils as so_utils

from smart_open import doctools
from smart_open import transport
@@ -248,7 +249,7 @@ def open(
except AttributeError:
pass

return decoded
return so_utils.FileLikeProxy(decoded, binary)


def _get_binary_mode(mode_str):
24 changes: 24 additions & 0 deletions smart_open/tests/test_s3.py
@@ -572,6 +572,30 @@ def test_writebuffer(self):

assert actual == contents

def test_write_gz_with_error(self):
"""Does s3 multipart upload abort when for a failed compressed file upload?"""
with self.assertRaises(ValueError):
with smart_open.open(
f's3://{BUCKET_NAME}/{WRITE_KEY_NAME}',
mode="wb",
compression='.gz',
transport_params={
"multipart_upload": True,
"min_part_size": 10,
}
) as fout:
fout.write(b"test12345678test12345678")
fout.write(b"test\n")

raise ValueError("some error")

# no multipart upload was committed:
# smart_open.s3.MultipartWriter.__exit__ was called
with self.assertRaises(OSError) as cm:
smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb')

assert 'The specified key does not exist.' in cm.exception.args[0]


@mock_s3
class SinglepartWriterTest(unittest.TestCase):
15 changes: 15 additions & 0 deletions smart_open/utils.py
@@ -12,6 +12,8 @@
import logging
import urllib.parse

import wrapt

logger = logging.getLogger(__name__)

WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']
@@ -189,3 +191,16 @@ def safe_urlsplit(url):

path = sr.path.replace(placeholder, '?')
return urllib.parse.SplitResult(sr.scheme, sr.netloc, path, '', '')


class FileLikeProxy(wrapt.ObjectProxy):
def __init__(self, outer, inner):
super().__init__(outer)
self.__inner = inner

def __exit__(self, *args, **kwargs):
"""Exit inner after exiting outer."""
try:
super().__exit__(*args, **kwargs)
finally:
self.__inner.__exit__(*args, **kwargs)
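
A hedged illustration of what the proxy adds, using only stdlib objects: `gzip.GzipFile.close()` does not close the buffer it writes to, but the proxy's `__exit__` reaches it anyway.

import gzip
import io

from smart_open.utils import FileLikeProxy

buf = io.BytesIO()
gz = gzip.GzipFile(fileobj=buf, mode='wb')  # closing gz alone would leave buf open

with FileLikeProxy(gz, buf) as fout:        # behaves like gz; wrapt forwards attribute access
    fout.write(b'hello world')

assert gz.closed and buf.closed             # __exit__ reached both layers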
