@@ -208,19 +208,14 @@ async def read_response(
208208class PythonParser (BaseParser ):
209209 """Plain Python parsing class"""
210210
211- __slots__ = BaseParser .__slots__ + ("encoder" , "_buffer" , "_pos" , "_chunks" )
211+ __slots__ = BaseParser .__slots__ + ("encoder" , "_buffer" , "_pos" )
212212
213213 def __init__ (self , socket_read_size : int ):
214214 super ().__init__ (socket_read_size )
215215 self .encoder : Optional [Encoder ] = None
216216 self ._buffer = b""
217- self ._chunks = []
218217 self ._pos = 0
219218
220- def _clear (self ):
221- self ._buffer = b""
222- self ._chunks .clear ()
223-
224219 def on_connect (self , connection : "Connection" ):
225220 """Called when the stream connects"""
226221 self ._stream = connection ._reader
@@ -234,6 +229,7 @@ def on_disconnect(self):
234229 if self ._stream is not None :
235230 self ._stream = None
236231 self .encoder = None
232+ self ._buffer = b""
237233
238234 async def can_read_destructive (self ) -> bool :
239235 if self ._buffer :
@@ -247,30 +243,37 @@ async def can_read_destructive(self) -> bool:
247243 return False
248244
249245 async def read_response (self , disable_decoding : bool = False ):
250- if self ._stream is None :
251- raise RedisError ("Buffer is closed." )
252- if self ._chunks :
253- # augment parsing buffer with previously read data
254- self ._buffer += b"" .join (self ._chunks )
255- self ._chunks .clear ()
256- try :
246+ if not self ._stream or not self .encoder :
247+ raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
248+
249+ if not self ._buffer :
250+ await self ._fill_buffer ()
251+ while True :
257252 self ._pos = 0
258- response = await self ._read_response (disable_decoding = disable_decoding )
259- except (ConnectionError , InvalidResponse ):
260- # We don't want these errors to be resumable
261- self ._clear ()
262- raise
263- else :
264- # Successfully parsing a response allows us to clear our parsing buffer
265- self ._clear ()
266- return response
253+ try :
254+ response = self ._read_response (disable_decoding = disable_decoding )
267255
268- async def _read_response (
256+ except EOFError :
257+ await self ._fill_buffer ()
258+ else :
259+ break
260+ # Successfully parsing a response allows us to clear our parsing buffer
261+ self ._buffer = self ._buffer [self ._pos :]
262+ return response
263+
264+ async def _fill_buffer (self ):
265+ """
266+ IO is performed here
267+ """
268+ buffer = await self ._stream .read (self ._read_size )
269+ if not buffer or not isinstance (buffer , bytes ):
270+ raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR ) from None
271+ self ._buffer += buffer
272+
273+ def _read_response (
269274 self , disable_decoding : bool = False
270275 ) -> Union [EncodableT , ResponseError , None ]:
271- if not self ._stream or not self .encoder :
272- raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
273- raw = await self ._readline ()
276+ raw = self ._readline ()
274277 if not raw :
275278 raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
276279 response : Any
@@ -286,7 +289,7 @@ async def _read_response(
286289 # if the error is a ConnectionError, raise immediately so the user
287290 # is notified
288291 if isinstance (error , ConnectionError ):
289- self ._clear () # Successful parse
292+ self ._buffer = self . _buffer [ self . _pos :] # Successful parse
290293 raise error
291294 # otherwise, we're dealing with a ResponseError that might belong
292295 # inside a pipeline response. the connection's read_response()
@@ -304,55 +307,39 @@ async def _read_response(
304307 length = int (response )
305308 if length == - 1 :
306309 return None
307- response = await self ._read (length )
310+ response = self ._read (length )
308311 # multi-bulk response
309312 elif byte == b"*" :
310313 length = int (response )
311314 if length == - 1 :
312315 return None
313- response = [
314- (await self ._read_response (disable_decoding )) for _ in range (length )
315- ]
316+ response = [(self ._read_response (disable_decoding )) for _ in range (length )]
316317 if isinstance (response , bytes ) and disable_decoding is False :
317318 response = self .encoder .decode (response )
318319 return response
319320
320- async def _read (self , length : int ) -> bytes :
321+ def _read (self , length : int ) -> bytes :
321322 """
322323 Read `length` bytes of data. These are assumed to be followed
323324 by a '\r \n ' terminator which is subsequently discarded.
324325 """
325- want = length + 2
326- end = self ._pos + want
327- if len (self ._buffer ) >= end :
328- result = self ._buffer [self ._pos : end - 2 ]
329- else :
330- tail = self ._buffer [self ._pos :]
331- try :
332- data = await self ._stream .readexactly (want - len (tail ))
333- except asyncio .IncompleteReadError as error :
334- raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR ) from error
335- result = (tail + data )[:- 2 ]
336- self ._chunks .append (data )
337- self ._pos += want
326+ end = self ._pos + length + 2
327+ if len (self ._buffer ) < end :
328+ raise EOFError () # Signal that we need more data
329+ result = self ._buffer [self ._pos : end - 2 ]
330+ self ._pos = end
338331 return result
339332
340- async def _readline (self ) -> bytes :
333+ def _readline (self ) -> bytes :
341334 """
342335 read an unknown number of bytes up to the next '\r \n '
343336 line separator, which is discarded.
344337 """
345338 found = self ._buffer .find (b"\r \n " , self ._pos )
346- if found >= 0 :
347- result = self ._buffer [self ._pos : found ]
348- else :
349- tail = self ._buffer [self ._pos :]
350- data = await self ._stream .readline ()
351- if not data .endswith (b"\r \n " ):
352- raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
353- result = (tail + data )[:- 2 ]
354- self ._chunks .append (data )
355- self ._pos += len (result ) + 2
339+ if found < 0 :
340+ raise EOFError () # signal that we need more data
341+ result = self ._buffer [self ._pos : found ]
342+ self ._pos = found + 2
356343 return result
357344
358345
0 commit comments