Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: improvements #2937

Merged
merged 1 commit into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions rero_ils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,25 @@ def _(x):
},
'replace-idby-contribution': {
'task': ('rero_ils.modules.documents.tasks.replace_idby_contribution'),
'schedule': crontab(minute=22, hour=22, day_of_week=6),
'schedule': crontab(minute=5, hour=2, day_of_week=6),
# Every week on Saturday at 02:05 UTC,
'enabled': False
},
'replace-idby-subjects': {
'task': ('rero_ils.modules.documents.tasks.replace_idby_subjects'),
'schedule': crontab(minute=21, hour=22, day_of_week=6),
'schedule': crontab(minute=10, hour=2, day_of_week=6),
# Every week on Saturday at 02:10 UTC,
'enabled': False
},
'replace-idby-subjects-imported': {
'task': ('rero_ils.modules.documents.tasks.replace_idby_subjects'),
'kwargs': {
'subjects': 'subjects_imported'
},
'schedule': crontab(minute=15, hour=2, day_of_week=6),
# Every week on Saturday at 02:15 UTC,
'enabled': False
},
'delete-provisional-items': {
'task': 'rero_ils.modules.items.tasks.delete_provisional_items',
'schedule': crontab(minute=0, hour=3), # Every day at 03:00 UTC,
Expand Down
20 changes: 6 additions & 14 deletions rero_ils/modules/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,17 +392,12 @@ def update(self, data, commit=False, dbcommit=False, reindex=False):
:param reindex: reindex the record.
:returns: the modified record
"""
pid = data.get('pid')
if pid:
if pid := data.get('pid'):
db_record = self.get_record_by_id(self.id)
if pid != db_record.pid:
raise IlsRecordError.PidChange(
'{class_n} changed pid from {old_pid} to {new_pid}'.format(
class_n=self.__class__.__name__,
old_pid=db_record.pid,
new_pid=pid
)
)
f'{self.__class__.__name__} changed pid from '
f'{db_record.pid} to {pid}')
record = self

# TODO: find a way to make extended validations.
Expand Down Expand Up @@ -478,11 +473,8 @@ def delete_from_index(self):
indexer().delete(self)
except NotFoundError:
current_app.logger.warning(
'Can not delete from index {class_name}: {pid}'.format(
class_name=self.__class__.__name__,
pid=self.pid
)
)
f'Can not delete from index {self.__class__.__name__}'
f': {self.pid}')

@property
def pid(self):
Expand Down Expand Up @@ -615,7 +607,7 @@ def _actionsiter(self, message_iterator):
except Exception:
message.reject()
current_app.logger.error(
"Failed to index record {id}".format(id=payload.get('id')),
f"Failed to index record {payload.get('id')}",
exc_info=True)

def _index_action(self, payload):
Expand Down
155 changes: 80 additions & 75 deletions rero_ils/modules/documents/utils_mef.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

"""Celery tasks to documents."""


from __future__ import absolute_import, print_function

import contextlib
from abc import ABC, abstractmethod

import click
from elasticsearch_dsl import Q
from flask import current_app
from sqlalchemy.orm.exc import NoResultFound
from webargs import ValidationError

from .api import Document, DocumentsSearch
Expand Down Expand Up @@ -231,43 +234,44 @@ def process(self):
click.echo(f'Found identifiedBy contributions: '
f'{self._query_filter().count()}')
for hit in list(self._query_filter().source('pid').scan()):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for contribution in doc.get('contribution', []):
ref_type = contribution['agent'].get(
'identifiedBy', {}).get('type', '').lower()
if ref_type in self.cont_types + ['rero']:
ref_pid = contribution['agent'].get(
'identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in self.cont_types:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_contribution = {
'$ref': url,
'type': contribution['agent']['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {contribution["agent"]}',
f' {new_contribution}'
)
contribution['agent'] = new_contribution
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=contribution.get(
'agent', {}).get('preferred_name', ''),
type=contribution.get(
'agent', {}).get('type', ''),
)
self.update_document(changed=changed, document=doc)
with contextlib.suppress(NoResultFound):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for contribution in doc.get('contribution', []):
ref_type = contribution['agent'].get(
'identifiedBy', {}).get('type', '').lower()
if ref_type in self.cont_types + ['rero']:
ref_pid = contribution['agent'].get(
'identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in self.cont_types:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_contribution = {
'$ref': url,
'type': contribution['agent']['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {contribution["agent"]}',
f' {new_contribution}'
)
contribution['agent'] = new_contribution
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=contribution.get(
'agent', {}).get('preferred_name', ''),
type=contribution.get(
'agent', {}).get('type', ''),
)
self.update_document(changed=changed, document=doc)
return self.counts


Expand Down Expand Up @@ -298,42 +302,43 @@ def process(self):
f'{self._query_filter().count()}')
hits = list(self._query_filter().source('pid').scan())
for hit in list(self._query_filter().source('pid').scan()):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for subject in doc.get(self.name, []):
ref_type = subject.get(
'identifiedBy', {}).get('type', '').lower()
is_pers_org = subject.get('type') in [
'bf:Person', 'bf:Organisation']
if ref_type in self.cont_types + ['rero'] and is_pers_org:
ref_pid = subject.get('identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in ['idref', 'gnd']:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_subject = {
'$ref': url,
# TODO: we have to correct all wrong
# bf:Organisation
'type': subject['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {subject}',
f' {new_subject}'
)
subject = new_subject
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=subject.get('preferred_name'),
type=subject.get('type')
)
self.update_document(changed=changed, document=doc)
with contextlib.suppress(NoResultFound):
doc = Document.get_record_by_id(hit.meta.id)
changed = False
for subject in doc.get(self.name, []):
ref_type = subject.get(
'identifiedBy', {}).get('type', '').lower()
is_pers_org = subject.get('type') in [
'bf:Person', 'bf:Organisation']
if ref_type in self.cont_types + ['rero'] and is_pers_org:
ref_pid = subject.get('identifiedBy', {}).get('value')
if cont := self.get_contribution(hit.pid, ref_type,
ref_pid):
# change the contribution to linked contribution
for cont_type in ['idref', 'gnd']:
if cont.get(cont_type):
changed = True
url = f'{self.mef_url}/{cont_type}/' \
f'{cont[cont_type]["pid"]}'
new_subject = {
'$ref': url,
# TODO: we have to correct all wrong
# bf:Organisation
'type': subject['type']
}
self.print_debug(
f'{hit.pid} Change:',
f' {subject}',
f' {new_subject}'
)
subject = new_subject
break
else:
self.add_preferred_name(
ref_type=ref_type,
ref_pid=ref_pid,
preferred_name=subject.get('preferred_name'),
type=subject.get('type')
)
self.update_document(changed=changed, document=doc)
return self.counts
66 changes: 19 additions & 47 deletions rero_ils/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ def __init__(self, app, *args, **kwargs):
lazy = kwargs.get('lazy', False)
url = app.conf.get("CELERY_REDIS_SCHEDULER_URL",
"redis://localhost:6379")
logger.info('Connect: {url} lazy:{lazy}'.format(
url=url,
lazy=lazy
))
logger.info(f'Connect: {url} lazy:{lazy}')
kwargs['app'] = app
kwargs['lazy'] = lazy
super().__init__(*args, **kwargs)
Expand All @@ -83,20 +80,18 @@ def get(self, name):
:return: scheduled task
"""
tasks = self.rdb.zrange(self.key, 0, -1) or []
for idx, task in enumerate(tasks):
for task in tasks:
entry = jsonpickle.decode(task)
if entry.name == name:
return entry
else:
return None

def enabled_name(self, name):
"""Name for enabled value in REDIS DB.

:param name: name of entry in task scheduler
:return: name of the enable key in REDIS DB
"""
return '{key}:{name}'.format(key=self.key, name=name)
return f'{self.key}:{name}'

def merge_inplace(self, tasks):
"""Merge entries from CELERY_BEAT_SCHEDULE.
Expand All @@ -114,9 +109,8 @@ def setup_schedule(self):
beat_schedule = FLASK_TO_CELERY_MAPPING['CELERY_BEAT_SCHEDULE']
config = deepcopy(self.app.conf.get(beat_schedule))
self.merge_inplace(config)
msg = 'Current schedule:\n{tasks}'.format(
tasks='\n'.join(self.display_all(prefix='- Tasks: '))
)
current_schedule = "\n".join(self.display_all(prefix="- Tasks: "))
msg = f'Current schedule:\n {current_schedule}'
logger.info(msg)

def is_due(self, entry):
Expand All @@ -136,17 +130,11 @@ def is_due(self, entry):
:return: the state of the entry as schedstate
"""
if self.get_entry_enabled(entry.name):
state = entry.is_due()
else:
msg = ('Not enabled: {name} = {task} {each} {kwargs}'.format(
name=entry.name,
task=entry.task,
each=repr(entry.schedule),
kwargs=entry.kwargs,
))
logger.info(msg)
state = schedstate(is_due=False, next=entry.is_due().next)
return state
return entry.is_due()
msg = f'Not enabled: {entry.name} = {entry.task} ' \
f'{repr(entry.schedule)} {entry.kwargs}'
logger.info(msg)
return schedstate(is_due=False, next=entry.is_due().next)

def set(self, entry, enable=True):
"""Sets an entry.
Expand Down Expand Up @@ -206,30 +194,14 @@ def display_entry(self, name, prefix='- '):
:param prefix: prefix to add to returned info
:return: entry as string representative
"""
entry_as_text = 'Not found entry: {name}'.format(name=name)
entry = self.get(name)
if entry:
entry_as_text = f'Not found entry: {name}'
if entry := self.get(name):
entry_as_text = (
'{txt}{name} = {task} {each} {kwargs}'
' {options} {enabled}'
).format(
txt=prefix,
name=entry.name,
task=entry.task,
each=repr(entry.schedule),
kwargs='{txt}{data}'.format(
txt='kwargs:',
data=entry.kwargs
),
options='{txt}{data}'.format(
txt='options:',
data=entry.options
),
enabled='{txt}{data}'.format(
txt='enabled:',
data=self.get_entry_enabled(name)
)
)
f'{prefix}{entry.name} = {entry.task} {repr(entry.schedule)} '
f'kwargs:{entry.kwargs} '
f'options:{entry.options} '
f'enabled:{self.get_entry_enabled(name)}'
)
return entry_as_text

def display_all(self, prefix='- '):
Expand Down Expand Up @@ -270,7 +242,7 @@ def set_entry_enabled(self, name, enable=True):
:param enable: enable or disable scheduling
"""
if self.get(name):
enabled_name = '{key}:{name}'.format(key=self.key, name=name)
enabled_name = f'{self.key}:{name}'
self.rdb[enabled_name] = int(enable)

def set_enable_all(self, enable=True):
Expand Down Expand Up @@ -336,7 +308,7 @@ def enable_tasks(all, names, disable, verbose):
if verbose:
click.echo('\n'.join(current_scheduler.display_all()))
else:
names = names if names else []
names = names or []
for name in names:
name = name.strip()
current_scheduler.set_entry_enabled(name=name, enable=not disable)
Expand Down