Commit 9c43da7

…open into patch-2

* 'develop' of https://github.com/RaRe-Technologies/smart_open:
  Propagate __exit__ call to underlying filestream (piskvorky#786)
  Retry finalizing multipart s3 upload (piskvorky#785)
  Fix `KeyError: 'ContentRange'` when received full content from S3 (piskvorky#789)
  Add support for SSH connection via aliases from `~/.ssh/config` (piskvorky#790)
  Make calls to smart_open.open() for GCS 1000x faster by avoiding unnecessary GCS API call (piskvorky#788)
  Add zstandard compression feature (piskvorky#801)
  Support moto 4 & 5 (piskvorky#802)
  Secure the connection using SSL when connecting to the FTPS server (piskvorky#793)
  upgrade dev status classifier to stable (piskvorky#798)
  Fix formatting of python code (piskvorky#795)
ddelange committed Feb 21, 2024
2 parents df668b8 + 192845b commit 9c43da7
Showing 16 changed files with 461 additions and 132 deletions.
18 changes: 9 additions & 9 deletions MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -4,17 +4,17 @@ Migrating to the new compression parameter
smart_open versions 6.0.0 and above no longer support the ``ignore_ext`` parameter.
Use the ``compression`` parameter instead:

- ```python
- fin = smart_open.open("/path/file.gz", ignore_ext=True) # No
- fin = smart_open.open("/path/file.gz", compression="disable") # Yes
- fin = smart_open.open("/path/file.gz", ignore_ext=False) # No
- fin = smart_open.open("/path/file.gz") # Yes
- fin = smart_open.open("/path/file.gz", compression="infer_from_extension") # Yes, if you want to be explicit
- fin = smart_open.open("/path/file", compression=".gz") # Yes
- ```
+ .. code-block:: python
+
+     fin = smart_open.open("/path/file.gz", ignore_ext=True) # No
+     fin = smart_open.open("/path/file.gz", compression="disable") # Yes
+     fin = smart_open.open("/path/file.gz", ignore_ext=False) # No
+     fin = smart_open.open("/path/file.gz") # Yes
+     fin = smart_open.open("/path/file.gz", compression="infer_from_extension") # Yes, if you want to be explicit
+     fin = smart_open.open("/path/file", compression=".gz") # Yes

Migrating to the new client-based S3 API
========================================
Expand Down
10 changes: 10 additions & 0 deletions integration-tests/test_gcs.py
@@ -28,6 +28,9 @@ def write_read(key, content, write_mode, read_mode, **kwargs):
    with smart_open.open(key, read_mode, **kwargs) as fin:
        return fin.read()

def open_only(key, read_mode, **kwargs) -> None:
    with smart_open.open(key, read_mode, **kwargs):
        pass

def read_length_prefixed_messages(key, read_mode, **kwargs):
    result = io.BytesIO()
@@ -121,3 +124,10 @@ def test_gcs_performance_small_reads(benchmark):

    actual = benchmark(read_length_prefixed_messages, key, 'rb', buffering=ONE_MIB)
    assert actual == one_megabyte_of_msgs

def test_gcs_performance_open(benchmark):
    # we don't need to use a uri that actually exists in order to call GCS's open()
    key = "gs://some-bucket/some_blob.txt"
    transport_params = {'client': google.cloud.storage.Client()}
    benchmark(open_only, key, 'rb', transport_params=transport_params)
    assert True
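For readers unfamiliar with the `benchmark` fixture used above: pytest-benchmark (added to `tests_require` in setup.py below) calls the wrapped callable repeatedly, records timing statistics, and returns the callable's result. A self-contained sketch of the same pattern, purely illustrative and not part of the test suite:

```python
# Illustrative pytest-benchmark usage: benchmark(fn, *args) runs fn(*args)
# many times, records timing statistics, and returns fn's result.
def test_sum_benchmark(benchmark):
    result = benchmark(sum, range(1000))
    assert result == 499500
```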
12 changes: 8 additions & 4 deletions setup.py
@@ -35,20 +35,22 @@ def _get_version():
def read(fname):
return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read()


base_deps = ['wrapt']
aws_deps = ['boto3']
gcs_deps = ['google-cloud-storage>=2.6.0']
azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
http_deps = ['requests']
ssh_deps = ['paramiko']
zst_deps = ['zstandard']

- all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps
+ all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps
tests_require = all_deps + [
'moto[server]',
'responses',
'boto3',
'pytest',
- 'pytest-rerunfailures'
+ 'pytest-rerunfailures',
+ 'pytest-benchmark',
]

setup(
@@ -70,6 +72,7 @@ def read(fname):
license='MIT',
platforms='any',

install_requires=base_deps,
tests_require=tests_require,
extras_require={
'test': tests_require,
@@ -80,13 +83,14 @@ def read(fname):
'http': http_deps,
'webhdfs': http_deps,
'ssh': ssh_deps,
'zst': zst_deps,
},
python_requires=">=3.6,<4.0",

test_suite="smart_open.tests",

classifiers=[
- 'Development Status :: 4 - Beta',
+ 'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
12 changes: 10 additions & 2 deletions smart_open/compression.py
@@ -71,6 +71,9 @@ def register_compressor(ext, callback):
def tweak_close(outer, inner):
"""Ensure that closing the `outer` stream closes the `inner` stream as well.
Deprecated: smart_open.open().__exit__ now always calls __exit__ on the
underlying filestream.
Use this when your compression library's `close` method does not
automatically close the underlying filestream. See
https://github.com/RaRe-Technologies/smart_open/issues/630 for an
@@ -93,14 +96,18 @@ def close_both(*args):
def _handle_bz2(file_obj, mode):
from bz2 import BZ2File
result = BZ2File(file_obj, mode)
- tweak_close(result, file_obj)
return result


def _handle_gzip(file_obj, mode):
import gzip
result = gzip.GzipFile(fileobj=file_obj, mode=mode)
- tweak_close(result, file_obj)
return result


def _handle_zstd(file_obj, mode):
import zstandard as zstd
result = zstd.ZstdDecompressor().stream_reader(file_obj, closefd=True)
return result


@@ -145,3 +152,4 @@ def compression_wrapper(file_obj, mode, compression=INFER_FROM_EXTENSION, filena
#
register_compressor('.bz2', _handle_bz2)
register_compressor('.gz', _handle_gzip)
register_compressor('.zst', _handle_zstd)
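With the `.zst` handler registered above, reading a zstandard-compressed file goes through `smart_open.open` like any other supported compression. A minimal sketch, assuming a hypothetical local file and the optional `zstandard` dependency (the new `zst` extra) installed; the handler only wraps a `ZstdDecompressor.stream_reader`, so this covers reading:

```python
# Minimal sketch: compression is inferred from the ".zst" extension.
# The path is hypothetical; requires the `zstandard` package (new `zst` extra).
import smart_open

with smart_open.open("/tmp/example.txt.zst", "rt", encoding="utf-8") as fin:
    for line in fin:
        print(line.rstrip())
```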
4 changes: 3 additions & 1 deletion smart_open/ftp.py
@@ -10,6 +10,7 @@
"""

import logging
import ssl
import urllib.parse
import smart_open.utils
from ftplib import FTP, FTP_TLS, error_reply
@@ -85,7 +86,8 @@ def convert_transport_params_to_args(transport_params):
def _connect(hostname, username, port, password, secure_connection, transport_params):
kwargs = convert_transport_params_to_args(transport_params)
if secure_connection:
- ftp = FTP_TLS(**kwargs)
+ ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
+ ftp = FTP_TLS(context=ssl_context, **kwargs)
else:
ftp = FTP(**kwargs)
try:
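For context, the `ftps://` scheme is what takes the `secure_connection` branch above, so connections now negotiate TLS with an explicit `ssl.SSLContext`. A minimal sketch with hypothetical host, credentials and path:

```python
# Minimal sketch: reading over FTPS (TLS). Host, credentials and path are
# hypothetical; the ftps:// scheme routes through the FTP_TLS branch above.
import smart_open

with smart_open.open("ftps://user:secret@ftp.example.com/reports/2024.csv", "rb") as fin:
    header = fin.read(1024)
```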
5 changes: 1 addition & 4 deletions smart_open/gcs.py
@@ -128,10 +128,7 @@ def Reader(bucket,
warn_deprecated('line_terminator')

bkt = client.bucket(bucket)
- blob = bkt.get_blob(key)
-
- if blob is None:
-     raise google.cloud.exceptions.NotFound(f'blob {key} not found in {bucket}')
+ blob = bkt.blob(key)

return blob.open('rb', **blob_open_kwargs)

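The practical effect of replacing `get_blob` with `blob` is that `smart_open.open` no longer issues a metadata API call per blob; the first network round-trip should happen when data is actually read. A minimal sketch reusing the bucket and blob names from the benchmark test above (hypothetical resources):

```python
# Minimal sketch: open() is now cheap; the GCS API is contacted on read(),
# so a missing blob should surface its error at read time rather than open time.
# Bucket and blob names are hypothetical.
import google.cloud.storage
import smart_open

client = google.cloud.storage.Client()
uri = "gs://some-bucket/some_blob.txt"
with smart_open.open(uri, "rb", transport_params={"client": client}) as fin:
    first_kb = fin.read(1024)
```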
98 changes: 66 additions & 32 deletions smart_open/s3.py
@@ -7,6 +7,7 @@
#
"""Implements file-like objects for reading and writing from/to AWS S3."""

import http
import io
import functools
import logging
@@ -27,6 +28,11 @@

from smart_open import constants

from typing import (
Callable,
List,
)

logger = logging.getLogger(__name__)

DEFAULT_MIN_PART_SIZE = 50 * 1024**2
@@ -46,13 +52,52 @@
's3://my_key:my_secret@my_server:my_port@my_bucket/my_key',
)

- _UPLOAD_ATTEMPTS = 6
- _SLEEP_SECONDS = 10

# Returned by AWS when we try to seek beyond EOF.
_OUT_OF_RANGE = 'InvalidRange'


class Retry:
    def __init__(self):
        self.attempts: int = 6
        self.sleep_seconds: int = 10
        self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError]
        self.client_error_codes: List[str] = ['NoSuchUpload']

    def _do(self, fn: Callable):
        for attempt in range(self.attempts):
            try:
                return fn()
            except tuple(self.exceptions) as err:
                logger.critical(
                    'Caught non-fatal %s, retrying %d more times',
                    err,
                    self.attempts - attempt - 1,
                )
                logger.exception(err)
                time.sleep(self.sleep_seconds)
            except botocore.exceptions.ClientError as err:
                error_code = err.response['Error'].get('Code')
                if error_code not in self.client_error_codes:
                    raise
                logger.critical(
                    'Caught non-fatal ClientError (%s), retrying %d more times',
                    error_code,
                    self.attempts - attempt - 1,
                )
                logger.exception(err)
                time.sleep(self.sleep_seconds)
        else:
            logger.critical('encountered too many non-fatal errors, giving up')
            raise IOError('%s failed after %d attempts', fn.func, self.attempts)


#
# The retry mechanism for this submodule. Client code may modify it, e.g. by
# updating RETRY.sleep_seconds and friends.
#
RETRY = Retry()


class _ClientWrapper:
"""Wraps a client to inject the appropriate keyword args into each method call.
@@ -486,9 +531,17 @@ def _open_body(self, start=None, stop=None):
self,
response['ResponseMetadata']['RetryAttempts'],
)
- _, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
+ #
+ # range request may not always return partial content, see:
+ # https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests#partial_request_responses
+ #
+ status_code = response['ResponseMetadata']['HTTPStatusCode']
+ if status_code == http.HTTPStatus.PARTIAL_CONTENT:
+     _, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
+     self._position = start
+ elif status_code == http.HTTPStatus.OK:
+     length = response["ContentLength"]
self._content_length = length
- self._position = start
self._body = response['Body']

def read(self, size=-1):
@@ -794,7 +847,7 @@ def __init__(
Bucket=bucket,
Key=key,
)
- self._upload_id = _retry_if_failed(partial)['UploadId']
+ self._upload_id = RETRY._do(partial)['UploadId']
except botocore.client.ClientError as error:
raise ValueError(
'the bucket %r does not exist, or is forbidden for access (%r)' % (
@@ -826,6 +879,7 @@ def close(self):
if self._buf.tell():
self._upload_next_part()

logger.debug('%s: completing multipart upload', self)
if self._total_bytes and self._upload_id:
partial = functools.partial(
self._client.complete_multipart_upload,
Expand All @@ -834,7 +888,7 @@ def close(self):
UploadId=self._upload_id,
MultipartUpload={'Parts': self._parts},
)
- _retry_if_failed(partial)
+ RETRY._do(partial)
logger.debug('%s: completed multipart upload', self)
elif self._upload_id:
#
@@ -844,7 +898,6 @@ def close(self):
#
# We work around this by creating an empty file explicitly.
#
- assert self._upload_id, "no multipart upload in progress"
self._client.abort_multipart_upload(
Bucket=self._bucket,
Key=self._key,
@@ -909,13 +962,16 @@ def write(self, b):

def terminate(self):
"""Cancel the underlying multipart upload."""
- assert self._upload_id, "no multipart upload in progress"
+ if self._upload_id is None:
+     return
logger.debug('%s: terminating multipart upload', self)
self._client.abort_multipart_upload(
Bucket=self._bucket,
Key=self._key,
UploadId=self._upload_id,
)
self._upload_id = None
logger.debug('%s: terminated multipart upload', self)

def to_boto3(self, resource):
"""Create an **independent** `boto3.s3.Object` instance that points to
@@ -945,7 +1001,7 @@ def _upload_next_part(self):
# of a temporary connection problem, so this part needs to be
# especially robust.
#
- upload = _retry_if_failed(
+ upload = RETRY._do(
functools.partial(
self._client.upload_part,
Bucket=self._bucket,
@@ -1110,28 +1166,6 @@ def __repr__(self):
return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)


- def _retry_if_failed(
-         partial,
-         attempts=_UPLOAD_ATTEMPTS,
-         sleep_seconds=_SLEEP_SECONDS,
-         exceptions=None):
-     if exceptions is None:
-         exceptions = (botocore.exceptions.EndpointConnectionError, )
-     for attempt in range(attempts):
-         try:
-             return partial()
-         except exceptions:
-             logger.critical(
-                 'Unable to connect to the endpoint. Check your network connection. '
-                 'Sleeping and retrying %d more times '
-                 'before giving up.' % (attempts - attempt - 1)
-             )
-             time.sleep(sleep_seconds)
-     else:
-         logger.critical('Unable to connect to the endpoint. Giving up.')
-         raise IOError('Unable to connect to the endpoint after %d attempts' % attempts)


def _accept_all(key):
return True
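As the comment above the `RETRY = Retry()` assignment notes, client code can tune the module-level retry policy before using the S3 writers. A minimal sketch (bucket and key are hypothetical):

```python
# Minimal sketch: tightening the retry policy used by the multipart upload calls.
# Bucket and key are hypothetical.
import smart_open
from smart_open import s3

s3.RETRY.attempts = 3        # retry fewer times than the default of 6
s3.RETRY.sleep_seconds = 2   # and wait less between attempts

with smart_open.open("s3://my-bucket/my-key.txt", "wb") as fout:
    fout.write(b"hello from a retried multipart upload")
```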

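The `_open_body` change above is what the `KeyError: 'ContentRange'` fix (piskvorky#789) addresses: some S3-compatible servers answer a full-object range request with a plain `200 OK` and no `ContentRange` header, which the reader now handles. A sketch of the kind of read that previously failed (the URI is hypothetical):

```python
# Sketch: reading an entire S3 object. When the server replies 200 OK instead
# of 206 Partial Content, the reader now falls back to ContentLength rather
# than raising KeyError: 'ContentRange'. The URI is hypothetical.
import smart_open

with smart_open.open("s3://my-bucket/my-key.txt", "rb") as fin:
    data = fin.read()  # full-content response is handled either way
```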
3 changes: 2 additions & 1 deletion smart_open/smart_open_lib.py
@@ -31,6 +31,7 @@
#
import smart_open.local_file as so_file
import smart_open.compression as so_compression
import smart_open.utils as so_utils

from smart_open import doctools
from smart_open import transport
@@ -248,7 +249,7 @@ def open(
except AttributeError:
pass

- return decoded
+ return so_utils.FileLikeProxy(decoded, binary)


def _get_binary_mode(mode_str):
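The `FileLikeProxy` returned here implements the first item in the commit message, propagating `__exit__` to the underlying filestream. A minimal sketch of the behaviour, using a hypothetical local gzip file; both the decompression wrapper and the raw file handle should now be closed when the `with` block exits:

```python
# Minimal sketch: leaving the `with` block closes the decompression wrapper
# *and* the underlying file handle. The path is hypothetical.
import gzip
import smart_open

with gzip.open("/tmp/example.txt.gz", "wt") as fout:  # create a sample file
    fout.write("hello\n")

with smart_open.open("/tmp/example.txt.gz", "rt") as fin:
    print(fin.read())
# __exit__ is now propagated to the wrapped filestream as well.
```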