diff --git a/MIGRATING_FROM_OLDER_VERSIONS.rst b/MIGRATING_FROM_OLDER_VERSIONS.rst
index 98354f2c..6d6261ff 100644
--- a/MIGRATING_FROM_OLDER_VERSIONS.rst
+++ b/MIGRATING_FROM_OLDER_VERSIONS.rst
@@ -1,3 +1,108 @@
+Migrating to the new client-based S3 API
+========================================
+
+Versions of smart_open prior to 5.0.0 used the boto3 `resource API`_ for communicating with S3.
+This API was easy for the smart_open developers to integrate, but that convenience came at a cost: it was neither thread- nor multiprocess-safe.
+Furthermore, as smart_open supported more and more options, the transport parameter list grew, making it harder to maintain.
+
+Starting with version 5.0.0, smart_open uses the `client API`_ instead of the resource API.
+Functionally, very little changes for the smart_open user.
+The only difference is in passing transport parameters to the S3 backend.
+
+More specifically, the following S3 transport parameters are no longer supported:
+
+- `multipart_upload_kwargs`
+- `object_kwargs`
+- `resource`
+- `resource_kwargs`
+- `session`
+- `singlepart_upload_kwargs`
+
+**If you weren't using the above parameters, nothing changes for you.**
+
+However, if you were using any of the above, then you need to adjust your code.
+Here are some quick recipes.
+
+If you were previously passing `session`, then construct an S3 client from the session and pass that instead.
+For example, before:
+
+.. code-block:: python
+
+    smart_open.open('s3://bucket/key', transport_params={'session': session})
+
+After:
+
+.. code-block:: python
+
+    smart_open.open('s3://bucket/key', transport_params={'client': session.client('s3')})
+
+If you were passing `resource`, then replace the resource with a client, and pass that instead.
+For example, before:
+
+.. code-block:: python
+
+    resource = session.resource('s3', **resource_kwargs)
+    smart_open.open('s3://bucket/key', transport_params={'resource': resource})
+
+After:
+
+.. code-block:: python
+
+    client = session.client('s3')
+    smart_open.open('s3://bucket/key', transport_params={'client': client})
+
+If you were passing any of the `*_kwargs` parameters, you will need to include them in `client_kwargs`, keeping in mind the following transformations.
+
+========================== ====================================== ==========================
+Parameter name             Resource API method                    Client API function
+========================== ====================================== ==========================
+`multipart_upload_kwargs`  `s3.Object.initiate_multipart_upload`_ `s3.Client.create_multipart_upload`_
+`object_kwargs`            `s3.Object.get`_                       `s3.Client.get_object`_
+`resource_kwargs`          s3.resource                            `s3.client`_
+`singlepart_upload_kwargs` `s3.Object.put`_                       `s3.Client.put_object`_
+========================== ====================================== ==========================
+
+Most of the above is self-explanatory, with the exception of `resource_kwargs`.
+These were previously used mostly for passing a custom endpoint URL.
+
+The `client_kwargs` dict can thus contain the following members:
+
+- `s3.Client`: initializer parameters, e.g. those to pass directly to the `boto3.client` function, such as `endpoint_url`.
+- `s3.Client.create_multipart_upload`
+- `s3.Client.get_object`
+- `s3.Client.put_object`
+
+Here's a before-and-after example for connecting to a custom endpoint.  Before:
+
+.. code-block:: python
+
+    session = boto3.Session(profile_name='digitalocean')
+    resource_kwargs = {'endpoint_url': 'https://ams3.digitaloceanspaces.com'}
+    with open('s3://bucket/key.txt', 'wb', transport_params={'resource_kwargs': resource_kwargs}) as fout:
+        fout.write(b'here we stand')
+
+After:
+
+.. code-block:: python
+
+    session = boto3.Session(profile_name='digitalocean')
+    client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com')
+    with open('s3://bucket/key.txt', 'wb', transport_params={'client': client}) as fout:
+        fout.write(b'here we stand')
+
+See `README `_ and `HOWTO `_ for more examples.
+
+.. _resource API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource
+.. _s3.Object.initiate_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.initiate_multipart_upload
+.. _s3.Object.get: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.get
+.. _s3.Object.put: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.put
+
+.. _client API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
+.. _s3.Client: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
+.. _s3.Client.create_multipart_upload: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_multipart_upload
+.. _s3.Client.get_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object
+.. _s3.Client.put_object: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
+
 Migrating to the new dependency management subsystem
 ====================================================
 
@@ -111,4 +216,3 @@ or view the help online `here
diff --git a/README.rst b/README.rst
--- a/README.rst
+++ b/README.rst
 >>> url = 's3://smart-open-py37-benchmark-results/test.txt'
->>> with open(url, 'wb', transport_params={'session': session}) as fout:
+>>> with open(url, 'wb', transport_params={'client': session.client('s3')}) as fout:
 ...     bytes_written = fout.write(b'hello world!')
 ...     print(bytes_written)
 12
@@ -182,12 +182,9 @@ For the sake of simplicity, the examples below assume you have all the dependenc
         print(line)
 
     # Stream to Digital Ocean Spaces bucket providing credentials from boto3 profile
-    transport_params = {
-        'session': boto3.Session(profile_name='digitalocean'),
-        'resource_kwargs': {
-            'endpoint_url': 'https://ams3.digitaloceanspaces.com',
-        }
-    }
+    session = boto3.Session(profile_name='digitalocean')
+    client = session.client('s3', endpoint_url='https://ams3.digitaloceanspaces.com')
+    transport_params = {'client': client}
     with open('s3://bucket/key.txt', 'wb', transport_params=transport_params) as fout:
         fout.write(b'here we stand')
 
@@ -202,7 +199,7 @@ For the sake of simplicity, the examples below assume you have all the dependenc
     # stream from Azure Blob Storage
     connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
     transport_params = {
-        client: azure.storage.blob.BlobServiceClient.from_connection_string(connect_str)
+        'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
     }
     for line in open('azure://mycontainer/myfile.txt', transport_params=transport_params):
         print(line)
@@ -210,7 +207,7 @@ For the sake of simplicity, the examples below assume you have all the dependenc
     # stream content *into* Azure Blob Storage (write mode):
     connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
     transport_params = {
-        client: azure.storage.blob.BlobServiceClient.from_connection_string(connect_str)
+        'client': azure.storage.blob.BlobServiceClient.from_connection_string(connect_str),
     }
     with open('azure://mycontainer/my_file.txt', 'wb', transport_params=transport_params) as fout:
         fout.write(b'hello world')
@@ -264,7 +261,7 @@ Here are some examples of using this parameter:
 
 .. code-block:: python
 
     >>> import boto3
-    >>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(session=boto3.Session()))
+    >>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(client=boto3.client('s3')))
     >>> fin = open('s3://commoncrawl/robots.txt', transport_params=dict(buffer_size=1024))
 
 For the full list of keyword arguments supported by each transport option, see the documentation:
@@ -281,8 +278,8 @@ S3 Credentials
 By default, ``smart_open`` will defer to ``boto3`` and let the latter take care of the credentials.
 There are several ways to override this behavior.
 
-The first is to pass a ``boto3.Session`` object as a transport parameter to the ``open`` function.
-You can customize the credentials when constructing the session.
-``smart_open`` will then use the session when talking to S3.
+The first is to pass a pre-configured ``boto3`` S3 client object as a transport parameter to the ``open`` function.
+You can customize the credentials when constructing the session for the client.
+``smart_open`` will then use the client when talking to S3.
 
 .. code-block:: python
@@ -292,7 +289,8 @@ You can customize the credentials when constructing the session.
         aws_secret_access_key=SECRET_KEY,
         aws_session_token=SESSION_TOKEN,
     )
-    fin = open('s3://bucket/key', transport_params=dict(session=session), ...)
+    client = session.client('s3', endpoint_url=..., config=...)
+    fin = open('s3://bucket/key', transport_params=dict(client=client))
 
 Your second option is to specify the credentials within the S3 URL itself:
 
 .. code-block:: python
 
     fin = open('s3://aws_access_key_id:aws_secret_access_key@bucket/key', ...)
 
-*Important*: The two methods above are **mutually exclusive**. If you pass an AWS session *and* the URL contains credentials, ``smart_open`` will ignore the latter.
+*Important*: The two methods above are **mutually exclusive**. If you pass an AWS client *and* the URL contains credentials, ``smart_open`` will ignore the latter.
 
 *Important*: ``smart_open`` ignores configuration files from the older ``boto`` library.
 Port your old ``boto`` settings to ``boto3`` in order to use them with ``smart_open``.
diff --git a/help.txt b/help.txt
index ec5ebf0b..6d642bba 100644
--- a/help.txt
+++ b/help.txt
@@ -137,17 +137,6 @@ FUNCTIONS
         The buffer size to use when performing I/O.
       min_part_size: int, optional
         The minimum part size for multipart uploads.  For writing only.
-      session: object, optional
-        The S3 session to use when working with boto3.
-      resource_kwargs: dict, optional
-        Keyword arguments to use when accessing the S3 resource for reading or writing.
-      multipart_upload_kwargs: dict, optional
-        Additional parameters to pass to boto3's initiate_multipart_upload function.
-        For writing only.
-      singlepart_upload_kwargs: dict, optional
-        Additional parameters to pass to boto3's S3.Object.put function when using single
-        part upload.
-        For writing only.
       multipart_upload: bool, optional
         Default: `True`
         If set to `True`, will use multipart upload for writing to S3. If set
@@ -157,14 +146,18 @@ FUNCTIONS
       version_id: str, optional
         Version of the object, used when reading object.
         If None, will fetch the most recent version.
-      object_kwargs: dict, optional
-        Additional parameters to pass to boto3's object.get function.
-        Used during reading only.
       defer_seek: boolean, optional
         Default: `False`
         If set to `True` on a file opened for reading, GetObject will not be
         called until the first seek() or read().
         Avoids redundant API queries when seeking before reading.
+      client: object, optional
+        The S3 client to use when working with boto3.
+        If you don't specify this, then smart_open will create a new client for you.
+      client_kwargs: dict, optional
+        Additional parameters to pass to the relevant functions of the client.
+        The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`.
+        The values are kwargs to pass to that method each time it is called.
       writebuffer: IO[bytes], optional
         By default, this module will buffer data in memory using io.BytesIO
         when writing. Pass another binary IO instance here to use it instead.
@@ -325,13 +318,13 @@ FUNCTIONS
     s3_iter_bucket(bucket_name, prefix='', accept_key=None, key_limit=None, workers=16, retries=3, **session_kwargs)
         Deprecated.  Use smart_open.s3.iter_bucket instead.
 
-    smart_open(uri, mode='rb', **kw)
+    smart_open(uri, mode='rb', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None, ignore_extension=False, **kwargs)
 
 DATA
     __all__ = ['open', 'parse_uri', 'register_compressor', 's3_iter_bucket...
 
 VERSION
-    2.2.1
+    4.1.2.dev0
 
 FILE
     /Users/misha/git/smart_open/smart_open/__init__.py
diff --git a/howto.md b/howto.md
index a6505de1..2dd332b6 100644
--- a/howto.md
+++ b/howto.md
@@ -73,11 +73,12 @@ The `boto3` library that `smart_open` uses for accessing S3 signs each request u
 If you'd like to access S3 without using an S3 account, then you need to disable this signing mechanism.
 ```python
+>>> import boto3
 >>> import botocore
 >>> import botocore.client
 >>> from smart_open import open
 >>> config = botocore.client.Config(signature_version=botocore.UNSIGNED)
->>> params = {'resource_kwargs': {'config': config}}
+>>> params = {'client': boto3.client('s3', config=config)}
 >>> with open('s3://commoncrawl/robots.txt', transport_params=params) as fin:
 ...     fin.readline()
 'User-Agent: *\n'
@@ -175,15 +176,15 @@ s3.ObjectVersion(bucket_name='smart-open-versioned', object_key='demo.txt', id='
 
 ## How to Read from S3 Efficiently
 
-Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
-By default, calling `smart_open.open` with an S3 URL will create its own boto3 session and resource.
+Under the covers, `smart_open` uses the [boto3 client API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client) to read from S3.
+By default, calling `smart_open.open` with an S3 URL will create its own boto3 client.
 These are expensive operations: they require both CPU time to construct the objects
 from a low-level API definition, and memory to store the objects once they have been created.
-It is possible to save both CPU time and memory by sharing the same resource
-across multiple `smart_open.open` calls, for example:
+It is possible to save both CPU time and memory by sharing the same client
+across multiple `smart_open.open` calls, for example:
 
 ```python
 >>> import boto3
 >>> from smart_open import open
->>> tp = {'resource': boto3.resource('s3')}
+>>> tp = {'client': boto3.client('s3')}
 >>> for month in (1, 2, 3):
 ...     url = 's3://nyc-tlc/trip data/yellow_tripdata_2020-%02d.csv' % month
 ...     with open(url, transport_params=tp) as fin:
@@ -195,7 +196,7 @@ It is possible to save both CPU time and memory by sharing the same resource acr
 
 ```
 
-The above sharing is safe because it is all happening in the same thread and subprocess (see below for details).
+Clients are thread-safe and multiprocess-safe, so you may share them across threads and subprocesses.
 
 By default, `smart_open` buffers the most recent part of a multipart upload in memory.
 The default part size is 50MB.
@@ -226,14 +227,6 @@ with tempfile.NamedTemporaryFile() as tmp:
 
 This option reduces memory usage at the expense of additional disk I/O (writing to and reading from a hard disk is slower).
 
-## How to Work in a Parallelized Environment
-
-Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
-This API is not thread-safe or multiprocess-safe.
-Do not share the same `smart_open` objects across different threads or subprocesses.
-`smart_open` will create its own session and resource objects for each individual `open` call, so you don't have to worry about managing boto3 objects.
-This comes at a price: each session and resource requires CPU time to create and memory to store, so be wary of keeping hundreds of threads or subprocesses reading/writing from/to S3.
-
 ## How to Specify the Request Payer (S3 only)
 
 Some public buckets require you to [pay for S3 requests for the data in the bucket](https://docs.aws.amazon.com/AmazonS3/latest/dev/RequesterPaysBuckets.html).
@@ -243,7 +236,7 @@ To access such buckets, you need to pass some special transport parameters:
 
 ```python
 >>> from smart_open import open
->>> params = {'object_kwargs': {'RequestPayer': 'requester'}}
+>>> params = {'client_kwargs': {'S3.Client.get_object': {'RequestPayer': 'requester'}}}
 >>> with open('s3://arxiv/pdf/arXiv_pdf_manifest.xml', transport_params=params) as fin:
 ...    print(fin.readline())
 
@@ -258,41 +251,83 @@ This works only when reading and writing via S3.
 
 Boto3 has a [built-in mechanism](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) for retrying after a recoverable error.
 You can fine-tune it in several ways:
 
-### Pre-configuring a boto3 resource and then passing the resource to smart_open
+### Pre-configuring a boto3 client and then passing the client to smart_open
 
 ```python
 >>> import boto3
 >>> import botocore.config
 >>> import smart_open
 >>> config = botocore.config.Config(retries={'mode': 'standard'})
->>> resource = boto3.resource('s3', config=config)
->>> tp = {'resource': resource}
+>>> client = boto3.client('s3', config=config)
+>>> tp = {'client': client}
 >>> with smart_open.open('s3://commoncrawl/robots.txt', transport_params=tp) as fin:
 ...     print(fin.readline())
 User-Agent: *
 
 ```
 
-### Directly passing configuration as transport parameters to smart_open
+To verify that your settings have taken effect:
+
+```python
+import logging
+logging.getLogger('smart_open.s3').setLevel(logging.DEBUG)
+```
+
+and check the log output of your code.
+
+## How to Pass Additional Parameters to boto3
+
+`boto3` is a highly configurable library, and each function call accepts many optional parameters.
+`smart_open` does not attempt to replicate this behavior, since most of these parameters do not influence the behavior of `smart_open` itself.
+Instead, `smart_open` allows the caller to pass additional parameters as necessary:
 
 ```python
 >>> import boto3
->>> import botocore.config
->>> import smart_open
->>> config = botocore.config.Config(retries={'mode': 'standard'})
->>> tp = {'resource_kwargs': {'config': config}}
->>> with smart_open.open('s3://commoncrawl/robots.txt', transport_params=tp) as fin:
-...     print(fin.readline())
-User-Agent: *
+>>> from smart_open import open
+>>> params = {'client_kwargs': {'S3.Client.get_object': {'RequestPayer': 'requester'}}}
+>>> with open('s3://arxiv/pdf/arXiv_pdf_manifest.xml', transport_params=params) as fin:
+...     pass
 ```
 
-To verify your settings have effect:
+The above example influences how the [S3.Client.get_object function](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object) gets called by `smart_open` when reading the specified URL.
+More specifically, the `RequestPayer` parameter will be set to `requester` **for each call**.
+Influential functions include:
+
+- S3.Client (the initializer function)
+- S3.Client.abort_multipart_upload
+- S3.Client.complete_multipart_upload
+- S3.Client.create_multipart_upload
+- S3.Client.get_object
+- S3.Client.head_bucket
+- S3.Client.put_object
+- S3.Client.upload_part
+
+If you choose to pass additional parameters, keep the following in mind (a combined example follows the list):
+
+1. Study the [boto3 client API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client) and ensure the function and parameters are valid.
+2. Study the [code for the smart_open.s3 submodule](smart_open/s3.py) and ensure `smart_open` is actually calling the function you're passing additional parameters for.
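+
+Putting the pieces together, here is a minimal sketch of a single `client_kwargs` dict that carries both initializer parameters and per-method parameters (the bucket, key and endpoint URL below are illustrative placeholders, not values `smart_open` requires):
+
+```python
+import smart_open
+
+tp = {
+    'client_kwargs': {
+        # Keyword arguments for the boto3.client('s3', ...) initializer.
+        'S3.Client': {'endpoint_url': 'https://s3.us-east-1.amazonaws.com'},
+        # Keyword arguments injected into every S3.Client.get_object call.
+        'S3.Client.get_object': {'RequestPayer': 'requester'},
+    }
+}
+with smart_open.open('s3://bucket/key.txt', 'rb', transport_params=tp) as fin:
+    data = fin.read()
+```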
+
+Finally, in some cases, it's possible to work directly with `boto3` without going through `smart_open`.
+For example, setting the ACL for an object is possible after the object is created (with `boto3`), as opposed to at creation time (with `smart_open`).
+More specifically, here's the direct method:
 
 ```python
-import logging
-logging.getLogger('smart_open.s3').setLevel(logging.DEBUG)
+import boto3
+import smart_open
+with smart_open.open('s3://bucket/key', 'wb') as fout:
+    fout.write(b'hello world!')
+client = boto3.client('s3')
+client.put_object_acl(Bucket='bucket', Key='key', ACL=acl_as_string)
 ```
 
-and check the log output of your code.
+Here's the same code that passes the above parameter via `smart_open`:
+
+```python
+import smart_open
+tp = {'client_kwargs': {'S3.Client.create_multipart_upload': {'ACL': acl_as_string}}}
+with smart_open.open('s3://bucket/key', 'wb', transport_params=tp) as fout:
+    fout.write(b'hello world!')
+```
+
+If passing everything via `smart_open` feels awkward, try passing part of the parameters directly to `boto3`.
 
 ## How to Read/Write from localstack
 
@@ -315,8 +350,10 @@ where `http://localhost:4566` is the default host/port that localstack uses to l
 You can now read/write to the bucket the same way you would to a real S3 bucket:
 
 ```python
+>>> import boto3
 >>> from smart_open import open
->>> tparams = {'resource_kwargs': {'endpoint_url': 'http://localhost:4566'}}
+>>> client = boto3.client('s3', endpoint_url='http://localhost:4566')
+>>> tparams = {'client': client}
 >>> with open('s3://mybucket/hello.txt', 'wt', transport_params=tparams) as fout:
 ...     fout.write('hello world!')
 >>> with open('s3://mybucket/hello.txt', 'rt', transport_params=tparams) as fin:
diff --git a/smart_open/s3.py b/smart_open/s3.py
index 2ef3f340..17b64eb3 100644
--- a/smart_open/s3.py
+++ b/smart_open/s3.py
@@ -52,6 +52,26 @@ _OUT_OF_RANGE = 'InvalidRange'
 
 
+class _ClientWrapper:
+    """Wraps a client to inject the appropriate keyword args into each method call.
+
+    The keyword args are a dictionary keyed by the fully qualified method name.
+    For example, S3.Client.create_multipart_upload.
+
+    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#client
+
+    This wrapper behaves identically to the client otherwise.
+    """
+    def __init__(self, client, kwargs):
+        self.client = client
+        self.kwargs = kwargs
+
+    def __getattr__(self, method_name):
+        method = getattr(self.client, method_name)
+        kwargs = self.kwargs.get('S3.Client.%s' % method_name, {})
+        return functools.partial(method, **kwargs)
+
+
 def parse_uri(uri_as_string):
     #
     # Restrictions on bucket names and labels:
@@ -127,41 +147,46 @@ def _consolidate_params(uri, transport_params):
     """
     transport_params = dict(transport_params)
 
-    session = transport_params.get('session')
-    if session is not None and (uri['access_id'] or uri['access_secret']):
+    def inject(**kwargs):
+        try:
+            client_kwargs = transport_params['client_kwargs']
+        except KeyError:
+            client_kwargs = transport_params['client_kwargs'] = {}
+
+        try:
+            init_kwargs = client_kwargs['S3.Client']
+        except KeyError:
+            init_kwargs = client_kwargs['S3.Client'] = {}
+
+        init_kwargs.update(**kwargs)
+
+    client = transport_params.get('client')
+    if client is not None and (uri['access_id'] or uri['access_secret']):
         logger.warning(
             'ignoring credentials parsed from URL because they conflict with '
-            'transport_params["session"]. Set transport_params["session"] to None '
+            'transport_params["client"]. Set transport_params["client"] to None '
             'to suppress this warning.'
         )
         uri.update(access_id=None, access_secret=None)
     elif (uri['access_id'] and uri['access_secret']):
-        transport_params['session'] = boto3.Session(
+        inject(
             aws_access_key_id=uri['access_id'],
             aws_secret_access_key=uri['access_secret'],
         )
         uri.update(access_id=None, access_secret=None)
 
-    if uri['host'] != DEFAULT_HOST:
-        endpoint_url = 'https://%(host)s:%(port)d' % uri
-        _override_endpoint_url(transport_params, endpoint_url)
-
-    return uri, transport_params
-
-
-def _override_endpoint_url(transport_params, url):
-    try:
-        resource_kwargs = transport_params['resource_kwargs']
-    except KeyError:
-        resource_kwargs = transport_params['resource_kwargs'] = {}
-
-    if resource_kwargs.get('endpoint_url'):
+    if client is not None and uri['host'] != DEFAULT_HOST:
         logger.warning(
-            'ignoring endpoint_url parsed from URL because it conflicts '
-            'with transport_params["resource_kwargs"]["endpoint_url"]'
+            'ignoring endpoint_url parsed from URL because it conflicts with '
+            'transport_params["client"]. Set transport_params["client"] to None '
+            'to suppress this warning.'
         )
-    else:
-        resource_kwargs.update(endpoint_url=url)
+        uri.update(host=None)
+    elif uri['host'] != DEFAULT_HOST:
+        inject(endpoint_url='https://%(host)s:%(port)d' % uri)
+        uri.update(host=None)
+
+    return uri, transport_params
 
 
 def open_uri(uri, mode, transport_params):
@@ -178,14 +203,10 @@ def open(
         version_id=None,
         buffer_size=DEFAULT_BUFFER_SIZE,
         min_part_size=DEFAULT_MIN_PART_SIZE,
-        session=None,
-        resource=None,
-        resource_kwargs=None,
-        multipart_upload_kwargs=None,
         multipart_upload=True,
-        singlepart_upload_kwargs=None,
-        object_kwargs=None,
         defer_seek=False,
+        client=None,
+        client_kwargs=None,
         writebuffer=None,
 ):
     """Open an S3 object for reading or writing.
@@ -202,22 +223,6 @@ def open(
         The buffer size to use when performing I/O.
     min_part_size: int, optional
         The minimum part size for multipart uploads.  For writing only.
-    session: object, optional
-        The S3 session to use when working with boto3.
-        If you don't specify this, then smart_open will create a new session for you.
-    resource: object, optional
-        The S3 resource to use when working with boto3.
-        If you don't specify this, then smart_open will create a new resource for you.
-    resource_kwargs: dict, optional
-        Keyword arguments to use when creating the S3 resource for reading or writing.
-        Will be ignored if you specify the resource object explicitly.
-    multipart_upload_kwargs: dict, optional
-        Additional parameters to pass to boto3's initiate_multipart_upload function.
-        For writing only.
-    singlepart_upload_kwargs: dict, optional
-        Additional parameters to pass to boto3's S3.Object.put function when using single
-        part upload.
-        For writing only.
     multipart_upload: bool, optional
         Default: `True`
         If set to `True`, will use multipart upload for writing to S3. If set
@@ -227,14 +232,18 @@ def open(
     version_id: str, optional
         Version of the object, used when reading object.
         If None, will fetch the most recent version.
-    object_kwargs: dict, optional
-        Additional parameters to pass to boto3's object.get function.
-        Used during reading only.
     defer_seek: boolean, optional
         Default: `False`
         If set to `True` on a file opened for reading, GetObject will not be
         called until the first seek() or read().
         Avoids redundant API queries when seeking before reading.
+    client: object, optional
+        The S3 client to use when working with boto3.
+        If you don't specify this, then smart_open will create a new client for you.
+ client_kwargs: dict, optional + Additional parameters to pass to the relevant functions of the client. + The keys are fully qualified method names, e.g. `S3.Client.create_multipart_upload`. + The values are kwargs to pass to that method each time it is called. writebuffer: IO[bytes], optional By default, this module will buffer data in memory using io.BytesIO when writing. Pass another binary IO instance here to use it instead. @@ -256,11 +265,9 @@ def open( key_id, version_id=version_id, buffer_size=buffer_size, - session=session, - resource=resource, - resource_kwargs=resource_kwargs, - object_kwargs=object_kwargs, defer_seek=defer_seek, + client=client, + client_kwargs=client_kwargs, ) elif mode == constants.WRITE_BINARY: if multipart_upload: @@ -268,20 +275,16 @@ def open( bucket_id, key_id, min_part_size=min_part_size, - session=session, - resource=resource, - upload_kwargs=multipart_upload_kwargs, - resource_kwargs=resource_kwargs, + client=client, + client_kwargs=client_kwargs, writebuffer=writebuffer, ) else: fileobj = SinglepartWriter( bucket_id, key_id, - session=session, - resource=resource, - upload_kwargs=singlepart_upload_kwargs, - resource_kwargs=resource_kwargs, + client=client, + client_kwargs=client_kwargs, writebuffer=writebuffer, ) else: @@ -291,15 +294,16 @@ def open( return fileobj -def _get(s3_object, version=None, **kwargs): - if version is not None: - kwargs['VersionId'] = version +def _get(client, bucket, key, version, range_string): try: - return s3_object.get(**kwargs) + if version: + return client.get_object(Bucket=bucket, Key=key, VersionId=version, Range=range_string) + else: + return client.get_object(Bucket=bucket, Key=key, Range=range_string) except botocore.client.ClientError as error: wrapped_error = IOError( 'unable to access bucket: %r key: %r version: %r error: %s' % ( - s3_object.bucket_name, s3_object.key, version, error + bucket, key, version, error ) ) wrapped_error.backend_error = error @@ -322,16 +326,19 @@ class _SeekableRawReader(object): def __init__( self, - s3_object, + client, + bucket, + key, version_id=None, - object_kwargs=None, ): - self._object = s3_object - self._content_length = None + self._client = client + self._bucket = bucket + self._key = key self._version_id = version_id + + self._content_length = None self._position = 0 self._body = None - self._object_kwargs = object_kwargs if object_kwargs else {} def seek(self, offset, whence=constants.WHENCE_START): """Seek to the specified position. @@ -401,10 +408,11 @@ def _open_body(self, start=None, stop=None): try: # Optimistically try to fetch the requested content range. response = _get( - self._object, - version=self._version_id, - Range=range_string, - **self._object_kwargs + self._client, + self._bucket, + self._key, + self._version_id, + range_string, ) except IOError as ioe: # Handle requested content range exceeding content size. @@ -478,43 +486,23 @@ def read(self, size=-1): raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt)) def __str__(self): - return 'smart_open.s3._SeekableReader(%r, %r)' % ( - self._object.bucket_name, - self._object.key, - ) + return 'smart_open.s3._SeekableReader(%r, %r)' % (self._bucket, self._key) -def _initialize_boto3(rw, session, resource, resource_kwargs): +def _initialize_boto3(rw, client, client_kwargs, bucket, key): """Created the required objects for accessing S3. Ideally, they have - been already created for us and we can just reuse them. - - We only really need one thing: the resource. 
There are multiple ways of - getting one, in order of effort: + been already created for us and we can just reuse them.""" + if client_kwargs is None: + client_kwargs = {} - 1) Directly from the user - 2) From the session directly specified by the user - 3) From an entirely new session + if client is None: + init_kwargs = client_kwargs.get('S3.Client', {}) + client = boto3.client('s3', **init_kwargs) + assert client - Once we have the resource, we no longer need the session. - """ - if resource_kwargs is None: - resource_kwargs = {} - - if resource: - if session: - logger.warning('ignoring session because resource was passed explicitly') - if resource_kwargs: - logger.warning('ignoring resource_kwargs because resource was passed explicitly') - rw._session = None - rw._resource = resource - elif session: - rw._session = session - rw._resource = rw._session.resource('s3', **resource_kwargs) - rw._resource_kwargs = resource_kwargs - else: - rw._session = boto3.Session() - rw._resource = rw._session.resource('s3', **resource_kwargs) - rw._resource_kwargs = resource_kwargs + rw._client = _ClientWrapper(client, client_kwargs) + rw._bucket = bucket + rw._key = key class Reader(io.BufferedIOBase): @@ -529,29 +517,20 @@ def __init__( version_id=None, buffer_size=DEFAULT_BUFFER_SIZE, line_terminator=constants.BINARY_NEWLINE, - session=None, - resource=None, - resource_kwargs=None, - object_kwargs=None, defer_seek=False, + client=None, + client_kwargs=None, ): + self._version_id = version_id self._buffer_size = buffer_size - if resource_kwargs is None: - resource_kwargs = {} - if object_kwargs is None: - object_kwargs = {} - - _initialize_boto3(self, session, resource, resource_kwargs) - - self._object_kwargs = object_kwargs - self._object = self._resource.Object(bucket, key) - self._version_id = version_id + _initialize_boto3(self, client, client_kwargs, bucket, key) self._raw_reader = _SeekableRawReader( - self._object, + self._client, + bucket, + key, self._version_id, - self._object_kwargs, ) self._current_pos = 0 self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size) @@ -572,7 +551,7 @@ def __init__( def close(self): """Flush and close this stream.""" - self._object = None + pass def readable(self): """Return True if the stream can be read from.""" @@ -678,7 +657,7 @@ def terminate(self): """Do nothing.""" pass - def to_boto3(self): + def to_boto3(self, resource=None): """Create an **independent** `boto3.s3.Object` instance that points to the same resource as this instance. @@ -687,13 +666,13 @@ def to_boto3(self): `boto3.s3.Object` may not necessarily affect the current instance. """ + if resource is None: + resource = boto3.resource('s3') + obj = resource.Object(self._bucket, self._key) if self._version_id is not None: - return self._resource.Object( - self._object.bucket_name, - self._object.key, - ).Version(self._version_id) + return obj.Version(self._version_id) else: - return self._resource.Object(self._object.bucket_name, self._object.key) + return obj # # Internal methods. 
@@ -714,9 +693,7 @@ def _fill_buffer(self, size=-1): self._eof = True def __str__(self): - return "smart_open.s3.Reader(%r, %r)" % ( - self._object.bucket_name, self._object.key - ) + return "smart_open.s3.Reader(%r, %r)" % (self._bucket, self._key) def __repr__(self): return ( @@ -725,17 +702,13 @@ def __repr__(self): "key=%r, " "version_id=%r, " "buffer_size=%r, " - "line_terminator=%r, " - "session=%r, " - "resource_kwargs=%r)" + "line_terminator=%r)" ) % ( - self._object.bucket_name, - self._object.key, + self._bucket, + self._key, self._version_id, self._buffer_size, self._line_terminator, - self._session, - self._resource_kwargs, ) @@ -749,28 +722,24 @@ def __init__( bucket, key, min_part_size=DEFAULT_MIN_PART_SIZE, - session=None, - resource=None, - resource_kwargs=None, - upload_kwargs=None, + client=None, + client_kwargs=None, writebuffer=None, ): if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ multipart upload may fail") + self._min_part_size = min_part_size - _initialize_boto3(self, session, resource, resource_kwargs) - - if upload_kwargs is None: - upload_kwargs = {} - - self._upload_kwargs = upload_kwargs + _initialize_boto3(self, client, client_kwargs, bucket, key) try: - self._object = self._resource.Object(bucket, key) - self._min_part_size = min_part_size - partial = functools.partial(self._object.initiate_multipart_upload, **self._upload_kwargs) - self._mp = _retry_if_failed(partial) + partial = functools.partial( + self._client.create_multipart_upload, + Bucket=bucket, + Key=key, + ) + self._upload_id = _retry_if_failed(partial)['UploadId'] except botocore.client.ClientError as error: raise ValueError( 'the bucket %r does not exist, or is forbidden for access (%r)' % ( @@ -802,11 +771,17 @@ def close(self): if self._buf.tell(): self._upload_next_part() - if self._total_bytes and self._mp: - partial = functools.partial(self._mp.complete, MultipartUpload={'Parts': self._parts}) + if self._total_bytes and self._upload_id: + partial = functools.partial( + self._client.complete_multipart_upload, + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + MultipartUpload={'Parts': self._parts}, + ) _retry_if_failed(partial) logger.debug('%s: completed multipart upload', self) - elif self._mp: + elif self._upload_id: # # AWS complains with "The XML you provided was not well-formed or # did not validate against our published schema" when the input is @@ -814,15 +789,23 @@ def close(self): # # We work around this by creating an empty file explicitly. 
# - assert self._mp, "no multipart upload in progress" - self._mp.abort() - self._object.put(Body=b'') + assert self._upload_id, "no multipart upload in progress" + self._client.abort_multipart_upload( + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + ) + self._client.put_object( + Bucket=self._bucket, + Key=self._key, + Body=b'', + ) logger.debug('%s: wrote 0 bytes to imitate multipart upload', self) - self._mp = None + self._upload_id = None @property def closed(self): - return self._mp is None + return self._upload_id is None def writable(self): """Return True if the stream supports writing.""" @@ -871,11 +854,15 @@ def write(self, b): def terminate(self): """Cancel the underlying multipart upload.""" - assert self._mp, "no multipart upload in progress" - self._mp.abort() - self._mp = None + assert self._upload_id, "no multipart upload in progress" + self._client.abort_multipart_upload( + Bucket=self._bucket, + Key=self._key, + UploadId=self._upload_id, + ) + self._upload_id = None - def to_boto3(self): + def to_boto3(self, resource=None): """Create an **independent** `boto3.s3.Object` instance that points to the same resource as this instance. @@ -884,7 +871,9 @@ def to_boto3(self): `boto3.s3.Object` may not necessary affect the current instance. """ - return self._resource.Object(self._object.bucket_name, self._object.key) + if not resource: + resource = boto3.resource('s3') + return resource.Object(self._bucket, self._key) # # Internal methods. @@ -899,7 +888,6 @@ def _upload_next_part(self): self._total_bytes / 1024.0 ** 3, ) self._buf.seek(0) - part = self._mp.Part(part_num) # # Network problems in the middle of an upload are particularly @@ -907,7 +895,16 @@ def _upload_next_part(self): # of a temporary connection problem, so this part needs to be # especially robust. 
         #
-        upload = _retry_if_failed(functools.partial(part.upload, Body=self._buf))
+        upload = _retry_if_failed(
+            functools.partial(
+                self._client.upload_part,
+                Bucket=self._bucket,
+                Key=self._key,
+                UploadId=self._upload_id,
+                PartNumber=part_num,
+                Body=self._buf,
+            )
+        )
         self._parts.append({'ETag': upload['ETag'], 'PartNumber': part_num})
         logger.debug("%s: upload of part_num #%i finished", self, part_num)
 
@@ -927,21 +924,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
     def __str__(self):
-        return "smart_open.s3.MultipartWriter(%r, %r)" % (
-            self._object.bucket_name, self._object.key,
-        )
+        return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)
 
     def __repr__(self):
-        return (
-            "smart_open.s3.MultipartWriter(bucket=%r, key=%r, "
-            "min_part_size=%r, session=%r, resource_kwargs=%r, upload_kwargs=%r)"
-        ) % (
-            self._object.bucket_name,
-            self._object.key,
+        return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, min_part_size=%r)" % (
+            self._bucket,
+            self._key,
             self._min_part_size,
-            self._session,
-            self._resource_kwargs,
-            self._upload_kwargs,
         )
 
@@ -957,22 +946,14 @@ def __init__(
             self,
             bucket,
             key,
-            session=None,
-            resource=None,
-            resource_kwargs=None,
-            upload_kwargs=None,
+            client=None,
+            client_kwargs=None,
             writebuffer=None,
     ):
-        _initialize_boto3(self, session, resource, resource_kwargs)
-
-        if upload_kwargs is None:
-            upload_kwargs = {}
-
-        self._upload_kwargs = upload_kwargs
+        _initialize_boto3(self, client, client_kwargs, bucket, key)
 
         try:
-            self._object = self._resource.Object(bucket, key)
-            self._resource.meta.client.head_bucket(Bucket=bucket)
+            self._client.head_bucket(Bucket=bucket)
         except botocore.client.ClientError as e:
             raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e
 
@@ -1001,10 +982,14 @@ def close(self):
         self._buf.seek(0)
 
         try:
-            self._object.put(Body=self._buf, **self._upload_kwargs)
+            self._client.put_object(
+                Bucket=self._bucket,
+                Key=self._key,
+                Body=self._buf,
+            )
         except botocore.client.ClientError as e:
             raise ValueError(
-                'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) from e
+                'the bucket %r does not exist, or is forbidden for access' % self._bucket) from e
 
         logger.debug("%s: direct upload finished", self)
         self._buf = None
 
@@ -1072,16 +1057,7 @@ def __str__(self):
-        return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._object.bucket_name, self._object.key)
+        return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._bucket, self._key)
 
     def __repr__(self):
-        return (
-            "smart_open.s3.SinglepartWriter(bucket=%r, key=%r, session=%r, "
-            "resource_kwargs=%r, upload_kwargs=%r)"
-        ) % (
-            self._object.bucket_name,
-            self._object.key,
-            self._session,
-            self._resource_kwargs,
-            self._upload_kwargs,
-        )
+        return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key)
 
 
 def _retry_if_failed(
@@ -1106,13 +1082,6 @@ def _retry_if_failed(
     raise IOError('Unable to connect to the endpoint after %d attempts' % attempts)
 
 
-#
-# For backward compatibility
-#
-SeekableBufferedInputBase = Reader
-BufferedOutputBase = MultipartWriter
-
-
 def _accept_all(key):
     return True
 
diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py
index d571070b..6e1ba228 100644
--- a/smart_open/tests/test_s3.py
+++ b/smart_open/tests/test_s3.py
@@ -137,20 +137,19 @@ def mock_make_request(self, operation_model, *args, **kwargs):
     'The test case needs a Moto server running on the local 5000 port.'
) class SeekableRawReaderTest(unittest.TestCase): - def setUp(self): self._body = b'123456' self._local_resource = boto3.resource('s3', endpoint_url='http://localhost:5000') self._local_resource.Bucket(BUCKET_NAME).create() self._local_resource.Object(BUCKET_NAME, KEY_NAME).put(Body=self._body) + self._local_client = boto3.client('s3', endpoint_url='http://localhost:5000') def tearDown(self): self._local_resource.Object(BUCKET_NAME, KEY_NAME).delete() self._local_resource.Bucket(BUCKET_NAME).delete() def test_read_from_a_closed_body(self): - obj = self._local_resource.Object(BUCKET_NAME, KEY_NAME) - reader = smart_open.s3._SeekableRawReader(obj) + reader = smart_open.s3._SeekableRawReader(self._local_client, BUCKET_NAME, KEY_NAME) self.assertEqual(reader.read(1), b'1') reader._body.close() self.assertEqual(reader.read(2), b'23') @@ -171,13 +170,12 @@ def read(self, size=-1): return the_bytes -class CrapObject: +class CrapClient: def __init__(self, data, modulus=2): self._datasize = len(data) self._body = CrapStream(data, modulus=modulus) - self.bucket_name, self.key = 'crap', 'object' - def get(self, *args, **kwargs): + def get_object(self, *args, **kwargs): return { 'ActualObjectSize': self._datasize, 'ContentLength': self._datasize, @@ -189,7 +187,7 @@ def get(self, *args, **kwargs): class IncrementalBackoffTest(unittest.TestCase): def test_every_read_fails(self): - reader = smart_open.s3._SeekableRawReader(CrapObject(b'hello', 1)) + reader = smart_open.s3._SeekableRawReader(CrapClient(b'hello', 1), 'bucket', 'key') with mock.patch('time.sleep') as mock_sleep: with self.assertRaises(IOError): reader.read() @@ -201,7 +199,7 @@ def test_every_read_fails(self): def test_every_second_read_fails(self): """Can we read from a stream that raises exceptions from time to time?""" - reader = smart_open.s3._SeekableRawReader(CrapObject(b'hello')) + reader = smart_open.s3._SeekableRawReader(CrapClient(b'hello'), 'bucket', 'key') with mock.patch('time.sleep') as mock_sleep: assert reader.read(1) == b'h' mock_sleep.assert_not_called() @@ -223,7 +221,7 @@ def test_every_second_read_fails(self): @moto.mock_s3 -class SeekableBufferedInputBaseTest(BaseTest): +class ReaderTest(BaseTest): def setUp(self): # lower the multipart upload size, to speed up these tests self.old_min_part_size = smart_open.s3.DEFAULT_MIN_PART_SIZE @@ -245,7 +243,7 @@ def test_iter(self): # connect to fake s3 and read from the fake key we filled above with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) output = [line.rstrip(b'\n') for line in fin] self.assertEqual(output, expected.split(b'\n')) @@ -254,7 +252,7 @@ def test_iter_context_manager(self): expected = u"hello wořld\nhow are you?".encode('utf8') put_to_bucket(contents=expected) with self.assertApiCalls(GetObject=1): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: output = [line.rstrip(b'\n') for line in fin] self.assertEqual(output, expected.split(b'\n')) @@ -265,7 +263,7 @@ def test_read(self): logger.debug('content: %r len: %r', content, len(content)) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) self.assertEqual(content[:6], fin.read(6)) self.assertEqual(content[6:14], fin.read(8)) # ř is 2 bytes self.assertEqual(content[14:], fin.read()) # read the rest @@ 
-276,7 +274,7 @@ def test_seek_beginning(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) self.assertEqual(content[:6], fin.read(6)) self.assertEqual(content[6:14], fin.read(8)) # ř is 2 bytes @@ -294,7 +292,7 @@ def test_seek_start(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) seek = fin.seek(6) self.assertEqual(seek, 6) self.assertEqual(fin.tell(), 6) @@ -306,7 +304,7 @@ def test_seek_current(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) self.assertEqual(fin.read(5), b'hello') with self.assertApiCalls(GetObject=1): @@ -320,7 +318,7 @@ def test_seek_end(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) seek = fin.seek(-4, whence=smart_open.constants.WHENCE_END) self.assertEqual(seek, len(content) - 4) self.assertEqual(fin.read(), b'you?') @@ -339,7 +337,7 @@ def test_detect_eof(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) fin.read() eof = fin.tell() self.assertEqual(eof, len(content)) @@ -359,7 +357,7 @@ def test_read_gzip(self): # # Make sure we're reading things correctly. 
# - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: self.assertEqual(fin.read(), buf.getvalue()) # @@ -371,7 +369,7 @@ def test_read_gzip(self): logger.debug('starting actual test') with self.assertApiCalls(GetObject=1): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: with gzip.GzipFile(fileobj=fin) as zipfile: actual = zipfile.read() @@ -382,7 +380,7 @@ def test_readline(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=2): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME) as fin: fin.readline() self.assertEqual(fin.tell(), content.index(b'\n')+1) @@ -398,7 +396,7 @@ def test_readline_tiny_buffer(self): put_to_bucket(contents=content) with self.assertApiCalls(GetObject=1): - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, buffer_size=8) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, buffer_size=8) as fin: actual = list(fin) expected = [b'englishman\n', b'in\n', b'new\n', b'york\n'] @@ -410,7 +408,7 @@ def test_read0_does_not_return_data(self): with self.assertApiCalls(): # set defer_seek to verify that read(0) doesn't trigger an unnecessary API call - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: data = fin.read(0) self.assertEqual(data, b'') @@ -421,7 +419,7 @@ def test_to_boto3(self): with self.assertApiCalls(): # set defer_seek to verify that to_boto3() doesn't trigger an unnecessary API call - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: + with smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) as fin: returned_obj = fin.to_boto3() boto3_body = returned_obj.get()['Body'].read() @@ -440,12 +438,12 @@ def test_defer_seek(self): put_to_bucket(contents=content) with self.assertApiCalls(): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) with self.assertApiCalls(GetObject=1): self.assertEqual(fin.read(), content) with self.assertApiCalls(): - fin = smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, KEY_NAME, defer_seek=True) + fin = smart_open.s3.Reader(BUCKET_NAME, KEY_NAME, defer_seek=True) with self.assertApiCalls(GetObject=1): fin.seek(10) self.assertEqual(fin.read(), content[10:]) @@ -544,7 +542,7 @@ def test_gzip(self): with gzip.GzipFile(fileobj=fout, mode='w') as zipfile: zipfile.write(expected) - with smart_open.s3.SeekableBufferedInputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: + with smart_open.s3.Reader(BUCKET_NAME, WRITE_KEY_NAME) as fin: with gzip.GzipFile(fileobj=fin) as zipfile: actual = zipfile.read() @@ -966,8 +964,8 @@ def test_failure(self): @moto.mock_s3() -def test_resource_propagation_singlepart(): - """Does the resource parameter make it from the caller to Boto3?""" +def test_client_propagation_singlepart(): + """Does the client parameter make it from the caller to Boto3?""" # # Not sure why we need to create the bucket here, as setUpModule should # have done that for us by now. 
@@ -977,34 +975,38 @@
     bucket = resource.create_bucket(Bucket=BUCKET_NAME)
     bucket.wait_until_exists()
 
+    client = session.client('s3')
+
     with smart_open.s3.open(
         BUCKET_NAME,
         WRITE_KEY_NAME,
         mode='wb',
-        resource=resource,
+        client=client,
         multipart_upload=False,
     ) as writer:
-        assert writer._resource == resource
-        assert id(writer._resource) == id(resource)
+        assert writer._client.client == client
+        assert id(writer._client.client) == id(client)
 
 
 @moto.mock_s3()
-def test_resource_propagation_multipart():
-    """Does the resource parameter make it from the caller to Boto3?"""
+def test_client_propagation_multipart():
+    """Does the client parameter make it from the caller to Boto3?"""
     session = boto3.Session()
     resource = session.resource('s3')
     bucket = resource.create_bucket(Bucket=BUCKET_NAME)
     bucket.wait_until_exists()
 
+    client = session.client('s3')
+
     with smart_open.s3.open(
         BUCKET_NAME,
         WRITE_KEY_NAME,
         mode='wb',
-        resource=resource,
+        client=client,
         multipart_upload=True,
     ) as writer:
-        assert writer._resource == resource
-        assert id(writer._resource) == id(resource)
+        assert writer._client.client == client
+        assert id(writer._client.client) == id(client)
 
 
 @moto.mock_s3()
@@ -1015,12 +1017,14 @@ def test_resource_propagation_reader():
     bucket = resource.create_bucket(Bucket=BUCKET_NAME)
     bucket.wait_until_exists()
 
+    client = session.client('s3')
+
     with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='wb') as writer:
         writer.write(b'hello world')
 
-    with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='rb', resource=resource) as reader:
-        assert reader._resource == resource
-        assert id(reader._resource) == id(resource)
+    with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, mode='rb', client=client) as reader:
+        assert reader._client.client == client
+        assert id(reader._client.client) == id(client)
 
 
 if __name__ == '__main__':
diff --git a/smart_open/tests/test_smart_open.py b/smart_open/tests/test_smart_open.py
index c455445a..06045443 100644
--- a/smart_open/tests/test_smart_open.py
+++ b/smart_open/tests/test_smart_open.py
@@ -21,9 +21,9 @@
 import boto3
 import mock
 from moto import mock_s3
-import responses
 import parameterizedtestcase
 import pytest
+import responses
 
 import smart_open
 from smart_open import smart_open_lib
@@ -1169,72 +1169,54 @@
 class SmartOpenS3KwargsTest(unittest.TestCase):
-    @mock.patch('boto3.Session')
-    def test_no_kwargs(self, mock_session):
+    @mock.patch('boto3.client')
+    def test_no_kwargs(self, mock_client):
         smart_open.open('s3://mybucket/mykey', transport_params=dict(defer_seek=True))
-        mock_session.return_value.resource.assert_called_with('s3')
+        mock_client.assert_called_with('s3')
 
-    @mock.patch('boto3.Session')
-    def test_credentials(self, mock_session):
+    @mock.patch('boto3.client')
+    def test_credentials(self, mock_client):
         smart_open.open('s3://access_id:access_secret@mybucket/mykey', transport_params=dict(defer_seek=True))
-        mock_session.assert_called_with(aws_access_key_id='access_id', aws_secret_access_key='access_secret')
-        mock_session.return_value.resource.assert_called_with('s3')
-
-    @mock.patch('boto3.Session')
-    def test_host(self, mock_session):
-        transport_params = {'resource_kwargs': {'endpoint_url': 'http://aa.domain.com'}, 'defer_seek': True}
-        smart_open.open("s3://access_id:access_secret@mybucket/mykey", transport_params=transport_params)
-        mock_session.assert_called_with(
+        mock_client.assert_called_with(
+            's3',
             aws_access_key_id='access_id',
             aws_secret_access_key='access_secret',
         )
-
mock_session.return_value.resource.assert_called_with( + + @mock.patch('boto3.client') + def test_host(self, mock_client): + tp = { + 'client_kwargs': { + 'S3.Client': {'endpoint_url': 'http://aa.domain.com'}, + }, + 'defer_seek': True, + } + smart_open.open("s3://access_id:access_secret@mybucket/mykey", transport_params=tp) + mock_client.assert_called_with( 's3', + aws_access_key_id='access_id', + aws_secret_access_key='access_secret', endpoint_url='http://aa.domain.com', ) - @mock.patch('boto3.Session') - def test_s3_upload(self, mock_session): - smart_open.open( - "s3://bucket/key", 'wb', transport_params={ - 'multipart_upload_kwargs': { + @mock.patch('boto3.client') + def test_s3_upload(self, mock_client): + tp = { + 'client_kwargs': { + 'S3.Client.create_multipart_upload': { 'ServerSideEncryption': 'AES256', 'ContentType': 'application/json', } } - ) - - # Locate the s3.Object instance (mock) - s3_resource = mock_session.return_value.resource.return_value - s3_object = s3_resource.Object.return_value - - # Check that `initiate_multipart_upload` was called - # with the desired args - s3_object.initiate_multipart_upload.assert_called_with( + } + smart_open.open("s3://bucket/key", 'wb', transport_params=tp) + mock_client.return_value.create_multipart_upload.assert_called_with( + Bucket='bucket', + Key='key', ServerSideEncryption='AES256', - ContentType='application/json' + ContentType='application/json', ) - def test_session_read_mode(self): - """ - Read stream should use a custom boto3.Session - """ - session = boto3.Session() - session.resource = mock.MagicMock() - - smart_open.open('s3://bucket/key', transport_params={'session': session, 'defer_seek': True}) - session.resource.assert_called_with('s3') - - def test_session_write_mode(self): - """ - Write stream should use a custom boto3.Session - """ - session = boto3.Session() - session.resource = mock.MagicMock() - - smart_open.open('s3://bucket/key', 'wb', transport_params={'session': session}) - session.resource.assert_called_with('s3') - class SmartOpenTest(unittest.TestCase): """ @@ -1336,16 +1318,18 @@ def test_newline_csv(self): assert content == expected - @mock.patch('boto3.Session') - def test_s3_mode_mock(self, mock_session): + @mock.patch('boto3.client') + def test_s3_mode_mock(self, mock_client): """Are s3:// open modes passed correctly?""" # correct write mode, correct s3 URI - transport_params = {'resource_kwargs': {'endpoint_url': 'http://s3.amazonaws.com'}} + transport_params = { + 'client_kwargs': { + 'S3.Client': {'endpoint_url': 'http://s3.amazonaws.com'}, + } + } smart_open.open("s3://mybucket/mykey", "w", transport_params=transport_params) - mock_session.return_value.resource.assert_called_with( - 's3', endpoint_url='http://s3.amazonaws.com' - ) + mock_client.assert_called_with('s3', endpoint_url='http://s3.amazonaws.com') @mock.patch('smart_open.hdfs.subprocess') def test_hdfs(self, mock_subprocess): @@ -1395,15 +1379,19 @@ def test_s3_metadata_write(self): s3 = boto3.resource('s3') s3.create_bucket(Bucket='mybucket') - # Write data, with multipart_upload options - write_stream = smart_open.open( - 's3://mybucket/crime-and-punishment.txt.gz', 'wb', - transport_params={ - 'multipart_upload_kwargs': { + tp = { + 'client_kwargs': { + 'S3.Client.create_multipart_upload': { 'ContentType': 'text/plain', 'ContentEncoding': 'gzip', } } + } + + # Write data, with multipart_upload options + write_stream = smart_open.open( + 's3://mybucket/crime-and-punishment.txt.gz', 'wb', + transport_params=tp, ) with write_stream as 
fout: fout.write(data) @@ -1801,31 +1789,43 @@ def test_write_text_gzip(self): @mock.patch('smart_open.s3.Reader') def test_transport_params_is_not_mutable(self, mock_open): smart_open.open('s3://access_key:secret_key@host@bucket/key') - smart_open.open('s3://bucket/key') + actual = mock_open.call_args_list[0][1]['client_kwargs'] + expected = { + 'S3.Client': { + 'aws_access_key_id': 'access_key', + 'aws_secret_access_key': 'secret_key', + 'endpoint_url': 'https://host:443', + } + } + assert actual == expected - # - # The first call should have a non-null session, because the session - # keys were explicitly specified in the URL. The second call should - # _not_ have a session. - # - self.assertIsNone(mock_open.call_args_list[1][1]['session']) - self.assertIsNotNone(mock_open.call_args_list[0][1]['session']) + smart_open.open('s3://bucket/key') + actual = mock_open.call_args_list[1][1].get('client_kwargs') + assert actual is None @mock.patch('smart_open.s3.Reader') def test_respects_endpoint_url_read(self, mock_open): url = 's3://key_id:secret_key@play.min.io:9000@smart-open-test/README.rst' smart_open.open(url) - expected = {'endpoint_url': 'https://play.min.io:9000'} - self.assertEqual(mock_open.call_args[1]['resource_kwargs'], expected) + expected = { + 'aws_access_key_id': 'key_id', + 'aws_secret_access_key': 'secret_key', + 'endpoint_url': 'https://play.min.io:9000', + } + self.assertEqual(mock_open.call_args[1]['client_kwargs']['S3.Client'], expected) @mock.patch('smart_open.s3.MultipartWriter') def test_respects_endpoint_url_write(self, mock_open): url = 's3://key_id:secret_key@play.min.io:9000@smart-open-test/README.rst' smart_open.open(url, 'wb') - expected = {'endpoint_url': 'https://play.min.io:9000'} - self.assertEqual(mock_open.call_args[1]['resource_kwargs'], expected) + expected = { + 'aws_access_key_id': 'key_id', + 'aws_secret_access_key': 'secret_key', + 'endpoint_url': 'https://play.min.io:9000', + } + self.assertEqual(mock_open.call_args[1]['client_kwargs']['S3.Client'], expected) def function(a, b, c, foo='bar', baz='boz'):