From b54696c89d390be55ab89fd35eaad85269f5b75f Mon Sep 17 00:00:00 2001 From: Peter Weber Date: Tue, 31 May 2022 11:50:38 +0200 Subject: [PATCH] scheduler: improvments * Uses f strings for logging. * Adds `replace-idby-subjects-imported` scheduled task. Co-Authored-by: Peter Weber --- rero_ils/config.py | 13 +- rero_ils/modules/api.py | 20 +-- rero_ils/modules/documents/utils_mef.py | 155 ++++++++++++------------ rero_ils/schedulers.py | 66 +++------- 4 files changed, 116 insertions(+), 138 deletions(-) diff --git a/rero_ils/config.py b/rero_ils/config.py index aeb8609e1d..90f24bfc9b 100644 --- a/rero_ils/config.py +++ b/rero_ils/config.py @@ -403,16 +403,25 @@ def _(x): }, 'replace-idby-contribution': { 'task': ('rero_ils.modules.documents.tasks.replace_idby_contribution'), - 'schedule': crontab(minute=22, hour=22, day_of_week=6), + 'schedule': crontab(minute=5, hour=2, day_of_week=6), # Every week on Saturday at 22:22 UTC, 'enabled': False }, 'replace-idby-subjects': { 'task': ('rero_ils.modules.documents.tasks.replace_idby_subjects'), - 'schedule': crontab(minute=21, hour=22, day_of_week=6), + 'schedule': crontab(minute=10, hour=2, day_of_week=6), # Every week on Saturday at 22:22 UTC, 'enabled': False }, + 'replace-idby-subjects-imported': { + 'task': ('rero_ils.modules.documents.tasks.replace_idby_subjects'), + 'kwargs': { + 'subjects': 'subjects_imported' + }, + 'schedule': crontab(minute=15, hour=2, day_of_week=6), + # Every week on Saturday at 22:20 UTC, + 'enabled': False + }, 'delete-provisional-items': { 'task': 'rero_ils.modules.items.tasks.delete_provisional_items', 'schedule': crontab(minute=0, hour=3), # Every day at 03:00 UTC, diff --git a/rero_ils/modules/api.py b/rero_ils/modules/api.py index 563317d947..f2db014f88 100644 --- a/rero_ils/modules/api.py +++ b/rero_ils/modules/api.py @@ -392,17 +392,12 @@ def update(self, data, commit=False, dbcommit=False, reindex=False): :param redindex: reindex the record. :returns: the modified record """ - pid = data.get('pid') - if pid: + if pid := data.get('pid'): db_record = self.get_record_by_id(self.id) if pid != db_record.pid: raise IlsRecordError.PidChange( - '{class_n} changed pid from {old_pid} to {new_pid}'.format( - class_n=self.__class__.__name__, - old_pid=db_record.pid, - new_pid=pid - ) - ) + f'{self.__class__.__name__} changed pid from ' + f'{db_record.pid} to {pid}') record = self # TODO: find a way to make extended validations. @@ -478,11 +473,8 @@ def delete_from_index(self): indexer().delete(self) except NotFoundError: current_app.logger.warning( - 'Can not delete from index {class_name}: {pid}'.format( - class_name=self.__class__.__name__, - pid=self.pid - ) - ) + f'Can not delete from index {self.__class__.__name__}' + f': {self.pid}') @property def pid(self): @@ -615,7 +607,7 @@ def _actionsiter(self, message_iterator): except Exception: message.reject() current_app.logger.error( - "Failed to index record {id}".format(id=payload.get('id')), + f"Failed to index record {payload.get('id')}", exc_info=True) def _index_action(self, payload): diff --git a/rero_ils/modules/documents/utils_mef.py b/rero_ils/modules/documents/utils_mef.py index 8a09a7eba3..aabbda13e5 100644 --- a/rero_ils/modules/documents/utils_mef.py +++ b/rero_ils/modules/documents/utils_mef.py @@ -17,13 +17,16 @@ """Celery tasks to documents.""" + from __future__ import absolute_import, print_function +import contextlib from abc import ABC, abstractmethod import click from elasticsearch_dsl import Q from flask import current_app +from sqlalchemy.orm.exc import NoResultFound from webargs import ValidationError from .api import Document, DocumentsSearch @@ -231,43 +234,44 @@ def process(self): click.echo(f'Found identifiedBy contributions: ' f'{self._query_filter().count()}') for hit in list(self._query_filter().source('pid').scan()): - doc = Document.get_record_by_id(hit.meta.id) - changed = False - for contribution in doc.get('contribution', []): - ref_type = contribution['agent'].get( - 'identifiedBy', {}).get('type', '').lower() - if ref_type in self.cont_types + ['rero']: - ref_pid = contribution['agent'].get( - 'identifiedBy', {}).get('value') - if cont := self.get_contribution(hit.pid, ref_type, - ref_pid): - # change the contribution to linked contribution - for cont_type in self.cont_types: - if cont.get(cont_type): - changed = True - url = f'{self.mef_url}/{cont_type}/' \ - f'{cont[cont_type]["pid"]}' - new_contribution = { - '$ref': url, - 'type': contribution['agent']['type'] - } - self.print_debug( - f'{hit.pid} Change:', - f' {contribution["agent"]}', - f' {new_contribution}' - ) - contribution['agent'] = new_contribution - break - else: - self.add_preferred_name( - ref_type=ref_type, - ref_pid=ref_pid, - preferred_name=contribution.get( - 'agent', {}).get('preferred_name', ''), - type=contribution.get( - 'agent', {}).get('type', ''), - ) - self.update_document(changed=changed, document=doc) + with contextlib.suppress(NoResultFound): + doc = Document.get_record_by_id(hit.meta.id) + changed = False + for contribution in doc.get('contribution', []): + ref_type = contribution['agent'].get( + 'identifiedBy', {}).get('type', '').lower() + if ref_type in self.cont_types + ['rero']: + ref_pid = contribution['agent'].get( + 'identifiedBy', {}).get('value') + if cont := self.get_contribution(hit.pid, ref_type, + ref_pid): + # change the contribution to linked contribution + for cont_type in self.cont_types: + if cont.get(cont_type): + changed = True + url = f'{self.mef_url}/{cont_type}/' \ + f'{cont[cont_type]["pid"]}' + new_contribution = { + '$ref': url, + 'type': contribution['agent']['type'] + } + self.print_debug( + f'{hit.pid} Change:', + f' {contribution["agent"]}', + f' {new_contribution}' + ) + contribution['agent'] = new_contribution + break + else: + self.add_preferred_name( + ref_type=ref_type, + ref_pid=ref_pid, + preferred_name=contribution.get( + 'agent', {}).get('preferred_name', ''), + type=contribution.get( + 'agent', {}).get('type', ''), + ) + self.update_document(changed=changed, document=doc) return self.counts @@ -298,42 +302,43 @@ def process(self): f'{self._query_filter().count()}') hits = list(self._query_filter().source('pid').scan()) for hit in list(self._query_filter().source('pid').scan()): - doc = Document.get_record_by_id(hit.meta.id) - changed = False - for subject in doc.get(self.name, []): - ref_type = subject.get( - 'identifiedBy', {}).get('type', '').lower() - is_pers_org = subject.get('type') in [ - 'bf:Person', 'bf:Organisation'] - if ref_type in self.cont_types + ['rero'] and is_pers_org: - ref_pid = subject.get('identifiedBy', {}).get('value') - if cont := self.get_contribution(hit.pid, ref_type, - ref_pid): - # change the contribution to linked contribution - for cont_type in ['idref', 'gnd']: - if cont.get(cont_type): - changed = True - url = f'{self.mef_url}/{cont_type}/' \ - f'{cont[cont_type]["pid"]}' - new_subject = { - '$ref': url, - # TOTO: we have to correct all wrong - # bf:Organisation - 'type': subject['type'] - } - self.print_debug( - f'{hit.pid} Change:', - f' {subject}', - f' {new_subject}' - ) - subject = new_subject - break - else: - self.add_preferred_name( - ref_type=ref_type, - ref_pid=ref_pid, - preferred_name=subject.get('preferred_name'), - type=subject.get('type') - ) - self.update_document(changed=changed, document=doc) + with contextlib.suppress(NoResultFound): + doc = Document.get_record_by_id(hit.meta.id) + changed = False + for subject in doc.get(self.name, []): + ref_type = subject.get( + 'identifiedBy', {}).get('type', '').lower() + is_pers_org = subject.get('type') in [ + 'bf:Person', 'bf:Organisation'] + if ref_type in self.cont_types + ['rero'] and is_pers_org: + ref_pid = subject.get('identifiedBy', {}).get('value') + if cont := self.get_contribution(hit.pid, ref_type, + ref_pid): + # change the contribution to linked contribution + for cont_type in ['idref', 'gnd']: + if cont.get(cont_type): + changed = True + url = f'{self.mef_url}/{cont_type}/' \ + f'{cont[cont_type]["pid"]}' + new_subject = { + '$ref': url, + # TOTO: we have to correct all wrong + # bf:Organisation + 'type': subject['type'] + } + self.print_debug( + f'{hit.pid} Change:', + f' {subject}', + f' {new_subject}' + ) + subject = new_subject + break + else: + self.add_preferred_name( + ref_type=ref_type, + ref_pid=ref_pid, + preferred_name=subject.get('preferred_name'), + type=subject.get('type') + ) + self.update_document(changed=changed, document=doc) return self.counts diff --git a/rero_ils/schedulers.py b/rero_ils/schedulers.py index 825fdc873a..452797ded1 100644 --- a/rero_ils/schedulers.py +++ b/rero_ils/schedulers.py @@ -68,10 +68,7 @@ def __init__(self, app, *args, **kwargs): lazy = kwargs.get('lazy', False) url = app.conf.get("CELERY_REDIS_SCHEDULER_URL", "redis://localhost:6379") - logger.info('Connect: {url} lazy:{lazy}'.format( - url=url, - lazy=lazy - )) + logger.info(f'Connect: {url} lazy:{lazy}') kwargs['app'] = app kwargs['lazy'] = lazy super().__init__(*args, **kwargs) @@ -83,12 +80,10 @@ def get(self, name): :return: scheduled task """ tasks = self.rdb.zrange(self.key, 0, -1) or [] - for idx, task in enumerate(tasks): + for task in tasks: entry = jsonpickle.decode(task) if entry.name == name: return entry - else: - return None def enabled_name(self, name): """Name for enabled value in REDIS DB. @@ -96,7 +91,7 @@ def enabled_name(self, name): :param name: name of entry in task scheduler :return: name of the enable key in REDIS DB """ - return '{key}:{name}'.format(key=self.key, name=name) + return f'{self.key}:{name}' def merge_inplace(self, tasks): """Merge entries from CELERY_BEAT_SCHEDULE. @@ -114,9 +109,8 @@ def setup_schedule(self): beat_schedule = FLASK_TO_CELERY_MAPPING['CELERY_BEAT_SCHEDULE'] config = deepcopy(self.app.conf.get(beat_schedule)) self.merge_inplace(config) - msg = 'Current schedule:\n{tasks}'.format( - tasks='\n'.join(self.display_all(prefix='- Tasks: ')) - ) + current_schedule = "\n".join(self.display_all(prefix="- Tasks: ")) + msg = f'Current schedule:\n {current_schedule}' logger.info(msg) def is_due(self, entry): @@ -136,17 +130,11 @@ def is_due(self, entry): :return: the state of the entry as schedstate """ if self.get_entry_enabled(entry.name): - state = entry.is_due() - else: - msg = ('Not enabled: {name} = {task} {each} {kwargs}'.format( - name=entry.name, - task=entry.task, - each=repr(entry.schedule), - kwargs=entry.kwargs, - )) - logger.info(msg) - state = schedstate(is_due=False, next=entry.is_due().next) - return state + return entry.is_due() + msg = f'Not enabled: {entry.name} = {entry.task} ' \ + f'{repr(entry.schedule)} {entry.kwargs}' + logger.info(msg) + return schedstate(is_due=False, next=entry.is_due().next) def set(self, entry, enable=True): """Sets an entry. @@ -206,30 +194,14 @@ def display_entry(self, name, prefix='- '): :param prefix: prefix to add to returned info :return: entry as string representative """ - entry_as_text = 'Not found entry: {name}'.format(name=name) - entry = self.get(name) - if entry: + entry_as_text = f'Not found entry: {name}' + if entry := self.get(name): entry_as_text = ( - '{txt}{name} = {task} {each} {kwargs}' - ' {options} {enabled}' - ).format( - txt=prefix, - name=entry.name, - task=entry.task, - each=repr(entry.schedule), - kwargs='{txt}{data}'.format( - txt='kwargs:', - data=entry.kwargs - ), - options='{txt}{data}'.format( - txt='options:', - data=entry.options - ), - enabled='{txt}{data}'.format( - txt='enabled:', - data=self.get_entry_enabled(name) - ) - ) + f'{prefix}{entry.name} = {entry.task} {repr(entry.schedule)} ' + f'kwargs:{entry.kwargs} ' + f'options:{entry.options} ' + f'enabled:{self.get_entry_enabled(name)}' + ) return entry_as_text def display_all(self, prefix='- '): @@ -270,7 +242,7 @@ def set_entry_enabled(self, name, enable=True): :param enable: enable or disable scheduling """ if self.get(name): - enabled_name = '{key}:{name}'.format(key=self.key, name=name) + enabled_name = f'{self.key}:{name}' self.rdb[enabled_name] = int(enable) def set_enable_all(self, enable=True): @@ -336,7 +308,7 @@ def enable_tasks(all, names, disable, verbose): if verbose: click.echo('\n'.join(current_scheduler.display_all())) else: - names = names if names else [] + names = names or [] for name in names: name = name.strip() current_scheduler.set_entry_enabled(name=name, enable=not disable)