Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams optimization #496

Merged
merged 11 commits into the base branch from the feature branch
Sep 19, 2015
104 changes: 49 additions & 55 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def __init__(self, limit=DEFAULT_LIMIT, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._buffer = bytearray()
self._buffer = collections.deque()
self._buffer_size = 0
self._buffer_offset = 0
self._eof = False
self._waiter = None
self._eof_waiter = None
Expand Down Expand Up @@ -86,7 +88,8 @@ def feed_data(self, data):
if not data:
return

self._buffer.extend(data)
self._buffer.append(data)
self._buffer_size += len(data)
self.total_bytes += len(data)

waiter = self._waiter
Expand All @@ -110,22 +113,22 @@ def readline(self):
if self._exception is not None:
raise self._exception

line = bytearray()
line = []
line_size = 0
not_enough = True

while not_enough:
while self._buffer and not_enough:
ichar = self._buffer.find(b'\n')
if ichar < 0:
line.extend(self._buffer)
self._buffer.clear()
else:
ichar += 1
line.extend(self._buffer[:ichar])
del self._buffer[:ichar]
offset = self._buffer_offset
ichar = self._buffer[0].find(b'\n', offset) + 1
# Read from current offset to found b'\n' or to the end.
data = self._read_nowait(ichar - offset if ichar else 0)
line.append(data)
line_size += len(data)
if ichar:
not_enough = False

if len(line) > self._limit:
if line_size > self._limit:
raise ValueError('Line is too long')

if self._eof:
Expand All @@ -138,10 +141,7 @@ def readline(self):
finally:
self._waiter = None

if line:
return bytes(line)
else:
return EOF_MARKER
return b''.join(line)

@asyncio.coroutine
def read(self, n=-1):
Expand All @@ -168,38 +168,23 @@ def read(self, n=-1):
# This used to just loop creating a new waiter hoping to
# collect everything in self._buffer, but that would
# deadlock if the subprocess sends more than self.limit
# bytes. So just call self.read(self._limit) until EOF.
# bytes. So just call self.readany() until EOF.
blocks = []
while True:
block = yield from self.read(self._limit)
block = yield from self.readany()
if not block:
break
blocks.append(block)
data = b''.join(blocks)
if data:
return data
else:
return EOF_MARKER
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
try:
yield from self._waiter
finally:
self._waiter = None
return b''.join(blocks)

if n < 0 or len(self._buffer) <= n:
data = bytes(self._buffer)
self._buffer.clear()
else:
# n > 0 and len(self._buffer) > n
data = bytes(self._buffer[:n])
del self._buffer[:n]
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
try:
yield from self._waiter
finally:
self._waiter = None

if data:
return data
else:
return EOF_MARKER
return self._read_nowait(n)

@asyncio.coroutine
def readany(self):
Expand All @@ -213,13 +198,7 @@ def readany(self):
finally:
self._waiter = None

data = bytes(self._buffer)
del self._buffer[:]

if data:
return data
else:
return EOF_MARKER
return self._read_nowait()

@asyncio.coroutine
def readexactly(self, n):
Expand Down Expand Up @@ -253,12 +232,28 @@ def read_nowait(self):
raise RuntimeError(
'Called while some coroutine is waiting for incoming data.')

return self._read_nowait()

def _read_nowait(self, n=None):
if not self._buffer:
return EOF_MARKER

first_buffer = self._buffer[0]
offset = self._buffer_offset
if n and len(first_buffer) - offset > n:
data = first_buffer[offset:offset + n]
self._buffer_offset += n

elif offset:
self._buffer.popleft()
data = first_buffer[offset:]
self._buffer_offset = 0

else:
data = bytes(self._buffer)
del self._buffer[:]
return data
data = self._buffer.popleft()

self._buffer_size -= len(data)
return data


class EmptyStreamReader:
Expand Down Expand Up @@ -397,17 +392,16 @@ def maybe_resume(func):
def wrapper(self, *args, **kw):
result = yield from func(self, *args, **kw)

size = len(self._buffer)
if self._stream.paused:
if size < self._b_limit:
if self._buffer_size < self._b_limit:
try:
self._stream.transport.resume_reading()
except (AttributeError, NotImplementedError):
pass
else:
self._stream.paused = False
else:
if size > self._b_limit:
if self._buffer_size > self._b_limit:
try:
self._stream.transport.pause_reading()
except (AttributeError, NotImplementedError):
Expand Down Expand Up @@ -441,7 +435,7 @@ def feed_data(self, data, size=0):
super().feed_data(data)

if (not self._stream.paused and
not has_waiter and len(self._buffer) > self._b_limit):
not has_waiter and self._buffer_size > self._b_limit):
try:
self._stream.transport.pause_reading()
except (AttributeError, NotImplementedError):
Expand Down
Loading