Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to assign a default channel on @job functions #71

Merged
merged 2 commits into from
May 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions connector/queue/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,15 @@ def related_action(self, session):
JOB_REGISTRY = set()


def job(func):
def job(func=None, default_channel='root'):
""" Decorator for jobs.

Optional argument:

:param default_channel: the channel wherein the job will be assigned. This
channel is set at the installation of the module
and can be manually changed later using the views.

Add a ``delay`` attribute on the decorated function.

When ``delay`` is called, the function is transformed to a job and
Expand Down Expand Up @@ -646,19 +652,31 @@ def export_one_thing(session, model_name, one_thing):
# => the job will be executed with a low priority and not before a
# delay of 5 hours from now

@job(default_channel='root.subchannel')
def export_one_thing(session, model_name, one_thing):
# work
# export one_thing

See also: :py:func:`related_action` a related action can be attached
to a job

"""
if func is None:
return functools.partial(job, default_channel=default_channel)

def delay(session, model_name, *args, **kwargs):
"""Enqueue the function. Return the uuid of the created job."""
return OpenERPJobStorage(session).enqueue_resolve_args(
func,
model_name=model_name,
*args,
**kwargs)
JOB_REGISTRY.add(func)

assert default_channel == 'root' or default_channel.startswith('root.'), (
"The channel path must start by 'root'")
func.default_channel = default_channel
func.delay = delay
JOB_REGISTRY.add(func)
return func


Expand Down
37 changes: 33 additions & 4 deletions connector/queue/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def _compute_complete_name(self):
self.complete_name = '.'.join(reversed(parts))

@api.one
@api.constrains('parent_id')
@api.constrains('parent_id', 'name')
def parent_required(self):
if self.name != 'root' and not self.parent_id:
raise exceptions.ValidationError(_('Parent channel required.'))
Expand Down Expand Up @@ -489,11 +489,40 @@ def _default_channel(self):
readonly=True)

@api.model
def _setup_complete(self):
super(JobFunction, self)._setup_complete()
def _find_or_create_channel(self, channel_path):
channel_model = self.env['queue.job.channel']
parts = channel_path.split('.')
parts.reverse()
channel_name = parts.pop()
assert channel_name == 'root', "A channel path starts with 'root'"
# get the root channel
channel = channel_model.search([('name', '=', channel_name)])
while parts:
channel_name = parts.pop()
parent_channel = channel
channel = channel_model.search([
('name', '=', channel_name),
('parent_id', '=', parent_channel.id)],
limit=1,
)
if not channel:
channel = channel_model.create({
'name': channel_name,
'parent_id': parent_channel.id,
})
return channel

@api.model
def _register_jobs(self):
for func in JOB_REGISTRY:
if not is_module_installed(self.pool, get_openerp_module(func)):
continue
func_name = '%s.%s' % (func.__module__, func.__name__)
if not self.search_count([('name', '=', func_name)]):
self.create({'name': func_name})
channel = self._find_or_create_channel(func.default_channel)
self.create({'name': func_name, 'channel_id': channel.id})

@api.model
def _setup_complete(self):
super(JobFunction, self)._setup_complete()
self._register_jobs()
82 changes: 81 additions & 1 deletion connector/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import unittest2
from datetime import datetime, timedelta

from openerp import SUPERUSER_ID
from openerp import SUPERUSER_ID, exceptions
import openerp.tests.common as common
from openerp.addons.connector.queue.job import (
Job,
Expand Down Expand Up @@ -610,3 +610,83 @@ def test_job_subscription(self):
followers_id = [f.id for f in stored.message_follower_ids]
self.assertIn(self.other_partner_a.id, followers_id)
self.assertNotIn(self.other_partner_b.id, followers_id)


class TestJobChannels(common.TransactionCase):

def setUp(self):
super(TestJobChannels, self).setUp()
self.function_model = self.env['queue.job.function']
self.channel_model = self.env['queue.job.channel']
self.job_model = self.env['queue.job']
self.root_channel = self.env.ref('connector.channel_root')
self.session = ConnectorSession(self.cr, self.uid, context={})

def test_channel_complete_name(self):
channel = self.channel_model.create({'name': 'number',
'parent_id': self.root_channel.id,
})
subchannel = self.channel_model.create({'name': 'five',
'parent_id': channel.id,
})
self.assertEquals(channel.complete_name, 'root.number')
self.assertEquals(subchannel.complete_name, 'root.number.five')

def test_channel_tree(self):
with self.assertRaises(exceptions.ValidationError):
self.channel_model.create({'name': 'sub'})

def test_channel_root(self):
with self.assertRaises(exceptions.Warning):
self.root_channel.unlink()
with self.assertRaises(exceptions.Warning):
self.root_channel.name = 'leaf'

def test_register_jobs(self):
job(task_a)
job(task_b)
self.function_model._register_jobs()
path_a = 'openerp.addons.connector.tests.test_job.task_a'
path_b = 'openerp.addons.connector.tests.test_job.task_b'
self.assertTrue(self.function_model.search([('name', '=', path_a)]))
self.assertTrue(self.function_model.search([('name', '=', path_b)]))

def test_channel_on_job(self):
job(task_a)
self.function_model._register_jobs()
path_a = 'openerp.addons.connector.tests.test_job.task_a'
job_func = self.function_model.search([('name', '=', path_a)])
self.assertEquals(job_func.channel, 'root')

test_job = Job(func=task_a)
storage = OpenERPJobStorage(self.session)
storage.store(test_job)
stored = self.job_model.search([('uuid', '=', test_job.uuid)])
self.assertEquals(stored.channel, 'root')

channel = self.channel_model.create({'name': 'sub',
'parent_id': self.root_channel.id,
})
job_func.channel_id = channel

test_job = Job(func=task_a)
storage = OpenERPJobStorage(self.session)
storage.store(test_job)
stored = self.job_model.search([('uuid', '=', test_job.uuid)])
self.assertEquals(stored.channel, 'root.sub')

def test_default_channel(self):
self.function_model.search([]).unlink()
job(task_a, default_channel='root.sub.subsub')
self.assertEquals(task_a.default_channel, 'root.sub.subsub')

self.function_model._register_jobs()

path_a = 'openerp.addons.connector.tests.test_job.task_a'
job_func = self.function_model.search([('name', '=', path_a)])

self.assertEquals(job_func.channel, 'root.sub.subsub')
channel = job_func.channel_id
self.assertEquals(channel.name, 'subsub')
self.assertEquals(channel.parent_id.name, 'sub')
self.assertEquals(channel.parent_id.parent_id.name, 'root')