From 04d75e135420893ca05a7b4d0c5cea6e3708b11a Mon Sep 17 00:00:00 2001 From: Adrian Partl Date: Sun, 8 Dec 2019 21:24:29 +0100 Subject: [PATCH 1/6] added support for using S3 single part uploads --- smart_open/s3.py | 158 ++++++++++++++++++++++++++++++++++-- smart_open/tests/test_s3.py | 111 +++++++++++++++++++++++-- 2 files changed, 252 insertions(+), 17 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 96bce460..27c4f648 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -78,6 +78,7 @@ def open( session=None, resource_kwargs=None, multipart_upload_kwargs=None, + multipart_upload=True, object_kwargs=None, ): """Open an S3 object for reading or writing. @@ -101,6 +102,12 @@ def open( multipart_upload_kwargs: dict, optional Additional parameters to pass to boto3's initiate_multipart_upload function. For writing only. + multipart_upload: bool, optional + Default: `True` + If set to `True`, will use multipart upload for writing to S3. If set + to `false`, S3 upload will use the S3 Single-Part Upload API, which + is more ideal for small file sizes. + For writing only. version_id: str, optional Version of the object, used when reading object. If None, will fetch the most recent version. @@ -134,14 +141,23 @@ def open( object_kwargs=object_kwargs, ) elif mode == WRITE_BINARY: - fileobj = MultipartWriter( - bucket_id, - key_id, - min_part_size=min_part_size, - session=session, - multipart_upload_kwargs=multipart_upload_kwargs, - resource_kwargs=resource_kwargs, - ) + if multipart_upload: + fileobj = MultipartWriter( + bucket_id, + key_id, + min_part_size=min_part_size, + session=session, + multipart_upload_kwargs=multipart_upload_kwargs, + resource_kwargs=resource_kwargs, + ) + else: + fileobj = SinglepartWriter( + bucket_id, + key_id, + session=session, + multipart_upload_kwargs=multipart_upload_kwargs, + resource_kwargs=resource_kwargs, + ) else: assert False, 'unexpected mode: %r' % mode return fileobj @@ -464,7 +480,7 @@ def __repr__(self): class MultipartWriter(io.BufferedIOBase): - """Writes bytes to S3. + """Writes bytes to S3 using the multi part API. Implements the io.BufferedIOBase interface of the standard library.""" @@ -644,6 +660,130 @@ def __repr__(self): ) +class SinglepartWriter(io.BufferedIOBase): + """Writes bytes to S3 using the single part API. + + Implements the io.BufferedIOBase interface of the standard library.""" + + def __init__( + self, + bucket, + key, + session=None, + resource_kwargs=None, + multipart_upload_kwargs=None, + ): + + self._session = session + self._resource_kwargs = resource_kwargs + + if session is None: + session = boto3.Session() + if resource_kwargs is None: + resource_kwargs = {} + if multipart_upload_kwargs is None: + multipart_upload_kwargs = {} + + self._multipart_upload_kwargs = multipart_upload_kwargs + + s3 = session.resource('s3', **resource_kwargs) + try: + self._object = s3.Object(bucket, key) + s3.meta.client.head_bucket(Bucket=bucket) + except botocore.client.ClientError: + raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) + + self._buf = io.BytesIO() + self._total_bytes = 0 + self._closed = False + + # + # This member is part of the io.BufferedIOBase interface. + # + self.raw = None + + def flush(self): + pass + + # + # Override some methods from io.IOBase. + # + def close(self): + self._buf.seek(0) + + try: + self._object.put(Body=self._buf, **self._multipart_upload_kwargs) + except botocore.client.ClientError: + raise ValueError('the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) + + logger.debug("direct upload finished") + self._closed = True + + @property + def closed(self): + return self._closed + + def writable(self): + """Return True if the stream supports writing.""" + return True + + def tell(self): + """Return the current stream position.""" + return self._total_bytes + + # + # io.BufferedIOBase methods. + # + def detach(self): + raise io.UnsupportedOperation("detach() not supported") + + def write(self, b): + """Write the given buffer (bytes, bytearray, memoryview or any buffer + interface implementation) into the buffer. Content of the buffer will be + written to S3 on close as a single-part upload. + + For more information about buffers, see https://docs.python.org/3/c-api/buffer.html""" + + length = self._buf.write(b) + self._total_bytes += length + return length + + def terminate(self): + """Nothing to cancel in single-part uploads.""" + return + + # + # Internal methods. + # + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + self.terminate() + else: + self.close() + + def __str__(self): + return "smart_open.s3.SinglepartWriter(%r, %r)" % (self._object.bucket_name, self._object.key) + + def __repr__(self): + return ( + "smart_open.s3.SinglepartWriter(" + "bucket=%r, " + "key=%r, " + "session=%r, " + "resource_kwargs=%r, " + "multipart_upload_kwargs=%r)" + ) % ( + self._object.bucket_name, + self._object.key, + self._session, + self._resource_kwargs, + self._multipart_upload_kwargs, + ) + + # # For backward compatibility # diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 4d793e36..c947e764 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -302,7 +302,7 @@ def test_to_boto3(self): @maybe_mock_s3 -class BufferedOutputBaseTest(unittest.TestCase): +class BufferedMultiPartOutputBaseTest(unittest.TestCase): """ Test writing into s3 files. @@ -318,7 +318,7 @@ def test_write_01(self): test_string = u"žluťoučký koníček".encode('utf8') # write into key - with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: fout.write(test_string) # read key and test content @@ -329,7 +329,7 @@ def test_write_01(self): def test_write_01a(self): """Does s3 write fail on incorrect input?""" try: - with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: + with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: fin.write(None) except TypeError: pass @@ -338,7 +338,7 @@ def test_write_01a(self): def test_write_02(self): """Does s3 write unicode-utf8 conversion work?""" - smart_open_write = smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write = smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) smart_open_write.tell() logger.info("smart_open_write: %r", smart_open_write) with smart_open_write as fout: @@ -348,7 +348,7 @@ def test_write_02(self): def test_write_03(self): """Does s3 multipart chunking work correctly?""" # write - smart_open_write = smart_open.s3.BufferedOutputBase( + smart_open_write = smart_open.s3.BufferedMultiPartOutputBase( BUCKET_NAME, WRITE_KEY_NAME, min_part_size=10 ) with smart_open_write as fout: @@ -369,7 +369,7 @@ def test_write_03(self): def test_write_04(self): """Does writing no data cause key with an empty value to be created?""" - smart_open_write = smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write = smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) with smart_open_write as fout: # noqa pass @@ -380,7 +380,7 @@ def test_write_04(self): def test_gzip(self): expected = u'а не спеть ли мне песню... о любви'.encode('utf-8') - with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: with gzip.GzipFile(fileobj=fout, mode='w') as zipfile: zipfile.write(expected) @@ -397,7 +397,7 @@ def test_buffered_writer_wrapper_works(self): """ expected = u'не думай о секундах свысока' - with smart_open.s3.BufferedOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: with io.BufferedWriter(fout) as sub_out: sub_out.write(expected.encode('utf-8')) @@ -450,6 +450,101 @@ def test_to_boto3(self): self.assertEqual(contents, boto3_body) +@maybe_mock_s3 +class BufferedSinglePartOutputBaseTest(unittest.TestCase): + """ + Test writing into s3 files using single part upload. + + """ + def setUp(self): + ignore_resource_warnings() + + def tearDown(self): + cleanup_bucket() + + def test_write_01(self): + """Does writing into s3 work correctly?""" + test_string = u"žluťoučký koníček".encode('utf8') + + # write into key + with smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + fout.write(test_string) + + # read key and test content + output = list(smart_open.smart_open("s3://{}/{}".format(BUCKET_NAME, WRITE_KEY_NAME), "rb")) + + self.assertEqual(output, [test_string]) + + def test_write_01a(self): + """Does s3 write fail on incorrect input?""" + try: + with smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: + fin.write(None) + except TypeError: + pass + else: + self.fail() + + def test_write_02(self): + """Does s3 write unicode-utf8 conversion work?""" + test_string = u"testžížáč".encode("utf-8") + + smart_open_write = smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write.tell() + logger.info("smart_open_write: %r", smart_open_write) + with smart_open_write as fout: + fout.write(test_string) + self.assertEqual(fout.tell(), 14) + + def test_write_04(self): + """Does writing no data cause key with an empty value to be created?""" + smart_open_write = smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + with smart_open_write as fout: # noqa + pass + + # read back the same key and check its content + output = list(smart_open.smart_open("s3://{}/{}".format(BUCKET_NAME, WRITE_KEY_NAME))) + + self.assertEqual(output, []) + + def test_buffered_writer_wrapper_works(self): + """ + Ensure that we can wrap a smart_open s3 stream in a BufferedWriter, which + passes a memoryview object to the underlying stream in python >= 2.7 + """ + expected = u'не думай о секундах свысока' + + with smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with io.BufferedWriter(fout) as sub_out: + sub_out.write(expected.encode('utf-8')) + + with smart_open.smart_open("s3://{}/{}".format(BUCKET_NAME, WRITE_KEY_NAME)) as fin: + with io.TextIOWrapper(fin, encoding='utf-8') as text: + actual = text.read() + + self.assertEqual(expected, actual) + + def test_nonexisting_bucket(self): + expected = u"выйду ночью в поле с конём".encode('utf-8') + with self.assertRaises(ValueError): + with smart_open.s3.open('thisbucketdoesntexist', 'mykey', 'wb', multipart_upload=False) as fout: + fout.write(expected) + + def test_double_close(self): + text = u'там за туманами, вечными, пьяными'.encode('utf-8') + fout = smart_open.s3.open(BUCKET_NAME, 'key', 'wb', multipart_upload=False) + fout.write(text) + fout.close() + fout.close() + + def test_flush_close(self): + text = u'там за туманами, вечными, пьяными'.encode('utf-8') + fout = smart_open.s3.open(BUCKET_NAME, 'key', 'wb', multipart_upload=False) + fout.write(text) + fout.flush() + fout.close() + + class ClampTest(unittest.TestCase): def test(self): self.assertEqual(smart_open.s3.clamp(5, 0, 10), 5) From 741b04767b945107b5d6893d1f809a14c65b7824 Mon Sep 17 00:00:00 2001 From: Adrian Partl Date: Sun, 2 Feb 2020 20:28:10 +0100 Subject: [PATCH 2/6] addressed comments on PR --- help.txt | 10 ++++++++++ smart_open/s3.py | 31 ++++++++++++++++++++----------- smart_open/tests/test_s3.py | 28 ++++++++++++++-------------- 3 files changed, 44 insertions(+), 25 deletions(-) diff --git a/help.txt b/help.txt index 5dc9df58..efe503d5 100644 --- a/help.txt +++ b/help.txt @@ -101,6 +101,16 @@ FUNCTIONS multipart_upload_kwargs: dict, optional Additional parameters to pass to boto3's initiate_multipart_upload function. For writing only. + singlepart_upload_kwargs: dict, optional + Additional parameters to pass to boto3's S3.Object.put function when using single + part upload. + For writing only. + multipart_upload: bool, optional + Default: `True` + If set to `True`, will use multipart upload for writing to S3. If set + to `False`, S3 upload will use the S3 Single-Part Upload API, which + is more ideal for small file sizes. + For writing only. version_id: str, optional Version of the object, used when reading object. If None, will fetch the most recent version. diff --git a/smart_open/s3.py b/smart_open/s3.py index 27c4f648..80ad1969 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -79,6 +79,7 @@ def open( resource_kwargs=None, multipart_upload_kwargs=None, multipart_upload=True, + singlepart_upload_kwargs=None, object_kwargs=None, ): """Open an S3 object for reading or writing. @@ -102,10 +103,14 @@ def open( multipart_upload_kwargs: dict, optional Additional parameters to pass to boto3's initiate_multipart_upload function. For writing only. + singlepart_upload_kwargs: dict, optional + Additional parameters to pass to boto3's S3.Object.put function when using single + part upload. + For writing only. multipart_upload: bool, optional Default: `True` If set to `True`, will use multipart upload for writing to S3. If set - to `false`, S3 upload will use the S3 Single-Part Upload API, which + to `False`, S3 upload will use the S3 Single-Part Upload API, which is more ideal for small file sizes. For writing only. version_id: str, optional @@ -155,7 +160,7 @@ def open( bucket_id, key_id, session=session, - multipart_upload_kwargs=multipart_upload_kwargs, + singlepart_upload_kwargs=singlepart_upload_kwargs, resource_kwargs=resource_kwargs, ) else: @@ -663,7 +668,10 @@ def __repr__(self): class SinglepartWriter(io.BufferedIOBase): """Writes bytes to S3 using the single part API. - Implements the io.BufferedIOBase interface of the standard library.""" + Implements the io.BufferedIOBase interface of the standard library. + + This class buffers all of its input in memory until its `close` method is called. Only then will + the data be written to S3 and the buffer is released.""" def __init__( self, @@ -671,7 +679,7 @@ def __init__( key, session=None, resource_kwargs=None, - multipart_upload_kwargs=None, + singlepart_upload_kwargs=None, ): self._session = session @@ -681,10 +689,10 @@ def __init__( session = boto3.Session() if resource_kwargs is None: resource_kwargs = {} - if multipart_upload_kwargs is None: - multipart_upload_kwargs = {} + if singlepart_upload_kwargs is None: + singlepart_upload_kwargs = {} - self._multipart_upload_kwargs = multipart_upload_kwargs + self._singlepart_upload_kwargs = singlepart_upload_kwargs s3 = session.resource('s3', **resource_kwargs) try: @@ -712,9 +720,10 @@ def close(self): self._buf.seek(0) try: - self._object.put(Body=self._buf, **self._multipart_upload_kwargs) + self._object.put(Body=self._buf, **self._singlepart_upload_kwargs) except botocore.client.ClientError: - raise ValueError('the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) + raise ValueError( + 'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) logger.debug("direct upload finished") self._closed = True @@ -774,13 +783,13 @@ def __repr__(self): "key=%r, " "session=%r, " "resource_kwargs=%r, " - "multipart_upload_kwargs=%r)" + "singlepart_upload_kwargs=%r)" ) % ( self._object.bucket_name, self._object.key, self._session, self._resource_kwargs, - self._multipart_upload_kwargs, + self._singlepart_upload_kwargs, ) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index c947e764..f0d0bc39 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -302,7 +302,7 @@ def test_to_boto3(self): @maybe_mock_s3 -class BufferedMultiPartOutputBaseTest(unittest.TestCase): +class MultipartWriterTest(unittest.TestCase): """ Test writing into s3 files. @@ -318,7 +318,7 @@ def test_write_01(self): test_string = u"žluťoučký koníček".encode('utf8') # write into key - with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fout: fout.write(test_string) # read key and test content @@ -329,7 +329,7 @@ def test_write_01(self): def test_write_01a(self): """Does s3 write fail on incorrect input?""" try: - with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: + with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fin: fin.write(None) except TypeError: pass @@ -338,7 +338,7 @@ def test_write_01a(self): def test_write_02(self): """Does s3 write unicode-utf8 conversion work?""" - smart_open_write = smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write = smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME) smart_open_write.tell() logger.info("smart_open_write: %r", smart_open_write) with smart_open_write as fout: @@ -348,7 +348,7 @@ def test_write_02(self): def test_write_03(self): """Does s3 multipart chunking work correctly?""" # write - smart_open_write = smart_open.s3.BufferedMultiPartOutputBase( + smart_open_write = smart_open.s3.MultipartWriter( BUCKET_NAME, WRITE_KEY_NAME, min_part_size=10 ) with smart_open_write as fout: @@ -369,7 +369,7 @@ def test_write_03(self): def test_write_04(self): """Does writing no data cause key with an empty value to be created?""" - smart_open_write = smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write = smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME) with smart_open_write as fout: # noqa pass @@ -380,7 +380,7 @@ def test_write_04(self): def test_gzip(self): expected = u'а не спеть ли мне песню... о любви'.encode('utf-8') - with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fout: with gzip.GzipFile(fileobj=fout, mode='w') as zipfile: zipfile.write(expected) @@ -397,7 +397,7 @@ def test_buffered_writer_wrapper_works(self): """ expected = u'не думай о секундах свысока' - with smart_open.s3.BufferedMultiPartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fout: with io.BufferedWriter(fout) as sub_out: sub_out.write(expected.encode('utf-8')) @@ -451,7 +451,7 @@ def test_to_boto3(self): @maybe_mock_s3 -class BufferedSinglePartOutputBaseTest(unittest.TestCase): +class SinglepartWriterTest(unittest.TestCase): """ Test writing into s3 files using single part upload. @@ -467,7 +467,7 @@ def test_write_01(self): test_string = u"žluťoučký koníček".encode('utf8') # write into key - with smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fout: fout.write(test_string) # read key and test content @@ -478,7 +478,7 @@ def test_write_01(self): def test_write_01a(self): """Does s3 write fail on incorrect input?""" try: - with smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fin: + with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fin: fin.write(None) except TypeError: pass @@ -489,7 +489,7 @@ def test_write_02(self): """Does s3 write unicode-utf8 conversion work?""" test_string = u"testžížáč".encode("utf-8") - smart_open_write = smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write = smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME) smart_open_write.tell() logger.info("smart_open_write: %r", smart_open_write) with smart_open_write as fout: @@ -498,7 +498,7 @@ def test_write_02(self): def test_write_04(self): """Does writing no data cause key with an empty value to be created?""" - smart_open_write = smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) + smart_open_write = smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME) with smart_open_write as fout: # noqa pass @@ -514,7 +514,7 @@ def test_buffered_writer_wrapper_works(self): """ expected = u'не думай о секундах свысока' - with smart_open.s3.BufferedSinglePartOutputBase(BUCKET_NAME, WRITE_KEY_NAME) as fout: + with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME) as fout: with io.BufferedWriter(fout) as sub_out: sub_out.write(expected.encode('utf-8')) From ef1f85418367e114719a006c7addad9556a65582 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sun, 15 Mar 2020 23:34:56 +1100 Subject: [PATCH 3/6] responding to review comments --- smart_open/s3.py | 60 +++++++++++++++++++----------------------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 80ad1969..da59fa6b 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -125,13 +125,6 @@ def open( if mode not in MODES: raise NotImplementedError('bad mode: %r expected one of %r' % (mode, MODES)) - if resource_kwargs is None: - resource_kwargs = {} - if multipart_upload_kwargs is None: - multipart_upload_kwargs = {} - if object_kwargs is None: - object_kwargs = {} - if (mode == WRITE_BINARY) and (version_id is not None): raise ValueError("version_id must be None when writing") @@ -152,7 +145,7 @@ def open( key_id, min_part_size=min_part_size, session=session, - multipart_upload_kwargs=multipart_upload_kwargs, + upload_kwargs=multipart_upload_kwargs, resource_kwargs=resource_kwargs, ) else: @@ -160,7 +153,7 @@ def open( bucket_id, key_id, session=session, - singlepart_upload_kwargs=singlepart_upload_kwargs, + upload_kwargs=singlepart_upload_kwargs, resource_kwargs=resource_kwargs, ) else: @@ -496,10 +489,9 @@ def __init__( min_part_size=DEFAULT_MIN_PART_SIZE, session=None, resource_kwargs=None, - multipart_upload_kwargs=None, + upload_kwargs=None, ): - self._multipart_upload_kwargs = multipart_upload_kwargs if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ @@ -509,17 +501,18 @@ def __init__( session = boto3.Session() if resource_kwargs is None: resource_kwargs = {} - if multipart_upload_kwargs is None: - multipart_upload_kwargs = {} + if upload_kwargs is None: + upload_kwargs = {} self._session = session self._resource_kwargs = resource_kwargs + self._upload_kwargs = upload_kwargs s3 = session.resource('s3', **resource_kwargs) try: self._object = s3.Object(bucket, key) self._min_part_size = min_part_size - self._mp = self._object.initiate_multipart_upload(**multipart_upload_kwargs) + self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs) except botocore.client.ClientError: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) @@ -648,20 +641,15 @@ def __str__(self): def __repr__(self): return ( - "smart_open.s3.MultipartWriter(" - "bucket=%r, " - "key=%r, " - "min_part_size=%r, " - "session=%r, " - "resource_kwargs=%r, " - "multipart_upload_kwargs=%r)" + "smart_open.s3.MultipartWriter(bucket=%r, key=%r, " + "min_part_size=%r, session=%r, resource_kwargs=%r, upload_kwargs=%r)" ) % ( self._object.bucket_name, self._object.key, self._min_part_size, self._session, self._resource_kwargs, - self._multipart_upload_kwargs, + self._upload_kwargs, ) @@ -679,7 +667,7 @@ def __init__( key, session=None, resource_kwargs=None, - singlepart_upload_kwargs=None, + upload_kwargs=None, ): self._session = session @@ -689,10 +677,10 @@ def __init__( session = boto3.Session() if resource_kwargs is None: resource_kwargs = {} - if singlepart_upload_kwargs is None: - singlepart_upload_kwargs = {} + if upload_kwargs is None: + upload_kwargs = {} - self._singlepart_upload_kwargs = singlepart_upload_kwargs + self._upload_kwargs = upload_kwargs s3 = session.resource('s3', **resource_kwargs) try: @@ -703,7 +691,6 @@ def __init__( self._buf = io.BytesIO() self._total_bytes = 0 - self._closed = False # # This member is part of the io.BufferedIOBase interface. @@ -717,20 +704,23 @@ def flush(self): # Override some methods from io.IOBase. # def close(self): + if self._buf is None: + return + self._buf.seek(0) try: - self._object.put(Body=self._buf, **self._singlepart_upload_kwargs) + self._object.put(Body=self._buf, **self._upload_kwargs) except botocore.client.ClientError: raise ValueError( 'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) logger.debug("direct upload finished") - self._closed = True + self._buf = None @property def closed(self): - return self._closed + return self._buf is None def writable(self): """Return True if the stream supports writing.""" @@ -778,18 +768,14 @@ def __str__(self): def __repr__(self): return ( - "smart_open.s3.SinglepartWriter(" - "bucket=%r, " - "key=%r, " - "session=%r, " - "resource_kwargs=%r, " - "singlepart_upload_kwargs=%r)" + "smart_open.s3.SinglepartWriter(bucket=%r, key=%r, session=%r, " + "resource_kwargs=%r, upload_kwargs=%r)" ) % ( self._object.bucket_name, self._object.key, self._session, self._resource_kwargs, - self._singlepart_upload_kwargs, + self._upload_kwargs, ) From e526ccf445c733f9b4dd50223ebe52a5ab29c75c Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sun, 15 Mar 2020 23:39:59 +1100 Subject: [PATCH 4/6] fix decorator --- smart_open/tests/test_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 71b4be82..5c7a76c4 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -427,7 +427,7 @@ def test_to_boto3(self): self.assertEqual(contents, boto3_body) -@maybe_mock_s3 +@moto.mock_s3 class SinglepartWriterTest(unittest.TestCase): """ Test writing into s3 files using single part upload. From 1432e1da7663e372f6c393c80195977ca0101396 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sun, 15 Mar 2020 23:41:18 +1100 Subject: [PATCH 5/6] flake8 --- smart_open/s3.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index ad93e42f..b758d0a3 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -495,8 +495,6 @@ def __init__( resource_kwargs=None, upload_kwargs=None, ): - - if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ multipart upload may fail") From a7a562c8876249e3163485922641a0313e53ff3c Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Sun, 15 Mar 2020 20:39:52 +1100 Subject: [PATCH 6/6] minor fixup in .travis.yml --- .travis.yml | 4 ++-- smart_open/s3.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index e46cb1e3..5b619679 100644 --- a/.travis.yml +++ b/.travis.yml @@ -52,7 +52,7 @@ script: # The creds won't be available unless this is a branch belonging to the # RaRe-Technologies organization. # - - if [[ ${TRAVIS_SECURE_ENV_VARS} && ${RUN_BENCHMARKS} ]]; then + - if [[ "${TRAVIS_SECURE_ENV_VARS}" = "true" && "${RUN_BENCHMARKS}" = "true" ]]; then export SO_S3_URL=$SO_S3_URL/$(python -c 'from uuid import uuid4;print(uuid4())'); pip install pytest_benchmark awscli; set -e; @@ -68,7 +68,7 @@ script: # # These integration tests require AWS creds and an initialized S3 bucket # - - if [[ ${TRAVIS_SECURE_ENV_VARS} ]]; then + - if [[ "${TRAVIS_SECURE_ENV_VARS}" = "true" ]]; then pytest integration-tests/test_s3_ported.py; fi diff --git a/smart_open/s3.py b/smart_open/s3.py index b758d0a3..098bbefe 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -515,8 +515,12 @@ def __init__( self._object = s3.Object(bucket, key) self._min_part_size = min_part_size self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs) - except botocore.client.ClientError: - raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) + except botocore.client.ClientError as error: + raise ValueError( + 'the bucket %r does not exist, or is forbidden for access (%r)' % ( + bucket, error + ) + ) self._buf = io.BytesIO() self._total_bytes = 0