Skip to content

Commit

Permalink
add coordination
Browse files Browse the repository at this point in the history
  • Loading branch information
sfzeng committed Apr 13, 2020
1 parent 92eb4ee commit 16505f9
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dolphin/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
help="Top-level directory for maintaining dolphin's state."),
]

CONF.register_cli_opts(core_opts)

global_opts = [
cfg.HostAddressOpt('my_ip',
default=netutils.get_my_ipv4(),
Expand Down
203 changes: 203 additions & 0 deletions dolphin/coordination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Tooz Coordination and locking utilities."""

import inspect

import decorator
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
import six
from tooz import coordination
from tooz import locking

from dolphin import exception
from dolphin.i18n import _


LOG = log.getLogger(__name__)


coordination_opts = [
cfg.StrOpt('backend_url',
default='redis://127.0.0.1:6379',
help='The back end URL to use for distributed coordination.')
]

CONF = cfg.CONF
CONF.register_opts(coordination_opts, group='coordination')


class Coordinator(object):
"""Tooz coordination wrapper.
Coordination member id is created from concatenated `prefix` and
`agent_id` parameters.
:param str agent_id: Agent identifier
:param str prefix: Used to provide member identifier with a
meaningful prefix.
"""

def __init__(self, agent_id=None, prefix=''):
self.coordinator = None
self.agent_id = agent_id or uuidutils.generate_uuid()
self.started = False
self.prefix = prefix

def start(self):
"""Connect to coordination back end."""
if self.started:
return

# NOTE(gouthamr): Tooz expects member_id as a byte string.
member_id = (self.prefix + self.agent_id).encode('ascii')
LOG.info("cfg.CONF.coordination.backend_url=%s, membrid=%s" % (cfg.CONF.coordination.backend_url, member_id))
self.coordinator = coordination.get_coordinator(
cfg.CONF.coordination.backend_url, member_id)
self.coordinator.start(start_heart=True)
self.started = True

def stop(self):
"""Disconnect from coordination back end."""
msg = 'Stopped Coordinator (Agent ID: %(agent)s, prefix: %(prefix)s)'
msg_args = {'agent': self.agent_id, 'prefix': self.prefix}
if self.started:
self.coordinator.stop()
self.coordinator = None
self.started = False

LOG.info(msg, msg_args)

def get_lock(self, name):
"""Return a Tooz back end lock.
:param str name: The lock name that is used to identify it
across all nodes.
"""
# NOTE(gouthamr): Tooz expects lock name as a byte string
lock_name = (self.prefix + name).encode('ascii')
if self.started:
return self.coordinator.get_lock(lock_name)
else:
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))


LOCK_COORDINATOR = Coordinator(prefix='dolphin-')


class Lock(locking.Lock):
"""Lock with dynamic name.
:param str lock_name: Lock name.
:param dict lock_data: Data for lock name formatting.
:param coordinator: Coordinator object to use when creating lock.
Defaults to the global coordinator.
Using it like so::
with Lock('mylock'):
...
ensures that only one process at a time will execute code in context.
Lock name can be formatted using Python format string syntax::
Lock('foo-{share.id}, {'share': ...,}')
Available field names are keys of lock_data.
"""
def __init__(self, lock_name, lock_data=None, coordinator=None):
super(Lock, self).__init__(six.text_type(id(self)))
lock_data = lock_data or {}
self.coordinator = coordinator or LOCK_COORDINATOR
self.blocking = True
self.lock = self._prepare_lock(lock_name, lock_data)

def _prepare_lock(self, lock_name, lock_data):
if not isinstance(lock_name, six.string_types):
raise ValueError(_('Not a valid string: %s') % lock_name)
return self.coordinator.get_lock(lock_name.format(**lock_data))

def acquire(self, blocking=None):
"""Attempts to acquire lock.
:param blocking: If True, blocks until the lock is acquired. If False,
returns right away. Otherwise, the value is used as a timeout
value and the call returns maximum after this number of seconds.
:return: returns true if acquired (false if not)
:rtype: bool
"""
blocking = self.blocking if blocking is None else blocking
return self.lock.acquire(blocking=blocking)

def release(self):
"""Attempts to release lock.
The behavior of releasing a lock which was not acquired in the first
place is undefined.
"""
self.lock.release()


def synchronized(lock_name, blocking=True, coordinator=None):
"""Synchronization decorator.
:param str lock_name: Lock name.
:param blocking: If True, blocks until the lock is acquired.
If False, raises exception when not acquired. Otherwise,
the value is used as a timeout value and if lock is not acquired
after this number of seconds exception is raised.
:param coordinator: Coordinator object to use when creating lock.
Defaults to the global coordinator.
:raises tooz.coordination.LockAcquireFailed: if lock is not acquired
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one process will execute the foo method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
Lock name can be formatted using Python format string syntax::
@synchronized('{f_name}-{shr.id}-{snap[name]}')
def foo(self, shr, snap):
...
Available field names are: decorated function parameters and
`f_name` as a decorated function name.
"""
@decorator.decorator
def _synchronized(f, *a, **k):
call_args = inspect.getcallargs(f, *a, **k)
call_args['f_name'] = f.__name__
lock = Lock(lock_name, call_args, coordinator)
with lock(blocking):
LOG.info('Lock "%(name)s" acquired by "%(function)s".',
{'name': lock_name, 'function': f.__name__})
return f(*a, **k)
return _synchronized
10 changes: 10 additions & 0 deletions dolphin/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from dolphin import context
from dolphin import exception
from dolphin import rpc
from dolphin import coordination

LOG = log.getLogger(__name__)

Expand Down Expand Up @@ -98,6 +99,9 @@ def __init__(self, host, binary, topic, manager, periodic_enable=None,
self.coordinator = coordination

def start(self):
if self.coordinator:
coordination.LOCK_COORDINATOR.start()

LOG.info('Starting %(topic)s node.', {'topic': self.topic})
LOG.debug("Creating RPC server for service %s.", self.topic)

Expand Down Expand Up @@ -181,6 +185,12 @@ def stop(self):
x.stop()
except Exception:
pass
if self.coordinator:
try:
coordination.LOCK_COORDINATOR.stop()
except Exception:
LOG.exception("Unable to stop the Tooz Locking "
"Coordinator.")

self.timers = []

Expand Down
4 changes: 4 additions & 0 deletions dolphin/task_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

from dolphin import manager
from dolphin.task_manager import rpcapi as task_rpcapi
from dolphin import coordination
from dolphin import context

LOG = log.getLogger(__name__)
CONF = cfg.CONF
Expand All @@ -39,7 +41,9 @@ def __init__(self, service_name=None, *args, **kwargs):
super(TaskManager, self).__init__(*args, **kwargs)
self.task_rpcapi = task_rpcapi.TaskAPI()

"""Periodical task, this task will use coordination for distribute synchronization."""
@periodic_task.periodic_task(spacing=2, run_immediately=True)
@coordination.synchronized('lock-task-example')
def _task_example(self, context):
LOG.info("Produce task, say hello ...")
self.task_rpcapi.say_hello(context)
Expand Down

0 comments on commit 16505f9

Please sign in to comment.