Fix issues #152, #153 and #154 (#155)

Merged · 13 commits · Dec 6, 2017

Changes from 3 commits
52 changes: 42 additions & 10 deletions smart_open/s3.py
@@ -28,6 +28,10 @@
MODES = (READ, READ_BINARY, WRITE, WRITE_BINARY)
"""Allowed I/O modes for working with S3."""

BINARY_NEWLINE = b'\n'
TEXT_NEWLINE = b'\n'
Owner:

Both are binary?

Collaborator (Author):

You're right. We're not using TEXT_NEWLINE right now, so I removed it.

DEFAULT_BUFFER_SIZE = 256 * 1024


def _range_string(start, stop=None):
#
@@ -47,7 +51,6 @@ def open(bucket_id, key_id, mode, **kwargs):
if mode not in MODES:
raise NotImplementedError('bad mode: %r expected one of %r' % (mode, MODES))

buffer_size = kwargs.pop("buffer_size", io.DEFAULT_BUFFER_SIZE)
encoding = kwargs.pop("encoding", "utf-8")
errors = kwargs.pop("errors", None)
newline = kwargs.pop("newline", None)
@@ -96,7 +99,8 @@ class BufferedInputBase(io.BufferedIOBase):

Implements the io.BufferedIOBase interface of the standard library."""

def __init__(self, bucket, key, **kwargs):
def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, **kwargs):
session = boto3.Session(profile_name=kwargs.pop('profile_name', None))
s3 = session.resource('s3', **kwargs)
self._object = s3.Object(bucket, key)
@@ -105,6 +109,8 @@ def __init__(self, bucket, key, **kwargs):
self._current_pos = 0
self._buffer = b''
self._eof = False
self._buffer_size = buffer_size
self._line_terminator = line_terminator

#
# This member is part of the io.BufferedIOBase interface.
@@ -195,14 +201,7 @@ def read(self, size=-1):
# Fill our buffer to the required size.
#
# logger.debug('filling %r byte-long buffer up to %r bytes', len(self._buffer), size)
while len(self._buffer) < size and not self._eof:
raw = self._raw_reader.read(size=io.DEFAULT_BUFFER_SIZE)
if len(raw):
self._buffer += raw
else:
logger.debug('reached EOF while filling buffer')
self._eof = True

self._fill_buffer(size)
return self._read_from_buffer(size)

def read1(self, size=-1):
@@ -218,6 +217,30 @@ def readinto(self, b):
b[:len(data)] = data
return len(data)

def readline(self, limit=-1):
"""Read up to and including the next newline. Returns the bytes read."""
if limit != -1:
raise NotImplementedError('limits other than -1 not implemented yet')
the_line = io.BytesIO()
while not (self._eof and len(self._buffer) == 0):
#
# In the worst case, we're reading self._buffer twice here, once in
# the if condition, and once when calling index.
#
# This is sub-optimal, but better than the alternative: wrapping
# .index in a try..except, because that is slower.
#
if self._line_terminator in self._buffer:
next_newline = self._buffer.index(self._line_terminator)
the_line.write(self._buffer[:next_newline + 1])
self._buffer = self._buffer[next_newline + 1:]
break
else:
the_line.write(self._buffer)
self._buffer = b''
self._fill_buffer(self._buffer_size)
return the_line.getvalue()

def terminate(self):
"""Do nothing."""
pass
@@ -235,6 +258,15 @@ def _read_from_buffer(self, size):
# logger.debug('part: %r', part)
return part

def _fill_buffer(self, size):
while len(self._buffer) < size and not self._eof:
raw = self._raw_reader.read(size=self._buffer_size)
if len(raw):
self._buffer += raw
else:
logger.debug('reached EOF while filling buffer')
self._eof = True


class BufferedOutputBase(io.BufferedIOBase):
"""Writes bytes to S3.
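An aside on the implementation: the readline/_fill_buffer pair above is easier to follow in isolation. Below is a minimal standalone sketch of the same buffering pattern over an in-memory stream; chunked_lines and CHUNK are names invented for this illustration, but the technique (testing `terminator in buffer` before calling .index, so .index can never raise, and refilling the buffer in fixed-size chunks) mirrors the diff:

import io

CHUNK = 8  # deliberately tiny, to force several buffer refills

def chunked_lines(raw, terminator=b'\n'):
    # Yield terminator-delimited lines from a raw byte stream,
    # refilling an internal buffer CHUNK bytes at a time.
    buffer = b''
    eof = False
    while not (eof and len(buffer) == 0):
        if terminator in buffer:           # cheap membership test first...
            i = buffer.index(terminator)   # ...so .index can never raise
            yield buffer[:i + 1]
            buffer = buffer[i + 1:]
        else:
            chunk = raw.read(CHUNK)
            if chunk:
                buffer += chunk
            else:
                eof = True
                if buffer:                 # trailing bytes with no terminator
                    yield buffer
                    buffer = b''

raw = io.BytesIO(b'englishman\nin\nnew\nyork\n')
assert list(chunked_lines(raw)) == [b'englishman\n', b'in\n', b'new\n', b'york\n']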
60 changes: 38 additions & 22 deletions smart_open/tests/test_s3.py
@@ -9,7 +9,7 @@
else:
import unittest

import boto
import boto3
Owner:

This seems to be a big change; does it belong here?

Collaborator (Author):

@piskvorky It's not really that big a change. This is test code, and it writes a mock object to a mock S3 bucket. The code for doing this with boto and boto3 is slightly different, but the end result is the same (the tests still pass without changing the code). If you look at the remainder of the changes in test_s3.py, you'll see the tests aren't strongly coupled to either boto or boto3.

The real benefit of using boto3 in the tests is that it matches the implementation: our S3 implementation uses boto3 under the covers. Using boto3 is also future-proof: newer versions of moto mock boto and boto3 separately, so objects mocked via boto are not visible to boto3 and vice versa. This means that if we upgrade to the most recent moto version (rather than the year-old version currently used in the test environment), our boto-based tests will break.

This change solves that problem before it happens.
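For readers unfamiliar with moto, the boto3 pattern the updated tests rely on boils down to a few lines. The following is a minimal sketch, not part of this PR; the bucket and key names are arbitrary placeholders:

import boto3
import moto

@moto.mock_s3
def roundtrip():
    # Inside the mock decorator, boto3 talks to moto's in-memory S3,
    # never the real AWS service.
    s3 = boto3.resource('s3')
    s3.create_bucket(Bucket='mybucket')
    s3.Object('mybucket', 'mykey').put(Body=b'hello world')
    return s3.Object('mybucket', 'mykey').get()['Body'].read()

assert roundtrip() == b'hello world'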

import moto

import smart_open
@@ -21,15 +21,11 @@
def create_bucket_and_key(bucket_name='mybucket', key_name='mykey', contents=None):
# fake connection, bucket and key
_LOGGER.debug('%r', locals())
conn = boto.connect_s3()
conn.create_bucket(bucket_name)
mybucket = conn.get_bucket(bucket_name)
mykey = boto.s3.key.Key()
mykey.name = key_name
mykey.bucket = mybucket
s3 = boto3.resource('s3')
mybucket = s3.create_bucket(Bucket=bucket_name)
mykey = s3.Object(bucket_name, key_name)
if contents is not None:
_LOGGER.debug('len(contents): %r', len(contents))
mykey.set_contents_from_string(contents)
mykey.put(Body=contents)
return mybucket, mykey


@@ -47,7 +43,7 @@ def test_iter(self):
"""Are S3 files iterated over correctly?"""
# a list of strings to test with
expected = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=expected)
create_bucket_and_key(contents=expected)

# connect to fake s3 and read from the fake key we filled above
fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
@@ -57,15 +53,15 @@
def test_iter_context_manager(self):
# same thing but using a context manager
expected = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=expected)
create_bucket_and_key(contents=expected)
with smart_open.s3.BufferedInputBase('mybucket', 'mykey') as fin:
output = [line.rstrip(b'\n') for line in fin]
self.assertEqual(output, expected.split(b'\n'))

def test_read(self):
"""Are S3 files read correctly?"""
content = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=content)
create_bucket_and_key(contents=content)
_LOGGER.debug('content: %r len: %r', content, len(content))

fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
@@ -76,7 +72,7 @@ def test_seek_beginning(self):
def test_seek_beginning(self):
"""Does seeking to the beginning of S3 files work correctly?"""
content = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=content)
create_bucket_and_key(contents=content)

fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
self.assertEqual(content[:6], fin.read(6))
@@ -91,7 +87,7 @@ def test_seek_start(self):
def test_seek_start(self):
"""Does seeking from the start of S3 files work correctly?"""
content = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=content)
create_bucket_and_key(contents=content)

fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
seek = fin.seek(6)
@@ -102,7 +98,7 @@ def test_seek_current(self):
def test_seek_current(self):
"""Does seeking from the middle of S3 files work correctly?"""
content = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=content)
create_bucket_and_key(contents=content)

fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
self.assertEqual(fin.read(5), b'hello')
@@ -113,7 +109,7 @@ def test_seek_end(self):
def test_seek_end(self):
"""Does seeking from the end of S3 files work correctly?"""
content = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=content)
create_bucket_and_key(contents=content)

fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
seek = fin.seek(-4, whence=smart_open.s3.END)
@@ -122,7 +118,7 @@

def test_detect_eof(self):
content = u"hello wořld\nhow are you?".encode('utf8')
bucket, key = create_bucket_and_key(contents=content)
create_bucket_and_key(contents=content)

fin = smart_open.s3.BufferedInputBase('mybucket', 'mykey')
fin.read()
@@ -137,7 +133,7 @@ def test_read_gzip(self):
buf.close = lambda: None # keep buffer open so that we can .getvalue()
with contextlib.closing(gzip.GzipFile(fileobj=buf, mode='w')) as zipfile:
zipfile.write(expected)
bucket, key = create_bucket_and_key(contents=buf.getvalue())
create_bucket_and_key(contents=buf.getvalue())

#
# Make sure we're reading things correctly.
@@ -159,6 +155,26 @@

self.assertEqual(expected, actual)

def test_readline(self):
content = b'englishman\nin\nnew\nyork\n'
create_bucket_and_key(contents=content)

with smart_open.s3.BufferedInputBase('mybucket', 'mykey') as fin:
actual = list(fin)

expected = [b'englishman\n', b'in\n', b'new\n', b'york\n']
self.assertEqual(expected, actual)

def test_readline_tiny_buffer(self):
content = b'englishman\nin\nnew\nyork\n'
create_bucket_and_key(contents=content)

with smart_open.s3.BufferedInputBase('mybucket', 'mykey', buffer_size=8) as fin:
actual = list(fin)

expected = [b'englishman\n', b'in\n', b'new\n', b'york\n']
self.assertEqual(expected, actual)


@moto.mock_s3
class BufferedOutputBaseTest(unittest.TestCase):
@@ -168,7 +184,7 @@ class BufferedOutputBaseTest(unittest.TestCase):
"""
def test_write_01(self):
"""Does writing into s3 work correctly?"""
mybucket, mykey = create_bucket_and_key()
create_bucket_and_key()
test_string = u"žluťoučký koníček".encode('utf8')

# write into key
@@ -182,7 +198,7 @@

def test_write_01a(self):
"""Does s3 write fail on incorrect input?"""
mybucket, mykey = create_bucket_and_key()
create_bucket_and_key()

try:
with smart_open.s3.BufferedOutputBase('mybucket', 'writekey') as fin:
@@ -194,7 +210,7 @@ def test_write_01a(self):

def test_write_02(self):
"""Does s3 write unicode-utf8 conversion work?"""
mybucket, mykey = create_bucket_and_key()
create_bucket_and_key()

smart_open_write = smart_open.s3.BufferedOutputBase('mybucket', 'writekey')
smart_open_write.tell()
@@ -205,7 +221,7 @@

def test_write_03(self):
"""Does s3 multipart chunking work correctly?"""
mybucket, mykey = create_bucket_and_key()
create_bucket_and_key()

# write
smart_open_write = smart_open.s3.BufferedOutputBase(
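A closing usage note: the two new BufferedInputBase arguments compose, so a caller can split records on any single-byte delimiter while tuning how much data each network call fetches. A minimal sketch, assuming a bucket populated as in the tests above; process() is a stand-in handler invented for this example, not something defined in the PR:

import smart_open.s3

def process(record):
    print(record)  # stand-in handler for this sketch

with smart_open.s3.BufferedInputBase('mybucket', 'mykey',
                                     buffer_size=256,
                                     line_terminator=b'\x00') as fin:
    for record in fin:   # iteration goes through readline() under the hood
        process(record)

Note that readline() slices at index(terminator) + 1, so only single-byte terminators split correctly as written.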