Skip to content

Commit

Permalink
operation logs: use an elasticsearch only resource
Browse files Browse the repository at this point in the history
* Renames virtua command cli name.
* Fixes monitor view to compute elasticsearch and db count diff when the
  index does not exists.
* Creates an operation logs elasticsearch record class. It creates one
  index per year.
* Denies to all to read one record.
* Adds a cli to dumps the operation logs in a JSON file for backup.
* Closes: rero#1725.

Co-Authored-By: Johnny Mariéthoz <Johnny.Mariethoz@rero.ch>
  • Loading branch information
jma committed May 26, 2021
1 parent 3a71c92 commit f10c903
Show file tree
Hide file tree
Showing 26 changed files with 394 additions and 460 deletions.
11 changes: 10 additions & 1 deletion data/operation_logs.json
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
[]
[
{
"record": {
"$ref": "https://ils.rero.ch/api/documents/1"
},
"operation": "update",
"user_name": "system",
"date": "2021-01-21T09:51:52.879533+00:00"
}
]
6 changes: 3 additions & 3 deletions rero_ils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1613,13 +1613,12 @@ def _(x):
action='delete', record=record, cls=TemplatePermission)
),
oplg=dict(
# TODO: useless, but required
pid_type='oplg',
pid_minter='operation_log_id',
pid_minter='recid',
pid_fetcher='operation_log_id',
search_class='rero_ils.modules.operation_logs.api:OperationLogsSearch',
search_index='operation_logs',
search_type=None,
indexer_class='rero_ils.modules.operation_logs.api:OperationLogsIndexer',
record_serializers={
'application/json': (
'rero_ils.modules.serializers:json_v1_response'
Expand All @@ -1638,6 +1637,7 @@ def _(x):
},
record_class='rero_ils.modules.operation_logs.api:OperationLog',
list_route='/operation_logs/',
# TODO: create a converter for es id, not used for the moment.
item_route='/operation_logs/<pid(oplg, record_class='
'"rero_ils.modules.operation_logs.api:OperationLog"):pid_value>',
default_media_type='application/json',
Expand Down
4 changes: 2 additions & 2 deletions rero_ils/es_templates/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# RERO ILS
# Copyright (C) 2019 RERO
# Copyright (C) 2021 RERO
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -19,7 +19,7 @@


def list_es_templates():
"""Elasticsearch Templates path."""
"""Elasticsearch templates path."""
return [
'rero_ils.es_templates'
]
5 changes: 3 additions & 2 deletions rero_ils/modules/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
from .items.cli import create_items, reindex_items
from .loans.cli import create_loans, load_virtua_transactions
from .monitoring import Monitoring
from .operation_logs.cli import migrate_virtua_operation_logs
from .operation_logs.cli import create_operation_logs, dump_operation_logs
from .patrons.cli import import_users, users_validate
from .tasks import process_bulk_queue
from .utils import bulk_load_metadata, bulk_load_pids, bulk_load_pidstore, \
Expand Down Expand Up @@ -110,7 +110,8 @@ def fixtures():
fixtures.add_command(create_patterns)
fixtures.add_command(create_ill_requests)
fixtures.add_command(create_collections)
fixtures.add_command(migrate_virtua_operation_logs)
fixtures.add_command(create_operation_logs)
fixtures.add_command(dump_operation_logs)


@users.command('confirm')
Expand Down
2 changes: 1 addition & 1 deletion rero_ils/modules/fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


def id_fetcher(record_uuid, data, provider, pid_key='pid'):
"""Fetch a Organisation record's identifiers.
"""Fetch an Organisation record's identifier.
:param record_uuid: The record UUID.
:param data: The record metadata.
Expand Down
2 changes: 2 additions & 0 deletions rero_ils/modules/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,12 @@ def info(cls, with_deleted=False, difference_db_es=False):
).items():
info[doc_type] = {}
count_db = cls.get_db_count(doc_type, with_deleted=with_deleted)
count_db = count_db if isinstance(count_db, int) else 0
info[doc_type]['db'] = count_db
index = endpoint.get('search_index', '')
if index:
count_es = cls.get_es_count(index)
count_es = count_es if isinstance(count_es, int) else 0
db_es = count_db - count_es
info[doc_type]['index'] = index
info[doc_type]['es'] = count_es
Expand Down
163 changes: 109 additions & 54 deletions rero_ils/modules/operation_logs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,76 +17,131 @@

"""API for manipulating operation_logs."""

from functools import partial
from elasticsearch.helpers import bulk
from invenio_records.api import RecordBase
from invenio_search import current_search_client

from .models import OperationLogIdentifier, OperationLogMetadata, \
OperationLogOperation
from ..api import IlsRecord, IlsRecordsIndexer, IlsRecordsSearch
from ..fetchers import id_fetcher
from ..minters import id_minter
from ..providers import Provider
from .extensions import ResolveRefsExension
from ..fetchers import FetchedPID

# provider
OperationLogProvider = type(
'OperationLogProvider',
(Provider,),
dict(identifier=OperationLogIdentifier, pid_type='oplg')
)
# minter
operation_log_id_minter = partial(id_minter, provider=OperationLogProvider)
# fetcher
operation_log_id_fetcher = partial(id_fetcher, provider=OperationLogProvider)

def operation_log_id_fetcher(record_uuid, data):
"""Fetch an Organisation record's identifier.
class OperationLogsSearch(IlsRecordsSearch):
"""Operation log Search."""
:param record_uuid: The record UUID.
:param data: The record metadata.
:return: A :data:`rero_ils.modules.fetchers.FetchedPID` instance.
"""
return FetchedPID(
provider=None,
pid_type='oplg',
pid_value=record_uuid
)

class Meta:
"""Search only on operation_log index."""

index = 'operation_logs'
doc_types = None
fields = ('*', )
facets = {}
LONG_INDEX_NAME = 'operation_logs-operation_log-v0.0.1'

default_filter = None


class OperationLog(IlsRecord):
class OperationLog(RecordBase):
"""OperationLog class."""

minter = operation_log_id_minter
fetcher = operation_log_id_fetcher
provider = OperationLogProvider
model_cls = OperationLogMetadata
index_name = 'operation_logs'

@classmethod
def get_create_operation_log_by_resource_pid(cls, pid_type, record_pid):
"""Return a create operation log for a given resource and pid.
_extensions = [ResolveRefsExension()]

:param pid_type: resource pid type.
:param record_pid: record pid.
@classmethod
def create(cls, data, id_=None, index_refresh='false', **kwargs):
r"""Create a new record instance and store it in elasticsearch.
:param data: Dict with the record metadata.
:param id_: Specify a UUID to use for the new record, instead of
automatically generated.
:param refresh: If `true` then refresh the affected shards to make
this operation visible to search, if `wait_for` then wait for a
refresh to make this operation visible to search, if `false`
(the default) then do nothing with refreshes.
Valid choices: true, false, wait_for
:returns: A new :class:`Record` instance.
"""
search = OperationLogsSearch()
search = search.filter('term', record__pid=record_pid)\
.filter('term', record__type=pid_type)\
.filter('term', operation=OperationLogOperation.CREATE)
oplgs = search.source(['pid']).scan()
try:
return OperationLog.get_record_by_pid(next(oplgs).pid)
except StopIteration:
return None
record = cls(
data,
model=None,
**kwargs
)

# TODO: enable this whe a invenio-records is updated.
# # Run pre create extensions
# for e in cls._extensions:
# e.pre_create(record)

res = current_search_client.index(
cls.get_index(record),
record.dumps(),
refresh=index_refresh)
record['id'] = res.get('_id')

# Run post create extensions
for e in cls._extensions:
e.post_create(record)
return record

@classmethod
def get_index(cls, data):
"""Get the index name given the data.
One index per year is created based on the data date field.
:param data: Dict with the record metadata.
:returns: str, the corresponding index name.
"""
suffix = '-'.join(data.get('date', '').split('-')[0:1])
return f'{cls.index_name}-{suffix}'

class OperationLogsIndexer(IlsRecordsIndexer):
"""Operation log indexing class."""
@classmethod
def bulk(cls, data):
"""Bulk indexing.
record_cls = OperationLog
:params data: list of dicts with the record metadata.
"""
actions = []
for d in data:
d = OperationLog(d)
action = {
'_op_type': 'index',
'_index': cls.get_index(d),
'_source': d.dumps()
}
actions.append(action)
res = bulk(current_search_client, actions)
# TODO: check errors
print(res)

def bulk_index(self, record_id_iterator):
"""Bulk index records.
@classmethod
def get_record(cls, id_):
"""Retrieve the record by ID.
:param record_id_iterator: Iterator yielding record UUIDs.
Raise a database exception if the record does not exist.
:param id_: record ID.
:returns: The :class:`Record` instance.
"""
super(OperationLogsIndexer, self).bulk_index(
record_id_iterator, doc_type='oplg')
source = current_search_client.get(id=id_, index=cls.index_name)
return cls(source.get('_source', {}))

@classmethod
def get_indices(cls):
"""Get all index names present in the elasticsearch server."""
return set([
v['_index'] for v in current_search_client.search(
index=cls.index_name)['hits']['hits']
])

@classmethod
def delete_indices(cls):
"""Remove all index names present in the elasticsearch server."""
current_search_client.indices.delete(f'{cls.index_name}*')
return True

@property
def id(self):
"""Get model identifier."""
return self.get('id')
68 changes: 38 additions & 30 deletions rero_ils/modules/operation_logs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,26 @@
import click
from flask import current_app
from flask.cli import with_appcontext
from invenio_search.api import RecordsSearch

from rero_ils.modules.operation_logs.api import OperationLog
from rero_ils.modules.operation_logs.models import OperationLogOperation
from rero_ils.modules.utils import extracted_data_from_ref

from ..utils import read_json_record


@click.command('migrate_virtua_operation_logs')
@click.option('-v', '--verbose', 'verbose', is_flag=True, default=False)
@click.option('-d', '--debug', 'debug', is_flag=True, default=False)
@click.command('create_operation_logs')
@click.option('-l', '--lazy', 'lazy', is_flag=True, default=False)
@click.option('-s', '--batch-size', 'size', type=int, default=10000)
@click.argument('infile', type=click.File('r'))
@with_appcontext
def migrate_virtua_operation_logs(infile, verbose, debug, lazy):
"""Migrate Virtua operation log records in reroils.
def create_operation_logs(infile, lazy, size):
"""Load operation log records in reroils.
:param infile: Json operation log file.
:param lazy: lazy reads file
"""
enabled_logs = current_app.config.get('RERO_ILS_ENABLE_OPERATION_LOG')
click.secho('Migrate Virtua operation log records:', fg='green')
click.secho('Load operation log records:', fg='green')
if lazy:
# try to lazy read json file (slower, better memory management)
data = read_json_record(infile)
Expand All @@ -54,28 +52,38 @@ def migrate_virtua_operation_logs(infile, verbose, debug, lazy):
data = json.load(infile)
index_count = 0
with click.progressbar(data) as bar:
records = []
for oplg in bar:
try:
operation = oplg.get('operation')
resource = extracted_data_from_ref(
oplg.get('record').get('$ref'), data='resource')
pid_type = enabled_logs.get(resource)
if pid_type and operation == OperationLogOperation.CREATE:
# The virtua create operation log overrides the reroils
# create operation log, the method to use is UPDATE
record_pid = extracted_data_from_ref(
oplg.get('record').get('$ref'), data='pid')
if not (index_count + 1) % size:
OperationLog.bulk(records)
records = []
records.append(oplg)
index_count += 1
# the rest of the records
if records:
OperationLog.bulk(records)
index_count += len(records)
click.echo(f'created {index_count} operation logs.')


create_rec = \
OperationLog.get_create_operation_log_by_resource_pid(
pid_type, record_pid)
if create_rec:
create_rec.update(oplg, dbcommit=True, reindex=True)
elif pid_type and operation == OperationLogOperation.UPDATE:
# The virtua update operation log is a new entry in the
# reroils operation log, the method to use is CREATE
OperationLog.create(data=oplg, dbcommit=True, reindex=True)
except Exception:
pass
index_count += len(data)
@click.command('dump_operation_logs')
@click.argument('outfile', type=click.File('w'))
@with_appcontext
def dump_operation_logs(outfile):
"""Dumps operation log records in a given file.
:param outfile: JSON operation log output file.
"""
enabled_logs = current_app.config.get('RERO_ILS_ENABLE_OPERATION_LOG')
click.secho('Dumps operation log records:', fg='green')
search = RecordsSearch(index=OperationLog.index_name)

index_count = 0
outfile.write('[\n')
with click.progressbar(search.scan()) as bar:
for oplg in bar:
outfile.write(str(oplg.to_dict()))
outfile.write(',\n')
index_count += 1
outfile.write('\n]')
click.echo(f'created {index_count} operation logs.')
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# RERO ILS
# Copyright (C) 2021 RERO
# Copyright (C) 2019 RERO
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -15,6 +15,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Elasticsearch mappings."""
"""Elasticsearch templates for Operation log records."""

from __future__ import absolute_import, print_function

def list_es_templates():
"""Elasticsearch templates path."""
return [
'rero_ils.modules.operation_logs.es_templates'
]
Loading

0 comments on commit f10c903

Please sign in to comment.