Remote access from one system to the models and functions of another, using Celery machinery.
Relies on three outstanding Python projects:
Client and server are designed to:
- filter models with Django ORM lookups, Q-objects and excludes;
- change model state (create, update, update or create, delete);
- change model state in bulk mode (more than one object per request);
- atomic get-set model state with bulk mode support;
- call function;
- client does not require Django;
Install client:
pip install djangoceleryrpc
Install server:
pip install djangoceleryrpc[server]
Default configuration of django-celery-rpc must be overridden in settings.py by CELERY_RPC_CONFIG. CELERY_RPC_CONFIG is a dict which must contain at least two keys: BROKER_URL and CELERY_RESULT_BACKEND (spelled broker_url and result_backend in the lowercase style used by the examples below). Any other Celery config params are also permitted (see Configuration and defaults).
settings.py:
# minimal required configuration
CELERY_RPC_CONFIG = {
'broker_url': 'amqp://guest:guest@rabbitmq:5672//',
'result_backend': 'redis://redis:6379/0',
}
settings.py:
# alternate request queue and routing key
CELERY_RPC_CONFIG = {
'broker_url': 'amqp://guest:guest@rabbitmq:5672/',
'result_backend': 'amqp://guest:guest@rabbitmq:5672/',
'task_default_queue': 'celery_rpc.requests.alter_queue',
'task_default_routing_key': 'celery_rpc.alter_routing_key'
}
settings.py:
# these settings will be used in clients by default
CELERY_RPC_CONFIG = {
'broker_url': 'amqp://guest:guest@rabbitmq:5672/',
'result_backend': 'redis://redis:6379/0',
}
# 'eggs' alternative configuration will be explicitly passed to the client constructor
CELERY_RPC_EGGS_CLIENT = {
# BROKER_URL will be used by default from section above
'result_backend': 'amqp://guest:guest@rabbitmq:5672/',
'task_default_queue': 'celery_rpc.requests.alter_queue',
'task_default_routing_key': 'celery_rpc.alter_routing_key'
}
Note:
- client and server must share the same BROKER_URL, RESULT_BACKEND, DEFAULT_EXCHANGE, DEFAULT_QUEUE, DEFAULT_ROUTING_KEY;
- different servers must serve different request queues with different routing keys, or must work with different exchanges.
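The fallback described in the settings comments (the `eggs` client reusing the default broker) and the sharing rule in the note can be sketched with plain dictionaries. This is only an illustration: `effective_config` and `mismatched_keys` are hypothetical helpers, not part of django-celery-rpc, and the way the library actually merges settings is an assumption here.

```python
# Hypothetical helpers; django-celery-rpc may merge settings differently.
CELERY_RPC_CONFIG = {
    'broker_url': 'amqp://guest:guest@rabbitmq:5672/',
    'result_backend': 'redis://redis:6379/0',
}

CELERY_RPC_EGGS_CLIENT = {
    'result_backend': 'amqp://guest:guest@rabbitmq:5672/',
    'task_default_queue': 'celery_rpc.requests.alter_queue',
    'task_default_routing_key': 'celery_rpc.alter_routing_key',
}

# Keys that client and server are expected to agree on.
SHARED_KEYS = ('broker_url', 'result_backend', 'task_default_exchange',
               'task_default_queue', 'task_default_routing_key')


def effective_config(defaults, overrides):
    """Return defaults updated with overrides, leaving both inputs untouched."""
    merged = dict(defaults)
    merged.update(overrides)
    return merged


def mismatched_keys(client_cfg, server_cfg, keys=SHARED_KEYS):
    """List the shared keys whose values differ between client and server."""
    return [k for k in keys if client_cfg.get(k) != server_cfg.get(k)]


eggs_client_cfg = effective_config(CELERY_RPC_CONFIG, CELERY_RPC_EGGS_CLIENT)
# Pretend the eggs server was started with the same effective settings.
eggs_server_cfg = dict(eggs_client_cfg)

print(eggs_client_cfg['broker_url'])
print(mismatched_keys(eggs_client_cfg, eggs_server_cfg))
print(mismatched_keys(CELERY_RPC_CONFIG, eggs_server_cfg))
```

A non-empty list from `mismatched_keys` points at the settings that would keep a client and a server from seeing each other's messages.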
example.py
from celery_rpc.client import Client
from django.conf import settings
# create client with default settings
span_client = Client()
# create client for `eggs` server
eggs_client = Client(settings.CELERY_RPC_EGGS_CLIENT)
You can find more examples in tests.
Simple filtering example
span_client.filter('app.models:MyModel', kwargs=dict(filters={'a__exact':'a'}))
Filtering with Q object
from django.db.models import Q
span_client.filter('app.models:MyModel', kwargs=dict(filters_Q=(Q(a='1') | Q(b='1'))))
Also, we can use both Q and lookups
span_client.filter('app.models:MyModel', kwargs=dict(filters={'c__exact':'c'}, filters_Q=(Q(a='1') | Q(b='1'))))
Exclude supported
span_client.filter('app.models:MyModel', kwargs=dict(exclude={'c__exact':'c'}, exclude_Q=(Q(a='1') | Q(b='1'))))
You can mix filters and exclude, Q-object with lookups. Try it yourself. ;)
Full list of available kwargs:
filters - dict of terms compatible with django lookup fields
offset - offset from which to return results
limit - max number of results
fields - list of serializer fields, which will be returned
exclude - lookups for excluding matched models
order_by - order of results (list, tuple or string), minus ('-') sets reverse order, default = []
filters_Q - django Q-object for filtering models
exclude_Q - django Q-object for excluding matched models
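To make the list above concrete, the snippet below assembles a `kwargs` dictionary using only the documented keys and rejects anything else. `build_filter_kwargs` is a hypothetical helper written for this illustration; the client itself is not known to perform this validation.

```python
# Keys documented above for the `kwargs` argument of `filter`.
ALLOWED_FILTER_KWARGS = frozenset([
    'filters', 'offset', 'limit', 'fields',
    'exclude', 'order_by', 'filters_Q', 'exclude_Q',
])


def build_filter_kwargs(**kwargs):
    """Hypothetical helper: return kwargs as a dict, rejecting unknown keys."""
    unknown = sorted(set(kwargs) - ALLOWED_FILTER_KWARGS)
    if unknown:
        raise ValueError('unsupported filter kwargs: %s' % ', '.join(unknown))
    return dict(kwargs)


# First ten objects whose `a` equals 'a', ordered by descending id,
# returning only two fields.
page = build_filter_kwargs(
    filters={'a__exact': 'a'},
    order_by='-id',
    offset=0,
    limit=10,
    fields=['id', 'a'],
)
print(page)
# The result could then be passed along, e.g.:
# span_client.filter('app.models:MyModel', kwargs=page)
```

A misspelled key such as `filter` raises `ValueError` here instead of being sent to the server.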
List all MyModel objects, sending the request with high priority
span_client.filter('app.models:MyModel', high_priority=True)
Create one object
span_client.create('apps.models:MyModel', data={"a": "a"})
Bulk creating
span_client.create('apps.models:MyModel', data=[{"a": "a"}, {"a": "b"}])
Update one object by PK field name
span_client.update('apps.models:MyModel', data={"id": 1, "a": "a"})
Update one object by the special alias 'pk', which is matched automatically to the PK field
span_client.update('apps.models:MyModel', data={"pk": 1, "a": "a"})
Attention! Magic area! Update one object by any field you wish
span_client.update('apps.models:MyModel', data={"alternative_key_field": 42, "a": "a"},
                   kwargs={'identity': 'alternative_key_field'})
All cases are very similar. Try it in your console!
filter - select models
create - create new models, raise exception if model exists
update - update existing models
update_or_create - update if exists or create new
delete - delete existing models
getset - set new state and return old state atomically
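The `getset` semantics (write the new state and hand back the old one, with nothing able to slip in between) can be illustrated with an in-memory stand-in. This is a sketch of the idea only: `InMemoryStore` is hypothetical, it uses a thread lock rather than a database transaction, and it says nothing about how the server implements the method.

```python
import threading


class InMemoryStore:
    """Hypothetical stand-in for a model table, keyed by primary key."""

    def __init__(self, rows):
        self._rows = {pk: dict(row) for pk, row in rows.items()}
        self._lock = threading.Lock()

    def getset(self, pk, new_values):
        """Apply new_values to row pk and return the previous values of those fields."""
        with self._lock:  # the read and the write happen as one step
            row = self._rows[pk]
            old = {field: row.get(field) for field in new_values}
            row.update(new_values)
            return old

    def get(self, pk):
        """Return a copy of the current state of row pk."""
        with self._lock:
            return dict(self._rows[pk])


store = InMemoryStore({1: {'a': 'old', 'b': 'keep'}})
previous = store.getset(1, {'a': 'new'})
print(previous)
print(store.get(1))
```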
All methods support these options:
fields - shrink result fields
serializer_cls - fully qualified symbol name of a DRF serializer class on the server
identity - field name which will be used rather than the PK field (meaningless for filter)
It's possible to pipeline tasks, so they will be executed in one transaction.
p = span_client.pipe()
p = p.create('apps.models:MyModel', data={"a": "a"})
p = p.create('apps.models:MyAnotherModel', data={"b": "b"})
p.run()
You can pass some arguments from previous task to the next.
Suppose you have those models on the server
class MyModel(models.Model):
    a = models.CharField()

class MyAnotherModel(models.Model):
    fk = models.ForeignKey(MyModel)
    b = models.CharField()
You need to create an instance of MyModel and an instance of MyAnotherModel which refers to MyModel
p = span_client.pipe()
p = p.create('apps.models:MyModel', data={"a": "a"})
p = p.translate({"fk": "id"}, defaults={"b": "b"})
p = p.create('apps.models:MyAnotherModel')
p.run()
In this example the translate task:
- takes the result of the previous create task
- extracts the value of the "id" field from it
- adds this value to "defaults" under the key "fk"
After that, the next create task takes the result of translate as input data
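The behaviour of the translate step described above can be emulated locally with a plain function. This is a minimal sketch of the data flow only, not the library's implementation; the function below and the sample result dictionary are made up for illustration.

```python
def translate(mapping, previous_result, defaults=None):
    """Local emulation of the described step, not the library's code.

    mapping maps target keys to field names of previous_result.
    """
    data = dict(defaults or {})
    for target_key, source_field in mapping.items():
        data[target_key] = previous_result[source_field]
    return data


# Pretend this is what the first `create` task returned.
created_my_model = {'id': 7, 'a': 'a'}

next_input = translate({'fk': 'id'}, created_my_model, defaults={'b': 'b'})
print(next_input)
```

The resulting dictionary carries both the default `b` value and the `fk` pointing at the freshly created row, which is the shape the second `create` needs.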
Let's take these models:
class MyModel(models.Model):
    str = models.CharField()

class MyManyToManyModel(models.Model):
    m2m = models.ManyToManyField(MyModel, null=True)
Add relation between existing objects
my_models = span_client.create('apps.models:MyModel',
                               [{'str': 'monty'}, {'str': 'python'}])
m2m_model = span_client.create('apps.models:MyManyToManyModel',
                               {'m2m': [my_models[0]['id']]})
# Will add 'python' to m2m_model.m2m where 'monty' already is
data = {'mymodel': my_models[1]['id'], 'mymanytomanymodel': m2m_model['id']}
through = span_client.create('apps.models:MyManyToManyModel.m2m.through', data)
And then delete some of the existing relations
# Next `pipe` will eliminate all relations where `mymodel__str` equals 'monty'
p = span_client.pipe()
p = p.filter('apps.models:MyManyToManyModel.m2m.through', {'mymodel__str': 'monty'})
p = p.delete('apps.models:MyManyToManyModel.m2m.through')
p.run()
celery worker -A celery_rpc.app
Server with support for prioritized task consumption
celery multi start 2 -A celery_rpc.app -Q:1 celery_rpc.requests.high_priority
Note: you must replace 'celery_rpc.requests' with the actual value of the config param CELERY_DEFAULT_QUEUE.
The command will start two instances. The first instance will consume from the high priority queue only. The second instance will serve both queues.
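The queue layout this command produces can be sketched in plain Python. The sketch assumes, based on the command above, that the high priority queue is named by appending `.high_priority` to the default queue; `queue_for`, `workers_serving` and `WORKER_QUEUES` are hypothetical names used for illustration only.

```python
DEFAULT_QUEUE = 'celery_rpc.requests'  # stands for the value of CELERY_DEFAULT_QUEUE
HIGH_PRIORITY_QUEUE = DEFAULT_QUEUE + '.high_priority'  # assumed naming

# Worker 1 is pinned to the high priority queue; worker 2 serves both.
WORKER_QUEUES = {
    1: [HIGH_PRIORITY_QUEUE],
    2: [DEFAULT_QUEUE, HIGH_PRIORITY_QUEUE],
}


def queue_for(high_priority=False):
    """Pick the request queue a client call would be routed to."""
    return HIGH_PRIORITY_QUEUE if high_priority else DEFAULT_QUEUE


def workers_serving(queue):
    """Return the worker numbers that consume from the given queue."""
    return sorted(n for n, queues in WORKER_QUEUES.items() if queue in queues)


print(workers_serving(queue_for(high_priority=True)))
print(workers_serving(queue_for(high_priority=False)))
```

High priority requests always have a worker that is not busy with ordinary traffic, while ordinary requests are handled by the second worker alone.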
For daemonization see Running the worker as a daemon
python django-celery-rpc/celery_rpc/runtests/runtests.py
OVERRIDE_BASE_TASKS = {
'ModelTask': 'package.module.MyModelTask',
'ModelChangeTask': 'package.module.MyModelChangeTask',
'FunctionTask': 'package.module.MyFunctionTask'
}
Supported class names: ModelTask, ModelChangeTask, FunctionTask
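One plausible way such a mapping of names to dotted paths gets turned into classes is sketched below. This is an assumption about the mechanism, not the library's code: `load_overrides` is a hypothetical loader, and a standard-library class stands in for a real task class so that the snippet runs without Celery installed.

```python
import importlib

SUPPORTED_BASE_TASKS = ('ModelTask', 'ModelChangeTask', 'FunctionTask')


def load_overrides(overrides):
    """Hypothetical loader: map each supported name to the class it points at."""
    loaded = {}
    for name, dotted_path in overrides.items():
        if name not in SUPPORTED_BASE_TASKS:
            raise KeyError('unsupported base task name: %s' % name)
        # 'package.module.MyModelTask' -> ('package.module', 'MyModelTask')
        module_path, _, class_name = dotted_path.rpartition('.')
        module = importlib.import_module(module_path)
        loaded[name] = getattr(module, class_name)
    return loaded


# A standard-library class stands in for a real task class here.
example = load_overrides({'ModelTask': 'collections.OrderedDict'})
print(example['ModelTask'].__name__)
```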
# Both server and client
CELERY_RPC_CONFIG['wrap_remote_errors'] = True
After enabling remote exception wrapping, the client will raise the same errors that occurred on the server side.
If the client side has no such error defined (i.e. the package is not installed), Client.RemoteError will be raised.
Also, Client.RemoteError is the base for all exceptions on the client side.
For unknown exceptions this code is valid:
try:
    result = rpc_client.call("remote_func")
except rpc_client.errors.SomeUnknownError as e:
    # here a stub for remote SomeUnknownError is handled
    print(e.args)
For known exceptions both variants work:
try:
    result = rpc_client.call("remote_func")
except rpc_client.errors.MultipleObjectsReturned as e:
    # django.core.exceptions.MultipleObjectsReturned
    handle_error(e)
except django.core.exceptions.ObjectDoesNotExist as e:
    # django.core.exceptions.ObjectDoesNotExist
    handle_error(e)
If original exception hierarchy is needed:
SomeBaseError = rpc_client.errors.SomeBaseError
DerivedError = rpc_client.errors.subclass(SomeBaseError, "DerivedError")
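How attribute access like `errors.SomeUnknownError` can yield a usable exception class for an error the client has never seen is sketched below. This is a hypothetical emulation of the idea, not the client's implementation: `ErrorRegistry` and the local `RemoteError` are stand-ins written for this example.

```python
class RemoteError(Exception):
    """Stand-in for the base class of errors surfaced on the client side."""


class ErrorRegistry:
    """Hypothetical registry that creates stub exception classes on demand."""

    def __init__(self, base=RemoteError):
        self._base = base
        self._known = {}

    def __getattr__(self, name):
        # Called only for attributes not found normally, i.e. error names.
        if name.startswith('_'):
            raise AttributeError(name)
        return self.subclass(self._base, name)

    def subclass(self, parent, name):
        """Return the stub called name, creating it as a child of parent if new."""
        if name not in self._known:
            self._known[name] = type(name, (parent,), {})
        return self._known[name]


errors = ErrorRegistry()
SomeBaseError = errors.SomeBaseError
DerivedError = errors.subclass(SomeBaseError, 'DerivedError')

try:
    raise DerivedError('raised remotely')
except errors.SomeBaseError as e:
    # The derived stub is caught through its declared parent.
    print(type(e).__name__, e.args)
```

Because each name maps to exactly one class, the same stub is returned on every access, so `except` clauses written against it keep matching.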
- Set default non-generic model serializer.
- Test support for RPC result backend from Celery.
- Token auth and permissions support (like DRF).
- Resource map and strict mode.
- ...
Thanks to all who have contributed to this project: