ebooks: fix dumps for records without series
* Fixes documents dumps for records without series.
* Corrects the series transformation in the ebooks dojson module.
* Adds "_text" to the JSON schema.
* Adds `invenio utils reindex` and `invenio utils runindex` commands
  for bulk indexing with custom class.

Co-Authored-by: Peter Weber <peter.weber@rero.ch>
rerowep committed Nov 22, 2019
1 parent 2356ada commit 81d595e
Showing 21 changed files with 212 additions and 62 deletions.
74 changes: 74 additions & 0 deletions rero_ils/modules/api.py
@@ -20,14 +20,17 @@
from copy import deepcopy
from uuid import uuid4

import pytz
from elasticsearch.exceptions import NotFoundError
from flask import current_app
from invenio_db import db
from invenio_indexer import current_record_to_index
from invenio_indexer.api import RecordIndexer
from invenio_indexer.signals import before_record_index
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.api import Record
from invenio_records_rest.utils import obj_or_import_string
from invenio_search import current_search
from invenio_search.api import RecordsSearch
from sqlalchemy.orm.exc import NoResultFound
@@ -85,6 +88,77 @@ def delete(self, record):
current_search.flush_and_refresh(index_name)
return return_value

def bulk_index(self, record_id_iterator, doc_type=None):
"""Bulk index records.
:param record_id_iterator: Iterator yielding record UUIDs.
"""
self._bulk_op(record_id_iterator, op_type='index', doc_type=doc_type)

def _index_action(self, payload):
"""Bulk index action.
:param payload: Decoded message body.
:returns: Dictionary defining an Elasticsearch bulk 'index' action.
"""
# take the first defined doc type for finding the class
pid_type = payload.get('doc_type', ['rec'])[0]
record_class = obj_or_import_string(
current_app.config.get('RECORDS_REST_ENDPOINTS').get(
pid_type
).get('record_class', Record)
)
record = record_class.get_record(payload['id'])
index, doc_type = self.record_to_index(record)

arguments = {}
body = self._prepare_record(record, index, doc_type, arguments)
action = {
'_op_type': 'index',
'_index': index,
'_type': doc_type,
'_id': str(record.id),
'_version': record.revision_id,
'_version_type': self._version_type,
'_source': body
}
action.update(arguments)

return action

@staticmethod
def _prepare_record(record, index, doc_type, arguments=None, **kwargs):
"""Prepare record data for indexing.
:param record: The record to prepare.
:param index: The Elasticsearch index.
:param doc_type: The Elasticsearch document type.
:param arguments: The arguments to send to Elasticsearch upon indexing.
:param **kwargs: Extra parameters.
:returns: The record metadata.
"""
if current_app.config['INDEXER_REPLACE_REFS']:
data = record.replace_refs().dumps()
else:
data = record.dumps()

data['_created'] = pytz.utc.localize(record.created).isoformat() \
if record.created else None
data['_updated'] = pytz.utc.localize(record.updated).isoformat() \
if record.updated else None

# Allow modification of data prior to sending to Elasticsearch.
before_record_index.send(
current_app._get_current_object(),
json=data,
record=record,
index=index,
doc_type=doc_type,
arguments={} if arguments is None else arguments,
**kwargs
)
return data


class IlsRecord(Record):
"""ILS Record class."""
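
As a rough sketch of how the new indexer hooks are meant to be used together (mirroring the calls in cli.py and ebooks/tasks.py below; `uuids` and an active Flask application context are assumed):

# Sketch only: queue record UUIDs for bulk indexing with the custom indexer,
# then drain the queue synchronously. `doc_type` is the list of PID types that
# _index_action uses to resolve the record class from RECORDS_REST_ENDPOINTS.
from rero_ils.modules.api import IlsRecordIndexer

indexer = IlsRecordIndexer()
indexer.bulk_index(uuids, doc_type=['doc'])  # uuids: an iterable of record UUIDs
indexer.process_bulk_queue(es_bulk_kwargs={'raise_on_error': True})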
69 changes: 69 additions & 0 deletions rero_ils/modules/cli.py
@@ -39,6 +39,7 @@
from flask_security.confirmable import confirm_user
from invenio_accounts.cli import commit, users
from invenio_db import db
from invenio_indexer.tasks import process_bulk_queue
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.api import Record
from invenio_records_rest.utils import obj_or_import_string
@@ -50,6 +51,7 @@
from pkg_resources import resource_string
from werkzeug.local import LocalProxy

from .api import IlsRecordIndexer
from .documents.dojson.contrib.marc21tojson import marc21tojson
from .items.cli import create_items, reindex_items
from .loans.cli import create_loans
@@ -60,6 +62,12 @@
_datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)


def abort_if_false(ctx, param, value):
"""Abort command is value is False."""
if not value:
ctx.abort()


@click.group()
def fixtures():
"""Fixtures management commands."""
@@ -861,3 +869,64 @@ def reserve_pid_range(pid_type, records_number, unused):
status=PIDStatus.NEW)
db.session.add(identifier(recid=pid))
db.session.commit()


@utils.command('runindex')
@click.option(
'--delayed', '-d', is_flag=True, help='Run indexing in background.')
@click.option(
'--concurrency', '-c', default=1, type=int,
help='Number of concurrent indexing tasks to start.')
@click.option('--queue', '-q', type=str,
help='Name of the celery queue used to put the tasks into.')
@click.option('--version-type', help='Elasticsearch version type to use.')
@click.option(
'--raise-on-error/--skip-errors', default=True,
help='Controls if Elasticsearch bulk indexing errors raise an exception.')
@with_appcontext
def run(delayed, concurrency, version_type=None, queue=None,
raise_on_error=True):
"""Run bulk record indexing."""
if delayed:
celery_kwargs = {
'kwargs': {
'version_type': version_type,
'es_bulk_kwargs': {'raise_on_error': raise_on_error},
}
}
click.secho(
'Starting {0} tasks for indexing records...'.format(concurrency),
fg='green')
if queue is not None:
celery_kwargs.update({'queue': queue})
for c in range(0, concurrency):
process_bulk_queue.apply_async(**celery_kwargs)
else:
click.secho('Indexing records...', fg='green')
IlsRecordIndexer(version_type=version_type).process_bulk_queue(
es_bulk_kwargs={'raise_on_error': raise_on_error})


@utils.command('reindex')
@click.option('--yes-i-know', is_flag=True, callback=abort_if_false,
expose_value=False,
prompt='Do you really want to reindex all records?')
@click.option('-t', '--pid-type', multiple=True, required=True)
@with_appcontext
def reindex(pid_type):
"""Reindex all records.
:param pid_type: One or more PID types to reindex.
"""
click.secho('Sending records to indexing queue ...', fg='green')

query = (x[0] for x in PersistentIdentifier.query.filter_by(
object_type='rec', status=PIDStatus.REGISTERED
).filter(
PersistentIdentifier.pid_type.in_(pid_type)
).values(
PersistentIdentifier.object_uuid
))
IlsRecordIndexer().bulk_index(query, doc_type=pid_type)
click.secho('Execute "runindex" command to process the queue!',
fg='yellow')
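
Taken together, and assuming the standard `invenio` console entry point, a typical flow for these two new commands would be roughly: `invenio utils reindex --yes-i-know -t doc` to push all registered records of the given PID type(s) onto the indexing queue, followed by `invenio utils runindex` (optionally with `-d`, `-c` and `-q` for delayed Celery tasks) to process the queue with the custom indexer.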
@@ -298,7 +298,6 @@ def build_place_or_agent_data(code, label, index, link, add_country):
})
except Exception as err:
pass
# print('++++', err)
return place_or_agent_data

# function build_statement start here
@@ -2318,4 +2318,4 @@
"default": false
}
}
}
}
@@ -2318,4 +2318,4 @@
"default": false
}
}
}
}
6 changes: 2 additions & 4 deletions rero_ils/modules/documents/listener.py
@@ -34,17 +34,15 @@


def enrich_document_data(sender, json=None, record=None, index=None,
**dummy_kwargs):
doc_type=None, **dummy_kwargs):
"""Signal sent before a record is indexed.
:param json: The dumped record dictionary which can be modified.
:param record: The record being indexed.
:param index: The index in which the record will be indexed.
:param doc_type: The doc_type for the record.
"""
# TODO: this multiply the indexing time by 5, try an other way!
document_index_name = DocumentsSearch.Meta.index
if index.startswith(document_index_name):
if index == '-'.join([DocumentsSearch.Meta.index, doc_type]):
# HOLDINGS
holdings = []
document_pid = record['pid']
@@ -195,6 +195,9 @@
"statement": {
"type": "object",
"properties": {
"_text": {
"type": "text"
},
"type": {
"type": "text"
},
@@ -255,6 +258,9 @@
"series": {
"type": "object",
"properties": {
"_text": {
"type": "text"
},
"name": {
"type": "text",
"analyzer": "global_lowercase_asciifolding",
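
(The new `_text` sub-fields in the statement and series mappings presumably hold the pre-formatted, human-readable rendering that the document dumps produce from the structured data; mapping them as `text` makes that rendering searchable alongside the structured sub-fields.)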
4 changes: 2 additions & 2 deletions rero_ils/modules/ebooks/dojson/contrib/marc21/model.py
@@ -239,7 +239,7 @@ def build_place_or_agent_data(code, label, add_country):
add_country = False
if place_or_agent_data:
statement.append(place_or_agent_data)
return statement
return statement or None

# the function marc21_to_provisionActivity start here
ind2 = key[4]
Expand Down Expand Up @@ -320,7 +320,7 @@ def marc21_to_series(self, key, value):
number = value.get('v')
if number:
series['number'] = ', '.join(utils.force_list(number))
return series
return series or None


@marc21.over('notes', '^500..')
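
(Returning None instead of an empty dict or list in these dojson rules lets the transformation drop the field entirely, presumably via dojson's ignore_value handling, so harvested ebook records without series or publication statement data no longer end up with empty entries that broke the document dumps.)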
4 changes: 2 additions & 2 deletions rero_ils/modules/ebooks/tasks.py
@@ -25,7 +25,7 @@

from .utils import create_document_holding, update_document_holding
from ..documents.api import DocumentsSearch
from ..utils import bulk_index
from ..utils import do_bulk_index

# from time import sleep

@@ -66,7 +66,7 @@ def create_records(records):
n_created += 1
uuids.append(new_record.id)
# TODO: bulk indexing does not work with travis, need to check why
bulk_index(uuids, process=True)
do_bulk_index(uuids, doc_type=['doc'], process=True)
# wait for bulk index task to finish
# inspector = inspect()
# reserved = inspector.reserved()
12 changes: 12 additions & 0 deletions rero_ils/modules/ebooks/utils.py
@@ -83,6 +83,18 @@ def create_document_holding(record):
location_pid=location,
item_type_pid=item_type_pid,
electronic_location=harvested_source)
else:
current_app.logger.warning(
'create document holding no org: {source}'.format(
source=harvested_source['source']
)
)
else:
current_app.logger.warning(
'create document holding: {rec}'.format(
rec=record
)
)
return new_record


5 changes: 2 additions & 3 deletions rero_ils/modules/fees/listener.py
@@ -21,16 +21,15 @@


def enrich_fee_data(sender, json=None, record=None, index=None,
**dummy_kwargs):
doc_type=None, **dummy_kwargs):
"""Signal sent before a record is indexed.
:param json: The dumped record dictionary which can be modified.
:param record: The record being indexed.
:param index: The index in which the record will be indexed.
:param doc_type: The doc_type for the record.
"""
fee_index_name = FeesSearch.Meta.index
if index.startswith(fee_index_name):
if index == '-'.join([FeesSearch.Meta.index, doc_type]):
fee = Fee.get_record_by_pid(record.get('pid'))
org_pid = fee.organisation_pid
json['organisation'] = {
5 changes: 2 additions & 3 deletions rero_ils/modules/holdings/listener.py
@@ -21,16 +21,15 @@


def enrich_holding_data(sender, json=None, record=None, index=None,
**dummy_kwargs):
doc_type=None, **dummy_kwargs):
"""Signal sent before a record is indexed.
:param json: The dumped record dictionary which can be modified.
:param record: The record being indexed.
:param index: The index in which the record will be indexed.
:param doc_type: The doc_type for the record.
"""
holding_index_name = HoldingsSearch.Meta.index
if index.startswith(holding_index_name):
if index == '-'.join([HoldingsSearch.Meta.index, doc_type]):
holding = record
if not isinstance(record, Holding):
holding = Holding.get_record_by_pid(record.get('pid'))
5 changes: 2 additions & 3 deletions rero_ils/modules/items/listener.py
@@ -21,16 +21,15 @@


def enrich_item_data(sender, json=None, record=None, index=None,
**dummy_kwargs):
doc_type=None, **dummy_kwargs):
"""Signal sent before a record is indexed.
:param json: The dumped record dictionary which can be modified.
:param record: The record being indexed.
:param index: The index in which the record will be indexed.
:param doc_type: The doc_type for the record.
"""
item_index_name = ItemsSearch.Meta.index
if index.startswith(item_index_name):
if index == '-'.join([ItemsSearch.Meta.index, doc_type]):
item = record
if not isinstance(record, Item):
item = Item.get_record_by_pid(record.get('pid'))
6 changes: 3 additions & 3 deletions rero_ils/modules/loans/listener.py
@@ -24,16 +24,16 @@


def enrich_loan_data(sender, json=None, record=None, index=None,
**dummy_kwargs):
doc_type=None, **dummy_kwargs):
"""Signal sent before a record is indexed.
:param json: The dumped record dictionary which can be modified.
:param record: The record being indexed.
:param index: The index in which the record will be indexed.
:param doc_type: The doc_type for the record.
"""
loan_index_name = current_circulation.loan_search.Meta.index
if index.startswith(loan_index_name):
if index == '-'.join(
[current_circulation.loan_search.Meta.index, doc_type]):
item = Item.get_record_by_pid(record.get('item_pid'))
json['library_pid'] = item.holding_library_pid

