Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Fix Python 3.8 warnings #666

Merged
merged 10 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ python:
- "3.6"
- "3.7"
- "nightly"
- "pypy3.5-7.0"
- "pypy3.6-7.1.1"
- "pypy3.6-7.2.0"

stages:
- lint
- test
- examples

jobs:
allow_failures:
- python: "pypy3.6-7.2.0"
include:
# Add two extra tests with uvloop
- &UVLOOP
Expand Down
3 changes: 3 additions & 0 deletions CHANGES/666.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Drop explicit loop requirement in API.
Deprecate ``loop`` argument.
Throw warning in Python 3.8+ if explicit ``loop`` is passed to methods.
33 changes: 15 additions & 18 deletions aioredis/commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..util import (
wait_ok,
_set_exception,
get_event_loop,
)


Expand Down Expand Up @@ -63,8 +64,7 @@ def multi_exec(self):
>>> await asyncio.gather(fut1, fut2)
[1, 1]
"""
return MultiExec(self._pool_or_conn, self.__class__,
loop=self._pool_or_conn._loop)
return MultiExec(self._pool_or_conn, self.__class__)

def pipeline(self):
"""Returns :class:`Pipeline` object to execute bulk of commands.
Expand All @@ -90,20 +90,19 @@ def pipeline(self):
>>> await asyncio.gather(fut1, fut2)
[2, 2]
"""
return Pipeline(self._pool_or_conn, self.__class__,
loop=self._pool_or_conn._loop)
return Pipeline(self._pool_or_conn, self.__class__)


class _RedisBuffer:

def __init__(self, pipeline, *, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
# TODO: deprecation note
# if loop is None:
# loop = asyncio.get_event_loop()
self._pipeline = pipeline
self._loop = loop

def execute(self, cmd, *args, **kw):
fut = self._loop.create_future()
fut = get_event_loop().create_future()
self._pipeline.append((fut, cmd, args, kw))
return fut

Expand All @@ -129,13 +128,13 @@ class Pipeline:

def __init__(self, pool_or_connection, commands_factory=lambda conn: conn,
*, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
# TODO: deprecation note
# if loop is None:
# loop = asyncio.get_event_loop()
self._pool_or_conn = pool_or_connection
self._loop = loop
self._pipeline = []
self._results = []
self._buffer = _RedisBuffer(self._pipeline, loop=loop)
self._buffer = _RedisBuffer(self._pipeline)
self._redis = commands_factory(self._buffer)
self._done = False

Expand All @@ -147,10 +146,9 @@ def __getattr__(self, name):
@functools.wraps(attr)
def wrapper(*args, **kw):
try:
task = asyncio.ensure_future(attr(*args, **kw),
loop=self._loop)
task = asyncio.ensure_future(attr(*args, **kw))
except Exception as exc:
task = self._loop.create_future()
task = get_event_loop().create_future()
task.set_exception(exc)
self._results.append(task)
return task
Expand Down Expand Up @@ -183,7 +181,6 @@ async def execute(self, *, return_exceptions=False):

async def _do_execute(self, conn, *, return_exceptions=False):
await asyncio.gather(*self._send_pipeline(conn),
loop=self._loop,
return_exceptions=True)
return await self._gather_result(return_exceptions)

Expand Down Expand Up @@ -265,11 +262,11 @@ async def _do_execute(self, conn, *, return_exceptions=False):
multi = conn.execute('MULTI')
coros = list(self._send_pipeline(conn))
exec_ = conn.execute('EXEC')
gather = asyncio.gather(multi, *coros, loop=self._loop,
gather = asyncio.gather(multi, *coros,
return_exceptions=True)
last_error = None
try:
await asyncio.shield(gather, loop=self._loop)
await asyncio.shield(gather)
except asyncio.CancelledError:
await gather
except Exception as err:
Expand Down
41 changes: 22 additions & 19 deletions aioredis/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import types
import asyncio
import socket
import warnings
import sys

from functools import partial
from collections import deque
from contextlib import contextmanager
Expand All @@ -14,6 +17,7 @@
coerced_keys_dict,
decode,
parse_url,
get_event_loop,
)
from .parser import Reader
from .stream import open_connection, open_unix_connection
Expand Down Expand Up @@ -97,15 +101,16 @@ async def create_connection(address, *, db=None, password=None, ssl=None,
else:
cls = RedisConnection

if loop is None:
loop = asyncio.get_event_loop()
if loop is not None and sys.version_info >= (3, 8, 0):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)

if isinstance(address, (list, tuple)):
host, port = address
logger.debug("Creating tcp connection to %r", address)
reader, writer = await asyncio.wait_for(open_connection(
host, port, limit=MAX_CHUNK_SIZE, ssl=ssl, loop=loop),
timeout, loop=loop)
host, port, limit=MAX_CHUNK_SIZE, ssl=ssl),
timeout)
sock = writer.transport.get_extra_info('socket')
if sock is not None:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
Expand All @@ -114,15 +119,14 @@ async def create_connection(address, *, db=None, password=None, ssl=None,
else:
logger.debug("Creating unix connection to %r", address)
reader, writer = await asyncio.wait_for(open_unix_connection(
address, ssl=ssl, limit=MAX_CHUNK_SIZE, loop=loop),
timeout, loop=loop)
address, ssl=ssl, limit=MAX_CHUNK_SIZE),
timeout)
sock = writer.transport.get_extra_info('socket')
if sock is not None:
address = sock.getpeername()

conn = cls(reader, writer, encoding=encoding,
address=address, parser=parser,
loop=loop)
address=address, parser=parser)

try:
if password is not None:
Expand All @@ -141,22 +145,21 @@ class RedisConnection(AbcConnection):

def __init__(self, reader, writer, *, address, encoding=None,
parser=None, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
if loop is not None and sys.version_info >= (3, 8):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
if parser is None:
parser = Reader
assert callable(parser), (
"Parser argument is not callable", parser)
self._reader = reader
self._writer = writer
self._address = address
self._loop = loop
self._waiters = deque()
self._reader.set_parser(
parser(protocolError=ProtocolError, replyError=ReplyError)
)
self._reader_task = asyncio.ensure_future(self._read_data(),
loop=self._loop)
self._reader_task = asyncio.ensure_future(self._read_data())
self._close_msg = None
self._db = 0
self._closing = False
Expand Down Expand Up @@ -212,7 +215,7 @@ async def _read_data(self):
else:
self._process_data(obj)
self._closing = True
self._loop.call_soon(self._do_close, last_error)
get_event_loop().call_soon(self._do_close, last_error)

def _process_data(self, obj):
"""Processes command results."""
Expand Down Expand Up @@ -342,7 +345,7 @@ def execute(self, command, *args, encoding=_NOTSET):
cb = None
if encoding is _NOTSET:
encoding = self._encoding
fut = self._loop.create_future()
fut = get_event_loop().create_future()
if self._pipeline_buffer is None:
self._writer.write(encode_command(command, *args))
else:
Expand All @@ -366,7 +369,7 @@ def execute_pubsub(self, command, *channels):
if not len(channels):
raise TypeError("No channels/patterns supplied")
is_pattern = len(command) in (10, 12)
mkchannel = partial(Channel, is_pattern=is_pattern, loop=self._loop)
mkchannel = partial(Channel, is_pattern=is_pattern)
channels = [ch if isinstance(ch, AbcChannel) else mkchannel(ch)
for ch in channels]
if not all(ch.is_pattern == is_pattern for ch in channels):
Expand All @@ -375,15 +378,15 @@ def execute_pubsub(self, command, *channels):
cmd = encode_command(command, *(ch.name for ch in channels))
res = []
for ch in channels:
fut = self._loop.create_future()
fut = get_event_loop().create_future()
res.append(fut)
cb = partial(self._update_pubsub, ch=ch)
self._waiters.append((fut, None, cb))
if self._pipeline_buffer is None:
self._writer.write(cmd)
else:
self._pipeline_buffer.extend(cmd)
return asyncio.gather(*res, loop=self._loop)
return asyncio.gather(*res)

def close(self):
"""Close connection."""
Expand Down Expand Up @@ -426,7 +429,7 @@ def closed(self):
closed = self._closing or self._closed
if not closed and self._reader and self._reader.at_eof():
self._closing = closed = True
self._loop.call_soon(self._do_close, None)
get_event_loop().call_soon(self._do_close, None)
return closed

async def wait_closed(self):
Expand Down
20 changes: 11 additions & 9 deletions aioredis/pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import collections
import types
import warnings
import sys

from .connection import create_connection, _PUBSUB_COMMANDS
from .log import logger
Expand Down Expand Up @@ -77,8 +79,9 @@ def __init__(self, address, db=None, password=None, encoding=None,
"maxsize must be int > 0", maxsize, type(maxsize))
assert minsize <= maxsize, (
"Invalid pool min/max sizes", minsize, maxsize)
if loop is None:
loop = asyncio.get_event_loop()
if loop is not None and sys.version_info >= (3, 8):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
self._address = address
self._db = db
self._password = password
Expand All @@ -87,12 +90,11 @@ def __init__(self, address, db=None, password=None, encoding=None,
self._parser_class = parser
self._minsize = minsize
self._create_connection_timeout = create_connection_timeout
self._loop = loop
self._pool = collections.deque(maxlen=maxsize)
self._used = set()
self._acquiring = 0
self._cond = asyncio.Condition(lock=Lock(loop=loop), loop=loop)
self._close_state = CloseEvent(self._do_close, loop=loop)
self._cond = asyncio.Condition(lock=Lock())
self._close_state = CloseEvent(self._do_close)
self._pubsub_conn = None
self._connection_cls = connection_cls

Expand Down Expand Up @@ -139,7 +141,7 @@ async def _do_clear(self):
conn = self._pool.popleft()
conn.close()
waiters.append(conn.wait_closed())
await asyncio.gather(*waiters, loop=self._loop)
await asyncio.gather(*waiters)

async def _do_close(self):
async with self._cond:
Expand All @@ -152,7 +154,7 @@ async def _do_close(self):
for conn in self._used:
conn.close()
waiters.append(conn.wait_closed())
await asyncio.gather(*waiters, loop=self._loop)
await asyncio.gather(*waiters)
# TODO: close _pubsub_conn connection
logger.debug("Closed %d connection(s)", len(waiters))

Expand Down Expand Up @@ -362,7 +364,7 @@ def release(self, conn):
else:
conn.close()
# FIXME: check event loop is not closed
asyncio.ensure_future(self._wakeup(), loop=self._loop)
asyncio.ensure_future(self._wakeup())

def _drop_closed(self):
for i in range(self.freesize):
Expand Down Expand Up @@ -410,7 +412,7 @@ def _create_new_connection(self, address):
parser=self._parser_class,
timeout=self._create_connection_timeout,
connection_cls=self._connection_cls,
loop=self._loop)
)

async def _wakeup(self, closing_conn=None):
async with self._cond:
Expand Down
20 changes: 13 additions & 7 deletions aioredis/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import json
import types
import collections
import warnings
import sys

from .abc import AbcChannel
from .util import _converters # , _set_result
Expand All @@ -23,7 +25,10 @@ class Channel(AbcChannel):
"""Wrapper around asyncio.Queue."""

def __init__(self, name, is_pattern, loop=None):
self._queue = ClosableQueue(loop=loop)
if loop is not None and sys.version_info >= (3, 8):
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
self._queue = ClosableQueue()
self._name = _converters[type(name)](name)
self._is_pattern = is_pattern

Expand Down Expand Up @@ -165,7 +170,7 @@ class Receiver:

>>> from aioredis.pubsub import Receiver
>>> from aioredis.abc import AbcChannel
>>> mpsc = Receiver(loop=loop)
>>> mpsc = Receiver()
>>> async def reader(mpsc):
... async for channel, msg in mpsc.iter():
... assert isinstance(channel, AbcChannel)
Expand All @@ -188,11 +193,12 @@ class Receiver:
def __init__(self, loop=None, on_close=None):
assert on_close is None or callable(on_close), (
"on_close must be None or callable", on_close)
if loop is None:
loop = asyncio.get_event_loop()
if loop is not None:
warnings.warn("The loop argument is deprecated",
DeprecationWarning)
if on_close is None:
on_close = self.check_stop
self._queue = ClosableQueue(loop=loop)
self._queue = ClosableQueue()
self._refs = {}
self._on_close = on_close

Expand Down Expand Up @@ -396,9 +402,9 @@ def close(self, exc=None):

class ClosableQueue:

def __init__(self, *, loop=None):
def __init__(self):
self._queue = collections.deque()
self._event = asyncio.Event(loop=loop)
self._event = asyncio.Event()
self._closed = False

async def wait(self):
Expand Down
Loading