refactor s3 submodule to minimize resource usage (#569)
* refactor s3 submodule to minimize resource usage

Creating sessions and resources costs time and memory.

* git add benchmark/read_s3.py

$ time python benchmark/read_s3.py < benchmark/urls.txt
real    1m20.786s
user    0m8.619s
sys     0m0.894s

$ time python benchmark/read_s3.py create_session < benchmark/urls.txt
real    1m45.826s
user    0m4.554s
sys     0m0.149s

$ time python benchmark/read_s3.py create_resource < benchmark/urls.txt
real    0m22.046s
user    0m1.474s
sys     0m0.065s

$ time python benchmark/read_s3.py create_session_and_resource < benchmark/urls.txt
real    0m21.086s
user    0m1.496s
sys     0m0.073s
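
The claim that creating sessions and resources is expensive can be spot-checked directly; a minimal sketch (not part of this commit; numbers vary by machine and boto3 version):

```
import timeit

import boto3

# Constructing a session and resource is purely local work (loading the
# service model), but it is not free.
secs = timeit.timeit(
    lambda: boto3.Session(region_name='us-east-1').resource('s3'),
    number=10,
)
print('%.3f s per session + resource' % (secs / 10))
```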

* fixup

* Update smart_open/s3.py

Co-authored-by: Radim Řehůřek <radimrehurek@seznam.cz>

* Update read_s3.py

* update howto

* update howto.md

Co-authored-by: Radim Řehůřek <radimrehurek@seznam.cz>
mpenkov and piskvorky authored Dec 27, 2020
1 parent 8f78878 commit 74afb2a
Showing 4 changed files with 152 additions and 67 deletions.
18 changes: 18 additions & 0 deletions benchmark/read_s3.py
@@ -0,0 +1,18 @@
import sys

import boto3
import smart_open

urls = [line.strip() for line in sys.stdin]

tp = {}
if 'create_session_and_resource' in sys.argv:
    tp['session'] = boto3.Session()
    tp['resource'] = tp['session'].resource('s3')
elif 'create_resource' in sys.argv:
    tp['resource'] = boto3.resource('s3')
elif 'create_session' in sys.argv:
    tp['session'] = boto3.Session()

for url in urls:
    smart_open.open(url, transport_params=tp).read()
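
The script reads one S3 URL per line from standard input. A minimal sketch of how a `benchmark/urls.txt` might be produced (the bucket name is hypothetical; assumes AWS credentials and a default region are configured):

```
import boto3

# List up to 100 objects from a bucket of your own and write one s3:// URL per line.
bucket = boto3.resource('s3').Bucket('my-benchmark-bucket')
with open('benchmark/urls.txt', 'w') as fout:
    for obj in bucket.objects.limit(100):
        fout.write('s3://%s/%s\n' % (obj.bucket_name, obj.key))
```
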
34 changes: 33 additions & 1 deletion howto.md
@@ -173,6 +173,38 @@ s3.ObjectVersion(bucket_name='smart-open-versioned', object_key='demo.txt', id='
```

## How to Read from S3 Efficiently

Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
By default, calling `smart_open.open` with an S3 URL will create its own boto3 session and resource.
These are expensive operations: they require both CPU time to construct the objects from a low-level API definition, and memory to store the objects once they have been created.
It is possible to save both CPU time and memory by sharing the same resource across multiple `smart_open.open` calls, for example:

```
>>> import boto3
>>> from smart_open import open
>>> tp = {'resource': boto3.resource('s3')}
>>> for month in (1, 2, 3):
... url = 's3://nyc-tlc/trip data/yellow_tripdata_2020-%02d.csv' % month
... with open(url, transport_params=tp) as fin:
... _ = fin.readline() # skip CSV header
... print(fin.readline().strip())
1,2020-01-01 00:28:15,2020-01-01 00:33:03,1,1.20,1,N,238,239,1,6,3,0.5,1.47,0,0.3,11.27,2.5
1,2020-02-01 00:17:35,2020-02-01 00:30:32,1,2.60,1,N,145,7,1,11,0.5,0.5,2.45,0,0.3,14.75,0
1,2020-03-01 00:31:13,2020-03-01 01:01:42,1,4.70,1,N,88,255,1,22,3,0.5,2,0,0.3,27.8,2.5
```

The above sharing is safe because it is all happening in the same thread and subprocess (see below for details).

## How to Work in a Parallelized Environment

Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
This API is not thread-safe or multiprocess-safe.
Do not share the same `smart_open` objects across different threads or subprocesses.
`smart_open` will create its own session and resource objects for each individual `open` call, so you don't have to worry about managing boto3 objects.
This comes at a price: each session and resource requires CPU time to create and memory to store, so be wary of keeping hundreds of threads or subprocesses reading/writing from/to S3.
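
For illustration, here is one safe pattern: give each worker process its own resource via a pool initializer. This is a minimal sketch (the bucket and key names are hypothetical):

```
from multiprocessing import Pool

import boto3
from smart_open import open

_tp = None  # per-process transport_params, populated once in each worker


def init_worker():
    global _tp
    # Each worker process builds its own session/resource: boto3 objects must
    # not be shared across processes.
    _tp = {'resource': boto3.resource('s3')}


def read_first_line(url):
    with open(url, transport_params=_tp) as fin:
        return fin.readline().strip()


if __name__ == '__main__':
    urls = ['s3://my-bucket/part-%04d.csv' % i for i in range(8)]  # hypothetical keys
    with Pool(processes=4, initializer=init_worker) as pool:
        for line in pool.map(read_first_line, urls):
            print(line)
```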

## How to Specify the Request Payer (S3 only)

Some public buckets require you to [pay for S3 requests for the data in the bucket](https://docs.aws.amazon.com/AmazonS3/latest/dev/RequesterPaysBuckets.html).
@@ -206,7 +238,7 @@ First, install localstack and start it:
The start command is blocking, so you'll need to run it in a separate terminal session or run it in the background.
Before we can read/write, we'll need to create a bucket:

-    $ aws --endpoint-url http://localhost:4566 s3api create-bucket --bucket-name mybucket
+    $ aws --endpoint-url http://localhost:4566 s3api create-bucket --bucket mybucket

where `http://localhost:4566` is the default host/port that localstack uses to listen for requests.
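
The same bucket can also be created from Python; a sketch assuming localstack is already running on that endpoint (localstack accepts dummy credentials):

```
import boto3

# Point boto3 at the local localstack endpoint instead of real AWS.
s3 = boto3.resource('s3', endpoint_url='http://localhost:4566')
s3.create_bucket(Bucket='mybucket')
```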

14 changes: 6 additions & 8 deletions setup.py
@@ -35,8 +35,13 @@ def _get_version():
def read(fname):
    return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read()

+aws_deps = ['boto3']
+gcp_deps = ['google-cloud-storage']
+azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
+http_deps = ['requests']
+
+all_deps = aws_deps + gcp_deps + azure_deps + http_deps
-tests_require = [
+tests_require = all_deps + [
    'mock',
    'moto[server]==1.3.14', # Older versions of moto appear broken
    'pathlib2',
@@ -48,13 +53,6 @@ def read(fname):
    'pytest-rerunfailures'
]

-aws_deps = ['boto3']
-gcp_deps = ['google-cloud-storage']
-azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
-http_deps = ['requests']
-
-all_deps = aws_deps + gcp_deps + azure_deps + http_deps
-
setup(
    name='smart_open',
    version=__version__,
153 changes: 95 additions & 58 deletions smart_open/s3.py
@@ -171,20 +171,21 @@ def open_uri(uri, mode, transport_params):


def open(
-        bucket_id,
-        key_id,
-        mode,
-        version_id=None,
-        buffer_size=DEFAULT_BUFFER_SIZE,
-        min_part_size=DEFAULT_MIN_PART_SIZE,
-        session=None,
-        resource_kwargs=None,
-        multipart_upload_kwargs=None,
-        multipart_upload=True,
-        singlepart_upload_kwargs=None,
-        object_kwargs=None,
-        defer_seek=False,
-        ):
+    bucket_id,
+    key_id,
+    mode,
+    version_id=None,
+    buffer_size=DEFAULT_BUFFER_SIZE,
+    min_part_size=DEFAULT_MIN_PART_SIZE,
+    session=None,
+    resource=None,
+    resource_kwargs=None,
+    multipart_upload_kwargs=None,
+    multipart_upload=True,
+    singlepart_upload_kwargs=None,
+    object_kwargs=None,
+    defer_seek=False,
+):
"""Open an S3 object for reading or writing.
Parameters
Expand All @@ -201,8 +202,13 @@ def open(
        The minimum part size for multipart uploads. For writing only.
    session: object, optional
        The S3 session to use when working with boto3.
+        If you don't specify this, then smart_open will create a new session for you.
+    resource: object, optional
+        The S3 resource to use when working with boto3.
+        If you don't specify this, then smart_open will create a new resource for you.
    resource_kwargs: dict, optional
-        Keyword arguments to use when accessing the S3 resource for reading or writing.
+        Keyword arguments to use when creating the S3 resource for reading or writing.
+        Will be ignored if you specify the resource object explicitly.
    multipart_upload_kwargs: dict, optional
        Additional parameters to pass to boto3's initiate_multipart_upload function.
        For writing only.
@@ -242,6 +248,7 @@
            version_id=version_id,
            buffer_size=buffer_size,
            session=session,
+            resource=resource,
            resource_kwargs=resource_kwargs,
            object_kwargs=object_kwargs,
            defer_seek=defer_seek,
@@ -420,30 +427,68 @@ def read(self, size=-1):
        return binary


def _initialize_boto3(rw, session, resource, resource_kwargs):
    """Create the required objects for accessing S3. Ideally, they have
    been already created for us and we can just reuse them.

    We only really need one thing: the resource. There are multiple ways of
    getting one, in order of effort:

    1) Directly from the user
    2) From the session directly specified by the user
    3) From an entirely new session

    Once we have the resource, we no longer need the session.
    """
    if resource_kwargs is None:
        resource_kwargs = {}

    if resource:
        if session:
            logger.warning('ignoring session because resource was passed explicitly')
        if resource_kwargs:
            logger.warning('ignoring resource_kwargs because resource was passed explicitly')
        rw._session = None
        rw._resource = resource
    elif session:
        rw._session = session
        rw._resource = rw._session.resource('s3', **resource_kwargs)
        rw._resource_kwargs = resource_kwargs
    else:
        rw._session = boto3.Session()
        rw._resource = rw._session.resource('s3', **resource_kwargs)
        rw._resource_kwargs = resource_kwargs


class Reader(io.BufferedIOBase):
    """Reads bytes from S3.

    Implements the io.BufferedIOBase interface of the standard library."""

-    def __init__(self, bucket, key, version_id=None, buffer_size=DEFAULT_BUFFER_SIZE,
-                 line_terminator=constants.BINARY_NEWLINE, session=None, resource_kwargs=None,
-                 object_kwargs=None, defer_seek=False):
+    def __init__(
+        self,
+        bucket,
+        key,
+        version_id=None,
+        buffer_size=DEFAULT_BUFFER_SIZE,
+        line_terminator=constants.BINARY_NEWLINE,
+        session=None,
+        resource=None,
+        resource_kwargs=None,
+        object_kwargs=None,
+        defer_seek=False,
+    ):
        self._buffer_size = buffer_size

-        if session is None:
-            session = boto3.Session()
-        if resource_kwargs is None:
-            resource_kwargs = {}
        if object_kwargs is None:
            object_kwargs = {}

-        self._session = session
-        self._resource_kwargs = resource_kwargs
-        self._object_kwargs = object_kwargs
+        _initialize_boto3(self, session, resource, resource_kwargs)

-        s3 = session.resource('s3', **resource_kwargs)
-        self._object = s3.Object(bucket, key)
+        self._object_kwargs = object_kwargs
+        self._object = self._resource.Object(bucket, key)
        self._version_id = version_id

        self._raw_reader = _SeekableRawReader(
@@ -586,14 +631,16 @@ def to_boto3(self):
        The created instance will re-use the session and resource parameters of
        the current instance, but it will be independent: changes to the
-        `boto3.s3.Object` may not necessary affect the current instance.
+        `boto3.s3.Object` may not necessarily affect the current instance.
        """
-        s3 = self._session.resource('s3', **self._resource_kwargs)
        if self._version_id is not None:
-            return s3.Object(self._object.bucket_name, self._object.key).Version(self._version_id)
+            return self._resource.Object(
+                self._object.bucket_name,
+                self._object.key,
+            ).Version(self._version_id)
        else:
-            return s3.Object(self._object.bucket_name, self._object.key)
+            return self._resource.Object(self._object.bucket_name, self._object.key)

    #
    # Internal methods.
@@ -647,32 +694,28 @@ class MultipartWriter(io.BufferedIOBase):
    Implements the io.BufferedIOBase interface of the standard library."""

    def __init__(
-            self,
-            bucket,
-            key,
-            min_part_size=DEFAULT_MIN_PART_SIZE,
-            session=None,
-            resource_kwargs=None,
-            upload_kwargs=None,
-            ):
+        self,
+        bucket,
+        key,
+        min_part_size=DEFAULT_MIN_PART_SIZE,
+        session=None,
+        resource=None,
+        resource_kwargs=None,
+        upload_kwargs=None,
+    ):
        if min_part_size < MIN_MIN_PART_SIZE:
            logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")

-        if session is None:
-            session = boto3.Session()
-        if resource_kwargs is None:
-            resource_kwargs = {}
+        _initialize_boto3(self, session, resource, resource_kwargs)

        if upload_kwargs is None:
            upload_kwargs = {}

-        self._session = session
-        self._resource_kwargs = resource_kwargs
        self._upload_kwargs = upload_kwargs

-        s3 = session.resource('s3', **resource_kwargs)
        try:
-            self._object = s3.Object(bucket, key)
+            self._object = self._resource.Object(bucket, key)
            self._min_part_size = min_part_size
            partial = functools.partial(self._object.initiate_multipart_upload, **self._upload_kwargs)
            self._mp = _retry_if_failed(partial)
@@ -773,8 +816,7 @@ def to_boto3(self):
        `boto3.s3.Object` may not necessary affect the current instance.
        """
-        s3 = self._session.resource('s3', **self._resource_kwargs)
-        return s3.Object(self._object.bucket_name, self._object.key)
+        return self._resource.Object(self._object.bucket_name, self._object.key)

    #
    # Internal methods.
@@ -841,26 +883,21 @@ def __init__(
        bucket,
        key,
        session=None,
+        resource=None,
        resource_kwargs=None,
        upload_kwargs=None,
    ):

-        self._session = session
-        self._resource_kwargs = resource_kwargs
+        _initialize_boto3(self, session, resource, resource_kwargs)

-        if session is None:
-            session = boto3.Session()
-        if resource_kwargs is None:
-            resource_kwargs = {}
        if upload_kwargs is None:
            upload_kwargs = {}

        self._upload_kwargs = upload_kwargs

-        s3 = session.resource('s3', **resource_kwargs)
        try:
-            self._object = s3.Object(bucket, key)
-            s3.meta.client.head_bucket(Bucket=bucket)
+            self._object = self._resource.Object(bucket, key)
+            self._resource.meta.client.head_bucket(Bucket=bucket)
        except botocore.client.ClientError as e:
            raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e
@@ -1123,7 +1160,7 @@ def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
Expand Down Expand Up @@ -1123,7 +1160,7 @@ def _download_key(key_name, bucket_name=None, retries=3, **session_kwargs):
        raise ValueError('bucket_name may not be None')

    #
-    # https://geekpete.com/blog/multithreading-boto3/
+    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-and-multiprocessing
    #
    session = boto3.session.Session(**session_kwargs)
    s3 = session.resource('s3')
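
With the refactoring above, readers and writers resolve their boto3 objects in one place: an explicit resource wins, then an explicit session, then a freshly created session. A sketch of the three call styles (the bucket and key are hypothetical):

```
import boto3
from smart_open import open

url = 's3://my-bucket/my-key.txt'

# 1) Pass a ready-made resource: cheapest when opening many objects.
with open(url, transport_params={'resource': boto3.resource('s3')}) as fin:
    fin.read()

# 2) Pass only a session: smart_open builds the resource from it.
with open(url, transport_params={'session': boto3.Session()}) as fin:
    fin.read()

# 3) Pass nothing: smart_open creates a new session and resource internally.
with open(url) as fin:
    fin.read()
```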
