Skip to content

Commit

Permalink
scheduler: improvments
Browse files Browse the repository at this point in the history
* Uses f strings for logging.
* Adds `replace-idby-subjects-imported` scheduled task.

Co-Authored-by: Peter Weber <peter.weber@rero.ch>
  • Loading branch information
rerowep committed Jun 8, 2022
1 parent 43f6fae commit cf42ba6
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 138 deletions.
13 changes: 11 additions & 2 deletions rero_ils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,16 +401,25 @@ def _(x):
},
'replace-idby-contribution': {
'task': ('rero_ils.modules.documents.tasks.replace_idby_contribution'),
'schedule': crontab(minute=22, hour=22, day_of_week=6),
'schedule': crontab(minute=5, hour=2, day_of_week=6),
# Every week on Saturday at 22:22 UTC,
'enabled': False
},
'replace-idby-subjects': {
'task': ('rero_ils.modules.documents.tasks.replace_idby_subjects'),
'schedule': crontab(minute=21, hour=22, day_of_week=6),
'schedule': crontab(minute=10, hour=2, day_of_week=6),
# Every week on Saturday at 22:22 UTC,
'enabled': False
},
'replace-idby-subjects-imported': {
'task': ('rero_ils.modules.documents.tasks.replace_idby_subjects'),
'kwargs': {
'subjects': 'subjects_imported'
},
'schedule': crontab(minute=15, hour=2, day_of_week=6),
# Every week on Saturday at 22:20 UTC,
'enabled': False
},
'delete-provisional-items': {
'task': 'rero_ils.modules.items.tasks.delete_provisional_items',
'schedule': crontab(minute=0, hour=3), # Every day at 03:00 UTC,
Expand Down
20 changes: 6 additions & 14 deletions rero_ils/modules/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,17 +392,12 @@ def update(self, data, commit=False, dbcommit=False, reindex=False):
:param redindex: reindex the record.
:returns: the modified record
"""
pid = data.get('pid')
if pid:
if pid := data.get('pid'):
db_record = self.get_record_by_id(self.id)
if pid != db_record.pid:
raise IlsRecordError.PidChange(
'{class_n} changed pid from {old_pid} to {new_pid}'.format(
class_n=self.__class__.__name__,
old_pid=db_record.pid,
new_pid=pid
)
)
f'{self.__class__.__name__} changed pid from '
f'{db_record.pid} to {pid}')
record = self

# TODO: find a way to make extended validations.
Expand Down Expand Up @@ -478,11 +473,8 @@ def delete_from_index(self):
indexer().delete(self)
except NotFoundError:
current_app.logger.warning(
'Can not delete from index {class_name}: {pid}'.format(
class_name=self.__class__.__name__,
pid=self.pid
)
)
f'Can not delete from index {self.__class__.__name__}'
f': {self.pid}')

@property
def pid(self):
Expand Down Expand Up @@ -615,7 +607,7 @@ def _actionsiter(self, message_iterator):
except Exception:
message.reject()
current_app.logger.error(
"Failed to index record {id}".format(id=payload.get('id')),
f"Failed to index record {payload.get('id')}",
exc_info=True)

def _index_action(self, payload):
Expand Down
155 changes: 80 additions & 75 deletions rero_ils/modules/documents/utils_mef.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

"""Celery tasks to documents."""


from __future__ import absolute_import, print_function

import contextlib
from abc import ABC, abstractmethod

import click
from elasticsearch_dsl import Q
from flask import current_app
from sqlalchemy.orm.exc import NoResultFound
from webargs import ValidationError

from .api import Document, DocumentsSearch
Expand Down Expand Up @@ -231,43 +234,44 @@ def process(self):
click.echo(f'Found identifiedBy contributions: '
f'{self._query_filter().count()}')
for hit in list(self._query_filter().source('pid').scan()):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for contribution in doc.get('contribution', []):
ref_type = contribution['agent'].get(
'identifiedBy', {}).get('type', '').lower()
if ref_type in self.cont_types + ['rero']:
ref_pid = contribution['agent'].get(
'identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in self.cont_types:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_contribution = {
'$ref': url,
'type': contribution['agent']['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {contribution["agent"]}',
f' {new_contribution}'
)
contribution['agent'] = new_contribution
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=contribution.get(
'agent', {}).get('preferred_name', ''),
type=contribution.get(
'agent', {}).get('type', ''),
)
self.update_document(changed=changed, document=doc)
with contextlib.suppress(NoResultFound):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for contribution in doc.get('contribution', []):
ref_type = contribution['agent'].get(
'identifiedBy', {}).get('type', '').lower()
if ref_type in self.cont_types + ['rero']:
ref_pid = contribution['agent'].get(
'identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in self.cont_types:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_contribution = {
'$ref': url,
'type': contribution['agent']['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {contribution["agent"]}',
f' {new_contribution}'
)
contribution['agent'] = new_contribution
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=contribution.get(
'agent', {}).get('preferred_name', ''),
type=contribution.get(
'agent', {}).get('type', ''),
)
self.update_document(changed=changed, document=doc)
return self.counts


Expand Down Expand Up @@ -298,42 +302,43 @@ def process(self):
f'{self._query_filter().count()}')
hits = list(self._query_filter().source('pid').scan())
for hit in list(self._query_filter().source('pid').scan()):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for subject in doc.get(self.name, []):
ref_type = subject.get(
'identifiedBy', {}).get('type', '').lower()
is_pers_org = subject.get('type') in [
'bf:Person', 'bf:Organisation']
if ref_type in self.cont_types + ['rero'] and is_pers_org:
ref_pid = subject.get('identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in ['idref', 'gnd']:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_subject = {
'$ref': url,
# TOTO: we have to correct all wrong
# bf:Organisation
'type': subject['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {subject}',
f' {new_subject}'
)
subject = new_subject
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=subject.get('preferred_name'),
type=subject.get('type')
)
self.update_document(changed=changed, document=doc)
with contextlib.suppress(NoResultFound):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for subject in doc.get(self.name, []):
ref_type = subject.get(
'identifiedBy', {}).get('type', '').lower()
is_pers_org = subject.get('type') in [
'bf:Person', 'bf:Organisation']
if ref_type in self.cont_types + ['rero'] and is_pers_org:
ref_pid = subject.get('identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in ['idref', 'gnd']:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_subject = {
'$ref': url,
# TOTO: we have to correct all wrong
# bf:Organisation
'type': subject['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {subject}',
f' {new_subject}'
)
subject = new_subject
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=subject.get('preferred_name'),
type=subject.get('type')
)
self.update_document(changed=changed, document=doc)
return self.counts
66 changes: 19 additions & 47 deletions rero_ils/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ def __init__(self, app, *args, **kwargs):
lazy = kwargs.get('lazy', False)
url = app.conf.get("CELERY_REDIS_SCHEDULER_URL",
"redis://localhost:6379")
logger.info('Connect: {url} lazy:{lazy}'.format(
url=url,
lazy=lazy
))
logger.info(f'Connect: {url} lazy:{lazy}')
kwargs['app'] = app
kwargs['lazy'] = lazy
super().__init__(*args, **kwargs)
Expand All @@ -83,20 +80,18 @@ def get(self, name):
:return: scheduled task
"""
tasks = self.rdb.zrange(self.key, 0, -1) or []
for idx, task in enumerate(tasks):
for task in tasks:
entry = jsonpickle.decode(task)
if entry.name == name:
return entry
else:
return None

def enabled_name(self, name):
"""Name for enabled value in REDIS DB.
:param name: name of entry in task scheduler
:return: name of the enable key in REDIS DB
"""
return '{key}:{name}'.format(key=self.key, name=name)
return f'{self.key}:{name}'

def merge_inplace(self, tasks):
"""Merge entries from CELERY_BEAT_SCHEDULE.
Expand All @@ -114,9 +109,8 @@ def setup_schedule(self):
beat_schedule = FLASK_TO_CELERY_MAPPING['CELERY_BEAT_SCHEDULE']
config = deepcopy(self.app.conf.get(beat_schedule))
self.merge_inplace(config)
msg = 'Current schedule:\n{tasks}'.format(
tasks='\n'.join(self.display_all(prefix='- Tasks: '))
)
current_schedule = "\n".join(self.display_all(prefix="- Tasks: "))
msg = f'Current schedule:\n {current_schedule}'
logger.info(msg)

def is_due(self, entry):
Expand All @@ -136,17 +130,11 @@ def is_due(self, entry):
:return: the state of the entry as schedstate
"""
if self.get_entry_enabled(entry.name):
state = entry.is_due()
else:
msg = ('Not enabled: {name} = {task} {each} {kwargs}'.format(
name=entry.name,
task=entry.task,
each=repr(entry.schedule),
kwargs=entry.kwargs,
))
logger.info(msg)
state = schedstate(is_due=False, next=entry.is_due().next)
return state
return entry.is_due()
msg = f'Not enabled: {entry.name} = {entry.task} ' \
f'{repr(entry.schedule)} {entry.kwargs}'
logger.info(msg)
return schedstate(is_due=False, next=entry.is_due().next)

def set(self, entry, enable=True):
"""Sets an entry.
Expand Down Expand Up @@ -206,30 +194,14 @@ def display_entry(self, name, prefix='- '):
:param prefix: prefix to add to returned info
:return: entry as string representative
"""
entry_as_text = 'Not found entry: {name}'.format(name=name)
entry = self.get(name)
if entry:
entry_as_text = f'Not found entry: {name}'
if entry := self.get(name):
entry_as_text = (
'{txt}{name} = {task} {each} {kwargs}'
' {options} {enabled}'
).format(
txt=prefix,
name=entry.name,
task=entry.task,
each=repr(entry.schedule),
kwargs='{txt}{data}'.format(
txt='kwargs:',
data=entry.kwargs
),
options='{txt}{data}'.format(
txt='options:',
data=entry.options
),
enabled='{txt}{data}'.format(
txt='enabled:',
data=self.get_entry_enabled(name)
)
)
f'{prefix}{entry.name} = {entry.task} {repr(entry.schedule)} '
f'kwargs:{entry.kwargs} '
f'options:{entry.options} '
f'enabled:{self.get_entry_enabled(name)}'
)
return entry_as_text

def display_all(self, prefix='- '):
Expand Down Expand Up @@ -270,7 +242,7 @@ def set_entry_enabled(self, name, enable=True):
:param enable: enable or disable scheduling
"""
if self.get(name):
enabled_name = '{key}:{name}'.format(key=self.key, name=name)
enabled_name = f'{self.key}:{name}'
self.rdb[enabled_name] = int(enable)

def set_enable_all(self, enable=True):
Expand Down Expand Up @@ -336,7 +308,7 @@ def enable_tasks(all, names, disable, verbose):
if verbose:
click.echo('\n'.join(current_scheduler.display_all()))
else:
names = names if names else []
names = names or []
for name in names:
name = name.strip()
current_scheduler.set_entry_enabled(name=name, enable=not disable)
Expand Down

0 comments on commit cf42ba6

Please sign in to comment.