Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #666 from aio-libs/python38
Browse files Browse the repository at this point in the history
Fix Python 3.8 warnings
  • Loading branch information
popravich authored Nov 22, 2019
2 parents b4b823e + 2085f4b commit 80abe42
Show file tree
Hide file tree
Showing 39 changed files with 569 additions and 1,043 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ python:
- "3.6"
- "3.7"
- "nightly"
- "pypy3.5-7.0"
- "pypy3.6-7.1.1"
- "pypy3.6-7.2.0"

stages:
- lint
- test
- examples

jobs:
allow_failures:
- python: "pypy3.6-7.2.0"
include:
# Add two extra tests with uvloop
- &UVLOOP
Expand Down
3 changes: 3 additions & 0 deletions CHANGES/666.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Drop explicit loop requirement in API.
Deprecate ``loop`` argument.
Throw warning in Python 3.8+ if explicit ``loop`` is passed to methods.
33 changes: 15 additions & 18 deletions aioredis/commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..util import (
wait_ok,
_set_exception,
get_event_loop,
)


Expand Down Expand Up @@ -63,8 +64,7 @@ def multi_exec(self):
>>> await asyncio.gather(fut1, fut2)
[1, 1]
"""
return MultiExec(self._pool_or_conn, self.__class__,
loop=self._pool_or_conn._loop)
return MultiExec(self._pool_or_conn, self.__class__)

def pipeline(self):
"""Returns :class:`Pipeline` object to execute bulk of commands.
Expand All @@ -90,20 +90,19 @@ def pipeline(self):
>>> await asyncio.gather(fut1, fut2)
[2, 2]
"""
return Pipeline(self._pool_or_conn, self.__class__,
loop=self._pool_or_conn._loop)
return Pipeline(self._pool_or_conn, self.__class__)


class _RedisBuffer:

def __init__(self, pipeline, *, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
# TODO: deprecation note
# if loop is None:
# loop = asyncio.get_event_loop()
self._pipeline = pipeline
self._loop = loop

def execute(self, cmd, *args, **kw):
fut = self._loop.create_future()
fut = get_event_loop().create_future()
self._pipeline.append((fut, cmd, args, kw))
return fut

Expand All @@ -129,13 +128,13 @@ class Pipeline:

def __init__(self, pool_or_connection, commands_factory=lambda conn: conn,
*, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
# TODO: deprecation note
# if loop is None:
# loop = asyncio.get_event_loop()
self._pool_or_conn = pool_or_connection
self._loop = loop
self._pipeline = []
self._results = []
self._buffer = _RedisBuffer(self._pipeline, loop=loop)
self._buffer = _RedisBuffer(self._pipeline)
self._redis = commands_factory(self._buffer)
self._done = False

Expand All @@ -147,10 +146,9 @@ def __getattr__(self, name):
@functools.wraps(attr)
def wrapper(*args, **kw):
try:
task = asyncio.ensure_future(attr(*args, **kw),
loop=self._loop)
task = asyncio.ensure_future(attr(*args, **kw))
except Exception as exc:
task = self._loop.create_future()
task = get_event_loop().create_future()
task.set_exception(exc)
self._results.append(task)
return task
Expand Down Expand Up @@ -183,7 +181,6 @@ async def execute(self, *, return_exceptions=False):

async def _do_execute(self, conn, *, return_exceptions=False):
await asyncio.gather(*self._send_pipeline(conn),
loop=self._loop,
return_exceptions=True)
return await self._gather_result(return_exceptions)

Expand Down Expand Up @@ -265,11 +262,11 @@ async def _do_execute(self, conn, *, return_exceptions=False):
multi = conn.execute('MULTI')
coros = list(self._send_pipeline(conn))
exec_ = conn.execute('EXEC')
gather = asyncio.gather(multi, *coros, loop=self._loop,
gather = asyncio.gather(multi, *coros,
return_exceptions=True)
last_error = None
try:
await asyncio.shield(gather, loop=self._loop)
await asyncio.shield(gather)
except asyncio.CancelledError:
await gather
except Exception as err:
Expand Down
41 changes: 22 additions & 19 deletions aioredis/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import types
import asyncio
import socket
import warnings
import sys

from functools import partial
from collections import deque
from contextlib import contextmanager
Expand All @@ -14,6 +17,7 @@
coerced_keys_dict,
decode,
parse_url,
get_event_loop,
)
from .parser import Reader
from .stream import open_connection, open_unix_connection
Expand Down Expand Up @@ -97,15 +101,16 @@ async def create_connection(address, *, db=None, password=None, ssl=None,
else:
cls = RedisConnection

if loop is None:
loop = asyncio.get_event_loop()
if loop is not None and sys.version_info >= (3, 8, 0):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)

if isinstance(address, (list, tuple)):
host, port = address
logger.debug("Creating tcp connection to %r", address)
reader, writer = await asyncio.wait_for(open_connection(
host, port, limit=MAX_CHUNK_SIZE, ssl=ssl, loop=loop),
timeout, loop=loop)
host, port, limit=MAX_CHUNK_SIZE, ssl=ssl),
timeout)
sock = writer.transport.get_extra_info('socket')
if sock is not None:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
Expand All @@ -114,15 +119,14 @@ async def create_connection(address, *, db=None, password=None, ssl=None,
else:
logger.debug("Creating unix connection to %r", address)
reader, writer = await asyncio.wait_for(open_unix_connection(
address, ssl=ssl, limit=MAX_CHUNK_SIZE, loop=loop),
timeout, loop=loop)
address, ssl=ssl, limit=MAX_CHUNK_SIZE),
timeout)
sock = writer.transport.get_extra_info('socket')
if sock is not None:
address = sock.getpeername()

conn = cls(reader, writer, encoding=encoding,
address=address, parser=parser,
loop=loop)
address=address, parser=parser)

try:
if password is not None:
Expand All @@ -141,22 +145,21 @@ class RedisConnection(AbcConnection):

def __init__(self, reader, writer, *, address, encoding=None,
parser=None, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
if loop is not None and sys.version_info >= (3, 8):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
if parser is None:
parser = Reader
assert callable(parser), (
"Parser argument is not callable", parser)
self._reader = reader
self._writer = writer
self._address = address
self._loop = loop
self._waiters = deque()
self._reader.set_parser(
parser(protocolError=ProtocolError, replyError=ReplyError)
)
self._reader_task = asyncio.ensure_future(self._read_data(),
loop=self._loop)
self._reader_task = asyncio.ensure_future(self._read_data())
self._close_msg = None
self._db = 0
self._closing = False
Expand Down Expand Up @@ -212,7 +215,7 @@ async def _read_data(self):
else:
self._process_data(obj)
self._closing = True
self._loop.call_soon(self._do_close, last_error)
get_event_loop().call_soon(self._do_close, last_error)

def _process_data(self, obj):
"""Processes command results."""
Expand Down Expand Up @@ -342,7 +345,7 @@ def execute(self, command, *args, encoding=_NOTSET):
cb = None
if encoding is _NOTSET:
encoding = self._encoding
fut = self._loop.create_future()
fut = get_event_loop().create_future()
if self._pipeline_buffer is None:
self._writer.write(encode_command(command, *args))
else:
Expand All @@ -366,7 +369,7 @@ def execute_pubsub(self, command, *channels):
if not len(channels):
raise TypeError("No channels/patterns supplied")
is_pattern = len(command) in (10, 12)
mkchannel = partial(Channel, is_pattern=is_pattern, loop=self._loop)
mkchannel = partial(Channel, is_pattern=is_pattern)
channels = [ch if isinstance(ch, AbcChannel) else mkchannel(ch)
for ch in channels]
if not all(ch.is_pattern == is_pattern for ch in channels):
Expand All @@ -375,15 +378,15 @@ def execute_pubsub(self, command, *channels):
cmd = encode_command(command, *(ch.name for ch in channels))
res = []
for ch in channels:
fut = self._loop.create_future()
fut = get_event_loop().create_future()
res.append(fut)
cb = partial(self._update_pubsub, ch=ch)
self._waiters.append((fut, None, cb))
if self._pipeline_buffer is None:
self._writer.write(cmd)
else:
self._pipeline_buffer.extend(cmd)
return asyncio.gather(*res, loop=self._loop)
return asyncio.gather(*res)

def close(self):
"""Close connection."""
Expand Down Expand Up @@ -426,7 +429,7 @@ def closed(self):
closed = self._closing or self._closed
if not closed and self._reader and self._reader.at_eof():
self._closing = closed = True
self._loop.call_soon(self._do_close, None)
get_event_loop().call_soon(self._do_close, None)
return closed

async def wait_closed(self):
Expand Down
20 changes: 11 additions & 9 deletions aioredis/pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import collections
import types
import warnings
import sys

from .connection import create_connection, _PUBSUB_COMMANDS
from .log import logger
Expand Down Expand Up @@ -77,8 +79,9 @@ def __init__(self, address, db=None, password=None, encoding=None,
"maxsize must be int > 0", maxsize, type(maxsize))
assert minsize <= maxsize, (
"Invalid pool min/max sizes", minsize, maxsize)
if loop is None:
loop = asyncio.get_event_loop()
if loop is not None and sys.version_info >= (3, 8):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
self._address = address
self._db = db
self._password = password
Expand All @@ -87,12 +90,11 @@ def __init__(self, address, db=None, password=None, encoding=None,
self._parser_class = parser
self._minsize = minsize
self._create_connection_timeout = create_connection_timeout
self._loop = loop
self._pool = collections.deque(maxlen=maxsize)
self._used = set()
self._acquiring = 0
self._cond = asyncio.Condition(lock=Lock(loop=loop), loop=loop)
self._close_state = CloseEvent(self._do_close, loop=loop)
self._cond = asyncio.Condition(lock=Lock())
self._close_state = CloseEvent(self._do_close)
self._pubsub_conn = None
self._connection_cls = connection_cls

Expand Down Expand Up @@ -139,7 +141,7 @@ async def _do_clear(self):
conn = self._pool.popleft()
conn.close()
waiters.append(conn.wait_closed())
await asyncio.gather(*waiters, loop=self._loop)
await asyncio.gather(*waiters)

async def _do_close(self):
async with self._cond:
Expand All @@ -152,7 +154,7 @@ async def _do_close(self):
for conn in self._used:
conn.close()
waiters.append(conn.wait_closed())
await asyncio.gather(*waiters, loop=self._loop)
await asyncio.gather(*waiters)
# TODO: close _pubsub_conn connection
logger.debug("Closed %d connection(s)", len(waiters))

Expand Down Expand Up @@ -362,7 +364,7 @@ def release(self, conn):
else:
conn.close()
# FIXME: check event loop is not closed
asyncio.ensure_future(self._wakeup(), loop=self._loop)
asyncio.ensure_future(self._wakeup())

def _drop_closed(self):
for i in range(self.freesize):
Expand Down Expand Up @@ -410,7 +412,7 @@ def _create_new_connection(self, address):
parser=self._parser_class,
timeout=self._create_connection_timeout,
connection_cls=self._connection_cls,
loop=self._loop)
)

async def _wakeup(self, closing_conn=None):
async with self._cond:
Expand Down
20 changes: 13 additions & 7 deletions aioredis/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import json
import types
import collections
import warnings
import sys

from .abc import AbcChannel
from .util import _converters # , _set_result
Expand All @@ -23,7 +25,10 @@ class Channel(AbcChannel):
"""Wrapper around asyncio.Queue."""

def __init__(self, name, is_pattern, loop=None):
self._queue = ClosableQueue(loop=loop)
if loop is not None and sys.version_info >= (3, 8):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
self._queue = ClosableQueue()
self._name = _converters[type(name)](name)
self._is_pattern = is_pattern

Expand Down Expand Up @@ -165,7 +170,7 @@ class Receiver:
>>> from aioredis.pubsub import Receiver
>>> from aioredis.abc import AbcChannel
>>> mpsc = Receiver(loop=loop)
>>> mpsc = Receiver()
>>> async def reader(mpsc):
... async for channel, msg in mpsc.iter():
... assert isinstance(channel, AbcChannel)
Expand All @@ -188,11 +193,12 @@ class Receiver:
def __init__(self, loop=None, on_close=None):
assert on_close is None or callable(on_close), (
"on_close must be None or callable", on_close)
if loop is None:
loop = asyncio.get_event_loop()
if loop is not None:
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
if on_close is None:
on_close = self.check_stop
self._queue = ClosableQueue(loop=loop)
self._queue = ClosableQueue()
self._refs = {}
self._on_close = on_close

Expand Down Expand Up @@ -396,9 +402,9 @@ def close(self, exc=None):

class ClosableQueue:

def __init__(self, *, loop=None):
def __init__(self):
self._queue = collections.deque()
self._event = asyncio.Event(loop=loop)
self._event = asyncio.Event()
self._closed = False

async def wait(self):
Expand Down
Loading

0 comments on commit 80abe42

Please sign in to comment.