Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #129 from aio-libs/pool_refactoring
Browse files Browse the repository at this point in the history
Pool refactoring
  • Loading branch information
popravich authored Jan 13, 2017
2 parents b4f54fc + d0d25db commit 8b0efa2
Show file tree
Hide file tree
Showing 40 changed files with 816 additions and 547 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ $(CERT_DIR)/.test.key:
ci-test: $(REDIS_TARGETS)
$(call travis_start,tests)
@echo "Tests run"
py.test -rsxX --cov \
py.test -rsxX --cov -n auto \
--ssl-cafile=$(CERT_DIR)/test.crt \
$(foreach T,$(REDIS_TARGETS),--redis-server=$T) $(TEST_ARGS)
$(call travis_end,tests)
Expand Down
8 changes: 6 additions & 2 deletions aioredis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from .commands import (
Redis, create_redis,
create_reconnecting_redis,
create_redis_pool,
GeoPoint, GeoMember,
)
from .pool import RedisPool, create_pool
from .pool import ConnectionsPool, create_pool
from .pubsub import Channel
from .errors import (
ConnectionClosedError,
Expand All @@ -21,10 +22,13 @@

__version__ = '0.3.0'

RedisPool = ConnectionsPool

# make pyflakes happy
(create_connection, RedisConnection,
create_redis, create_reconnecting_redis, Redis,
create_pool, RedisPool, Channel,
create_redis_pool, create_pool,
RedisPool, ConnectionsPool, Channel,
RedisError, ProtocolError, ReplyError,
PipelineError, MultiExecError, ConnectionClosedError,
ChannelClosedError, WatchVariableError,
Expand Down
124 changes: 61 additions & 63 deletions aioredis/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio

from aioredis.connection import create_connection
from aioredis.pool import create_pool
from aioredis.util import _NOTSET
from .generic import GenericCommandsMixin
from .string import StringCommandsMixin
Expand All @@ -17,41 +18,16 @@
from .geo import GeoCommandsMixin, GeoPoint, GeoMember

__all__ = [
'create_redis', 'Redis',
'Pipeline', 'MultiExec',
'GeoPoint', 'GeoMember',
'create_redis',
'create_redis_pool',
'Redis',
'Pipeline',
'MultiExec',
'GeoPoint',
'GeoMember',
]


class AutoConnector(object):
closed = False

def __init__(self, *conn_args, **conn_kwargs):
self._conn_args = conn_args
self._conn_kwargs = conn_kwargs
self._conn = None
self._loop = conn_kwargs.get('loop')
self._lock = asyncio.Lock(loop=self._loop)

def __repr__(self):
return '<AutoConnector {!r}>'.format(self._conn)

@asyncio.coroutine
def execute(self, *args, **kwargs):
conn = yield from self.get_atomic_connection()
return (yield from conn.execute(*args, **kwargs))

@asyncio.coroutine
def get_atomic_connection(self):
if self._conn is None or self._conn.closed:
with (yield from self._lock):
if self._conn is None or self._conn.closed:
conn = yield from create_connection(
*self._conn_args, **self._conn_kwargs)
self._conn = conn
return self._conn


class Redis(GenericCommandsMixin, StringCommandsMixin,
HyperLogLogCommandsMixin, SetCommandsMixin,
HashCommandsMixin, TransactionsCommandsMixin,
Expand All @@ -65,70 +41,76 @@ class Redis(GenericCommandsMixin, StringCommandsMixin,
For commands details see: http://redis.io/commands/#connection
"""

def __init__(self, connection):
self._conn = connection
def __init__(self, pool_or_conn):
self._pool_or_conn = pool_or_conn

def __repr__(self):
return '<Redis {!r}>'.format(self._conn)
return '<Redis {!r}>'.format(self._pool_or_conn)

def execute(self, command, *args, **kwargs):
return self._pool_or_conn.execute(command, *args, **kwargs)

def close(self):
self._conn.close()
self._pool_or_conn.close()

@asyncio.coroutine
def wait_closed(self):
yield from self._conn.wait_closed()
yield from self._pool_or_conn.wait_closed()

@property
def db(self):
"""Currently selected db index."""
return self._conn.db
return self._pool_or_conn.db

@property
def encoding(self):
"""Current set codec or None."""
return self._conn.encoding
return self._pool_or_conn.encoding

@property
def connection(self):
""":class:`aioredis.RedisConnection` instance."""
return self._conn
"""Either :class:`aioredis.RedisConnection`,
or :class:`aioredis.ConnectionsPool` instance.
"""
return self._pool_or_conn

@property
def in_transaction(self):
"""Set to True when MULTI command was issued."""
return self._conn.in_transaction
# XXX: this must be bound to real connection
return self._pool_or_conn.in_transaction

@property
def closed(self):
"""True if connection is closed."""
return self._conn.closed
return self._pool_or_conn.closed

def auth(self, password):
"""Authenticate to server.
This method wraps call to :meth:`aioredis.RedisConnection.auth()`
"""
return self._conn.auth(password)
return self._pool_or_conn.auth(password)

def echo(self, message, *, encoding=_NOTSET):
"""Echo the given string."""
return self._conn.execute('ECHO', message, encoding=encoding)
return self.execute('ECHO', message, encoding=encoding)

def ping(self, *, encoding=_NOTSET):
"""Ping the server."""
return self._conn.execute('PING', encoding=encoding)
return self.execute('PING', encoding=encoding)

def quit(self):
"""Close the connection."""
return self._conn.execute('QUIT')
# TODO: warn when using pool
return self.execute('QUIT')

def select(self, db):
"""Change the selected database for the current connection.
This method wraps call to :meth:`aioredis.RedisConnection.select()`
"""
return self._conn.select(db)
return self._pool_or_conn.select(db)


@asyncio.coroutine
Expand All @@ -148,21 +130,37 @@ def create_redis(address, *, db=None, password=None, ssl=None,


@asyncio.coroutine
def create_reconnecting_redis(address, *, db=None, password=None, ssl=None,
encoding=None, commands_factory=Redis,
loop=None):
def create_redis_pool(address, *, db=0, password=None, ssl=None,
encoding=None, commands_factory=Redis,
minsize=1, maxsize=10,
loop=None):
"""Creates high-level Redis interface.
This function is a coroutine.
"""
# Note: this is not coroutine, but we may make it such. We may start
# a first connection in it, or just resolve DNS. So let's keep it
# coroutine for forward compatibility
conn = AutoConnector(address,
db=db, password=password, ssl=ssl,
encoding=encoding, loop=loop)
return commands_factory(conn)


# make pyflakes happy
(Pipeline, MultiExec)
pool = yield from create_pool(address, db=db,
password=password,
ssl=ssl,
encoding=encoding,
minsize=minsize,
maxsize=maxsize,
loop=loop)
return commands_factory(pool)


create_reconnecting_redis = create_redis_pool
# @asyncio.coroutine
# def create_reconnecting_redis(address, *, db=None, password=None, ssl=None,
# encoding=None, commands_factory=Redis,
# loop=None):
# """Creates high-level Redis interface.
#
# This function is a coroutine.
# """
# # Note: this is not coroutine, but we may make it such. We may start
# # a first connection in it, or just resolve DNS. So let's keep it
# # coroutine for forward compatibility
# conn = AutoConnector(address,
# db=db, password=password, ssl=ssl,
# encoding=encoding, loop=loop)
# return commands_factory(conn)
26 changes: 13 additions & 13 deletions aioredis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,27 @@ def cluster_add_slots(self, slot, *slots):
slots = (slot,) + slots
if not all(isinstance(s, int) for s in slots):
raise TypeError("All parameters must be of type int")
fut = self._conn.execute(b'CLUSTER', b'ADDSLOTS', *slots)
fut = self.execute(b'CLUSTER', b'ADDSLOTS', *slots)
return wait_ok(fut)

def cluster_count_failure_reports(self, node_id):
"""Return the number of failure reports active for a given node."""
return self._conn.execute(
return self.execute(
b'CLUSTER', b'COUNT-FAILURE-REPORTS', node_id)

def cluster_count_key_in_slots(self, slot):
"""Return the number of local keys in the specified hash slot."""
if not isinstance(slot, int):
raise TypeError("Expected slot to be of type int, got {}"
.format(type(slot)))
return self._conn.execute(b'CLUSTER', b'COUNTKEYSINSLOT', slot)
return self.execute(b'CLUSTER', b'COUNTKEYSINSLOT', slot)

def cluster_del_slots(self, slot, *slots):
"""Set hash slots as unbound in receiving node."""
slots = (slot,) + slots
if not all(isinstance(s, int) for s in slots):
raise TypeError("All parameters must be of type int")
fut = self._conn.execute(b'CLUSTER', b'DELSLOTS', *slots)
fut = self.execute(b'CLUSTER', b'DELSLOTS', *slots)
return wait_ok(fut)

def cluster_failover(self):
Expand All @@ -41,25 +41,25 @@ def cluster_failover(self):

def cluster_forget(self, node_id):
"""Remove a node from the nodes table."""
fut = self._conn.execute(b'CLUSTER', b'FORGET', node_id)
fut = self.execute(b'CLUSTER', b'FORGET', node_id)
return wait_ok(fut)

def cluster_get_keys_in_slots(self, slot, count, *, encoding):
"""Return local key names in the specified hash slot."""
return self._conn.execute(b'CLUSTER', b'GETKEYSINSLOT', slot, count,
encoding=encoding)
return self.execute(b'CLUSTER', b'GETKEYSINSLOT', slot, count,
encoding=encoding)

def cluster_info(self):
"""Provides info about Redis Cluster node state."""
pass # TODO: Implement

def cluster_keyslot(self, key):
"""Returns the hash slot of the specified key."""
return self._conn.execute(b'CLUSTER', b'KEYSLOT', key)
return self.execute(b'CLUSTER', b'KEYSLOT', key)

def cluster_meet(self, ip, port):
"""Force a node cluster to handshake with another node."""
fut = self._conn.execute(b'CLUSTER', b'MEET', ip, port)
fut = self.execute(b'CLUSTER', b'MEET', ip, port)
return wait_ok(fut)

def cluster_nodes(self):
Expand All @@ -68,23 +68,23 @@ def cluster_nodes(self):

def cluster_replicate(self, node_id):
"""Reconfigure a node as a slave of the specified master node."""
fut = self._conn.execute(b'CLUSTER', b'REPLICATE', node_id)
fut = self.execute(b'CLUSTER', b'REPLICATE', node_id)
return wait_ok(fut)

def cluster_reset(self, *, hard=False):
"""Reset a Redis Cluster node."""
reset = hard and b'HARD' or b'SOFT'
fut = self._conn.execute(b'CLUSTER', b'RESET', reset)
fut = self.execute(b'CLUSTER', b'RESET', reset)
return wait_ok(fut)

def cluster_save_config(self):
"""Force the node to save cluster state on disk."""
fut = self._conn.execute(b'CLUSTER', b'SAVECONFIG')
fut = self.execute(b'CLUSTER', b'SAVECONFIG')
return wait_ok(fut)

def cluster_set_config_epoch(self, config_epoch):
"""Set the configuration epoch in a new node."""
fut = self._conn.execute(b'CLUSTER', b'SET-CONFIG-EPOCH', config_epoch)
fut = self.execute(b'CLUSTER', b'SET-CONFIG-EPOCH', config_epoch)
return wait_ok(fut)

def cluster_setslot(self, slot, command, node_id):
Expand Down
Loading

0 comments on commit 8b0efa2

Please sign in to comment.