Skip to content

Commit

Permalink
Add write_only argument to Kombu and Redis manager classes
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Jan 10, 2016
1 parent 0e47a75 commit 57756e3
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 21 deletions.
40 changes: 26 additions & 14 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,11 @@ type of installation, each server processes owns the connections to a subset
of the clients. To make broadcasting work in this environment, the servers
communicate with each other through the message queue.

The message queue service needs to be installed and configured separately. By
default, the server uses `Kombu <http://kombu.readthedocs.org/en/latest/>`_
to access the message queue, so any message queue supported by this package
can be used. Kombu can be installed with pip::
The message queue service needs to be installed and configured separately. One
of the options offered by this package is to use
`Kombu <http://kombu.readthedocs.org/en/latest/>`_ to access the message
queue, which means that any message queue supported by this package can be
used. Kombu can be installed with pip::

pip install kombu

Expand All @@ -252,29 +253,40 @@ To configure a Socket.IO server to connect to a message queue, the
following example instructs the server to connect to a Redis service running
on the same host and on the default port::

redis = socketio.KombuManager('redis://')
sio = socketio.Server(client_manager=redis)
mgr = socketio.KombuManager('redis://')
sio = socketio.Server(client_manager=mgr)

For a RabbitMQ queue also running on the local server with default
credentials, the configuration is as follows::

amqp = socketio.KombuManager('amqp://')
sio = socketio.Server(client_manager=amqp)
mgr = socketio.KombuManager('amqp://')
sio = socketio.Server(client_manager=mgr)

The arguments passed to the ``KombuManager`` constructor are passed directly
to Kombu's `Connection object
The URL passed to the ``KombuManager`` constructor is passed directly to
Kombu's `Connection object
<http://kombu.readthedocs.org/en/latest/userguide/connections.html>`_, so
the Kombu documentation should be consulted for information on how to
connect to the message queue appropriately.

If the use of Kombu is not desired, native Redis support is also offered
through the ``RedisManager`` class. This class takes the same arguments as
``KombuManager``, but connects directly to a Redis store using the queue's
pub/sub functionality::


mgr = socketio.RedisManager('redis://')
sio = socketio.Server(client_manager=mgr)

If multiple Sokcet.IO servers are connected to a message queue, they
automatically communicate with each other and manage a combined client list,
without any need for additional configuration. To have a process other than
the server connect to the queue to emit a message, the same ``KombuManager``
class can be used as standalone object. For example::
a server connect to the queue to emit a message, the same ``KombuManager``
and ``RedisManager`` classes can be used as standalone object. In this case,
the ``write_only`` argument should be set to ``True`` to disable the creation
of a listening thread. For example::

# connect to the redis queue
redis = socketio.KombuManager('redis://localhost:6379/')
# connect to the redis queue through Kombu
redis = socketio.KombuManager('redis://', write_only=True)
# emit an event
redis.emit('my event', data={'foo': 'bar'}, room='my room')
Expand Down
8 changes: 6 additions & 2 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@ class KombuManager(PubSubManager): # pragma: no cover
connection URLs.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
"""
name = 'kombu'

def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio'):
channel='socketio', write_only=False):
if kombu is None:
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
'virtualenv).')
self.kombu = kombu.Connection(url)
self.exchange = kombu.Exchange(channel, type='fanout', durable=False)
self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange)
super(KombuManager, self).__init__(channel=channel)
super(KombuManager, self).__init__(channel=channel,
write_only=write_only)

def _publish(self, data):
with self.kombu.SimpleQueue(self.queue) as queue:
Expand Down
6 changes: 4 additions & 2 deletions socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ class PubSubManager(BaseManager):
"""
name = 'pubsub'

def __init__(self, channel='socketio'):
def __init__(self, channel='socketio', write_only=False):
super(PubSubManager, self).__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex

def initialize(self, server):
super(PubSubManager, self).initialize(server)
self.thread = self.server.start_background_task(self._thread)
if not self.write_only:
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.')

def emit(self, event, data, namespace=None, room=None, skip_sid=None,
Expand Down
12 changes: 9 additions & 3 deletions socketio/redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@ class RedisManager(PubSubManager): # pragma: no cover
url = 'redis://hostname:port/0'
server = socketio.Server(client_manager=socketio.RedisManager(url))
:param url: The connection URL for the Redis server.
:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
"""
name = 'redis'

def __init__(self, url='redis://localhost:6379/0', channel='socketio'):
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False):
if redis is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
self.redis = redis.Redis.from_url(url)
self.pubsub = self.redis.pubsub()
super(RedisManager, self).__init__(channel=channel)
super(RedisManager, self).__init__(channel=channel,
write_only=write_only)

def _publish(self, data):
return self.redis.publish(self.channel, pickle.dumps(data))
Expand Down
8 changes: 8 additions & 0 deletions tests/test_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ def test_custom_init(self):
self.assertEqual(pubsub.channel, 'foo')
self.assertEqual(len(pubsub.host_id), 32)

def test_write_only_init(self):
mock_server = mock.MagicMock()
pm = pubsub_manager.PubSubManager(write_only=True)
pm.initialize(mock_server)
self.assertEqual(pm.channel, 'socketio')
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm.server.start_background_task.call_count, 0)

def test_emit(self):
self.pm.emit('foo', 'bar')
self.pm._publish.assert_called_once_with(
Expand Down

0 comments on commit 57756e3

Please sign in to comment.