oaiharvesting: import of ebooks
* Corrects creation, merging and deletion of holding
records for ebooks.

Co-Authored-by: Peter Weber <peter.weber@rero.ch>
rerowep committed May 13, 2020
1 parent 047f231 commit 4c6491f
Showing 9 changed files with 135 additions and 77 deletions.
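For context, a minimal sketch of how the receiver touched in this commit is typically wired to invenio-oaiharvester; the connection itself is not part of this diff, so treat the signal name and connect arguments as assumptions:

# Hypothetical wiring sketch (not part of this commit): connect the harvester
# signal to the ebooks receiver so harvested records are created or deleted.
from invenio_oaiharvester.signals import oaiharvest_finished

from rero_ils.modules.ebooks.receivers import publish_harvested_records

# After each harvest run the receiver now routes deleted OAI records to
# delete_records() and the remaining ones to create_records().
oaiharvest_finished.connect(publish_harvested_records, weak=False)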
7 changes: 6 additions & 1 deletion rero_ils/dojson/utils.py
@@ -282,7 +282,12 @@ def clean_punctuation(value, punct, spaced_punct):
if value:
data = [{'value': value}]
else:
error_print('WARNING NO VALUE:', tag, code, label)
try:
fields_035 = self.get_fields(tag='035')
id = self.get_subfields(fields_035[0], 'a')[0]
except:
id = '???'
error_print('WARNING NO VALUE:', id, tag, code, label)
try:
alt_gr = self.alternate_graphic[tag][link]
subfield = self.get_subfields(alt_gr['field'])[index]
23 changes: 14 additions & 9 deletions rero_ils/modules/ebooks/receivers.py
@@ -21,7 +21,7 @@
from flask import current_app

from .dojson.contrib.marc21 import marc21
from .tasks import create_records
from .tasks import create_records, delete_records


def publish_harvested_records(sender=None, records=[], *args, **kwargs):
@@ -32,10 +32,8 @@ def publish_harvested_records(sender=None, records=[], *args, **kwargs):
if max:
records = records[:int(max)]
converted_records = []
deleted_records = []
for record in records:
if record.deleted:
# TODO: remove record
continue
rec = create_record(record.xml)
rec = marc21.do(rec)
rec.setdefault('harvested', True)
@@ -49,12 +47,19 @@
}
)
rec['identifiedBy'] = identifiers
converted_records.append(rec)
if records:
if record.deleted:
deleted_records.append(rec)
else:
converted_records.append(rec)
if converted_records:
current_app.logger.info(
'publish_harvester: received {count} records'
'publish_harvester: received {count} records to create'
.format(count=len(converted_records))
)
create_records(converted_records)
else:
current_app.logger.info('publish_harvester: nothing to do')
if deleted_records:
current_app.logger.info(
'publish_harvester: received {count} records to delete'
.format(count=len(deleted_records))
)
delete_records(deleted_records)
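For reference, a minimal sketch of the record objects this receiver consumes; the field names mirror the namedtuple used in tests/ui/ebooks/test_ebooks_receivers.py further down this diff, and the XML payloads are placeholders:

# Sketch of the input shape expected by publish_harvested_records(): objects
# exposing .xml, .deleted and .header.identifier (placeholder XML below).
from collections import namedtuple

Identifier = namedtuple('Identifier', 'identifier')
Record = namedtuple('Record', 'xml deleted header')

records = [
    Record(xml='<record>...</record>', deleted=False,
           header=Identifier(identifier='record1')),
    # records flagged as deleted are now routed to delete_records()
    Record(xml='<record>...</record>', deleted=True,
           header=Identifier(identifier='record3')),
]
# publish_harvested_records(sender=None, records=records)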
56 changes: 45 additions & 11 deletions rero_ils/modules/ebooks/tasks.py
@@ -24,7 +24,7 @@
from flask import current_app

from .utils import create_document_holding, update_document_holding
from ..documents.api import DocumentsSearch
from ..documents.api import Document, DocumentsSearch
from ..utils import do_bulk_index

# from time import sleep
@@ -53,6 +53,7 @@ def create_records(records):
pid = [r.pid for r in query.scan()].pop()
except IndexError:
pid = None

try:
if pid:
# update the record
@@ -73,16 +74,49 @@
record=record
)
)
# TODO: bulk indexing does not work with travis, need to check why
do_bulk_index(uuids, doc_type='doc', process=True)
# wait for bulk index task to finish
# inspector = inspect()
# reserved = inspector.reserved()
# if reserved:
# while any(a != [] for a in reserved.values()):
# reserved = inspector.reserved()
# sleep(1)

current_app.logger.info('create_records: {} updated, {} new'
.format(n_updated, n_created))
current_app.logger.info(
'create_records: {updated} updated, {created} new'.format(
updated=n_updated,
created=n_created
)
)
return n_created, n_updated


@shared_task(ignore_result=True)
def delete_records(records):
"""Records deleting."""
count = 0
for record in records:
# check if exist
pid = None
for identifier in record.get('identifiedBy'):
if identifier.get('source') == 'cantook':
harvested_id = identifier.get('value')
query = DocumentsSearch().filter(
'term',
identifiedBy__value=harvested_id
).source(includes=['pid'])
try:
pid = [r.pid for r in query.scan()].pop()
except IndexError:
pid = None
try:
if pid:
# update the record
existing_record = Document.get_record_by_pid(pid)
# TODO: delete record and linked references
count += 1
except Exception as err:
current_app.logger.error(
'EBOOKS DELETE RECORDS: {err} {record}'.format(
err=err,
record=record
)
)
current_app.logger.info('delete_records: {count}'.format(
count=count
))
return count
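The delete task above currently only counts matching documents; the actual removal is still marked TODO. A hedged sketch of what completing it could look like, reusing only helpers that appear elsewhere in this diff (the delete() signature on Document is an assumption carried over from Holding):

# Hypothetical completion of the TODO in delete_records(): drop the linked
# electronic holdings first, then the document itself.
from rero_ils.modules.documents.api import Document
from rero_ils.modules.holdings.api import Holding


def delete_document_and_holdings(pid):
    """Delete a harvested document and its holdings (sketch only)."""
    document = Document.get_record_by_pid(pid)
    if not document:
        return False
    for holding_pid in Holding.get_holdings_pid_by_document_pid(pid):
        holding = Holding.get_record_by_pid(holding_pid)
        holding.delete(force=False, dbcommit=True, delindex=True)
    document.delete(force=False, dbcommit=True, delindex=True)
    return True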
31 changes: 19 additions & 12 deletions rero_ils/modules/ebooks/utils.py
@@ -22,9 +22,8 @@
from invenio_oaiharvester.models import OAIHarvestConfig

from ..documents.api import Document
from ..holdings.api import create_holding, \
get_holdings_by_document_item_type, \
get_standard_holding_pid_by_doc_location_item_type
from ..holdings.api import Holding, create_holding, \
get_holding_pid_by_doc_location_item_type
from ..organisations.api import Organisation


@@ -96,6 +95,7 @@ def create_document_holding(record):
item_type_pid = org.online_circulation_category()
locations = org.get_online_locations()
for location in locations:

create_holding(
document_pid=new_record.pid,
location_pid=location,
@@ -128,19 +128,26 @@ def update_document_holding(record, pid):
item_type_pid = org.online_circulation_category()
locations = org.get_online_locations()
for location_pid in locations:
if not get_standard_holding_pid_by_doc_location_item_type(
new_record.pid, location_pid, item_type_pid
if not get_holding_pid_by_doc_location_item_type(
new_record.pid, location_pid, item_type_pid, 'electronic'
):
create_holding(
document_pid=new_record.pid,
location_pid=location_pid,
item_type_pid=item_type_pid,
electronic_location=harvested_source,
holdings_type='electronic')
holdings = get_holdings_by_document_item_type(
new_record.pid, item_type_pid)
for holding in holdings:
if holding.location_pid not in locations:
holding.delete(
force=False, dbcommit=True, delindex=True)
holdings_type='electronic'
)

source_uris = []
for harvested_source in harvested_sources:
if harvested_source.get('source'):
source_uris.append(harvested_source.get('uri'))

for holding_pid in Holding.get_holdings_pid_by_document_pid(pid):
holding = Holding.get_record_by_pid(holding_pid)
for electronic_location in holding.get('electronic_location', []):
if electronic_location.get('source'):
if electronic_location.get('uri') not in source_uris:
holding.delete(force=False, dbcommit=True, delindex=True)
return new_record
8 changes: 4 additions & 4 deletions rero_ils/modules/holdings/api.py
@@ -372,22 +372,22 @@ def prediction_issues_preview_for_pattern(
return text


def get_standard_holding_pid_by_doc_location_item_type(
document_pid, location_pid, item_type_pid):
def get_holding_pid_by_doc_location_item_type(
document_pid, location_pid, item_type_pid, holdings_type='standard'):
"""Returns standard holding pid for document/location/item type."""
result = HoldingsSearch().filter(
'term',
document__pid=document_pid
).filter(
'term',
holdings_type='standard'
holdings_type=holdings_type
).filter(
'term',
circulation_category__pid=item_type_pid
).filter(
'term',
location__pid=location_pid
).source().scan()
).source('pid').scan()
try:
return next(result).pid
except StopIteration:
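A short usage sketch of the renamed helper with its new holdings_type parameter; the pids are placeholders and the two calls mirror how the function is used elsewhere in this diff:

# Usage sketch: the default still targets standard holdings, while the
# ebooks utilities pass 'electronic' explicitly (placeholder pids).
from rero_ils.modules.holdings.api import \
    get_holding_pid_by_doc_location_item_type

document_pid, location_pid, item_type_pid = '1', 'loc1', 'itty1'

# standard holdings (default), as in Item.link_item_to_holding()
standard_pid = get_holding_pid_by_doc_location_item_type(
    document_pid, location_pid, item_type_pid)

# electronic holdings, as in update_document_holding() for harvested ebooks
electronic_pid = get_holding_pid_by_doc_location_item_type(
    document_pid, location_pid, item_type_pid, 'electronic')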
4 changes: 2 additions & 2 deletions rero_ils/modules/items/api/record.py
@@ -97,13 +97,13 @@ def link_item_to_holding(self):
Link an item to a standard holding record.
"""
from ...holdings.api import \
get_standard_holding_pid_by_doc_location_item_type, \
get_holding_pid_by_doc_location_item_type, \
create_holding

item = self.replace_refs()
document_pid = item.get('document').get('pid')

holding_pid = get_standard_holding_pid_by_doc_location_item_type(
holding_pid = get_holding_pid_by_doc_location_item_type(
document_pid, self.location_pid, self.item_type_pid)

if not holding_pid:
1 change: 0 additions & 1 deletion rero_ils/modules/notifications/api.py
@@ -309,7 +309,6 @@ def process_notifications(cls, verbose=False):
try:
pid = payload['pid']
notification = Notification.get_record_by_pid(pid)
print('----process_notifications----:', notification)
Dispatcher().dispatch_notification(notification, verbose)
message.ack()
count['send'] += 1
35 changes: 1 addition & 34 deletions tests/ui/documents/test_documents_api.py
@@ -108,7 +108,7 @@ def test_document_create_records(app, org_martigny, org_sion, ebook_1_data,
assert n_updated == 2

# TODO: find a way to execute celery worker tasks in travis tests
# n_created, n_updated = create_records([ebook_1_data])
# n_created, n_updated = create_records.delay([ebook_1_data])
# assert n_created == 0
# assert n_updated == 1

@@ -133,39 +133,6 @@ def test_document_can_delete_with_loans(
assert 'links' in reasons


# TODO: Delete person in enrich_document_data() from index after deletion
# @mock.patch('rero_ils.modules.documents.listener.requests_get')
# @mock.patch('rero_ils.modules.persons.jsonresolver.requests_get')
# def test_document_person_resolve(mock_resolver_get, mock_listener_get,
# es_clear, db, document_ref,
# person_response_data):
# """Test document person resolve."""
# mock_resolver_get.return_value = mock_response(
# json_data=person_response_data
# )
# mock_listener_get.return_value = mock_response(
# json_data=person_response_data
# )

# assert document_ref.replace_refs()[
# 'authors'
# ][0]['pid'] == person_response_data['id']

# count = PersonsSearch().filter(
# 'term',
# pid=person_response_data['id']
# ).count()
# assert count == 1

# document_ref.update(document_ref)
# document_ref.delete()
# count = PersonsSearch().filter(
# 'term',
# pid=person_response_data['id']
# ).count()
# assert count == 0


def test_document_person_resolve_exception(es_clear, db, document_data_ref):
"""Test document person resolve."""
with pytest.raises(Exception):
47 changes: 44 additions & 3 deletions tests/ui/ebooks/test_ebooks_receivers.py
@@ -19,12 +19,19 @@

from collections import namedtuple

from rero_ils.modules.documents.api import Document
from utils import flush_index

from rero_ils.modules.documents.api import Document, DocumentsSearch
from rero_ils.modules.ebooks.receivers import publish_harvested_records
from rero_ils.modules.ebooks.tasks import create_records, delete_records
from rero_ils.modules.holdings.api import Holding, HoldingsSearch


def test_publish_harvested_records(app, ebooks_1_xml, ebooks_2_xml,
org_martigny, org_sion):
org_martigny, loc_online_martigny,
item_type_online_martigny,
org_sion, loc_online_sion,
item_type_online_sion, capsys):
"""Test publish harvested records."""
Identifier = namedtuple('Identifier', 'identifier')
Record = namedtuple('Record', 'xml deleted header')
@@ -33,7 +40,13 @@ def test_publish_harvested_records(app, ebooks_1_xml, ebooks_2_xml,
header=Identifier(identifier='record1')))
records.append(Record(xml=ebooks_2_xml, deleted=False,
header=Identifier(identifier='record2')))
publish_harvested_records(sender=None, records=records)
records.append(Record(xml=ebooks_2_xml, deleted=True,
header=Identifier(identifier='record3')))

kwargs = {'max': 100}
publish_harvested_records(sender=None, records=records, kwargs=kwargs)
flush_index(DocumentsSearch.Meta.index)
flush_index(HoldingsSearch.Meta.index)

assert Document.count() == 2
doc1 = Document.get_record_by_pid('1')
@@ -42,9 +55,37 @@
{'type': 'bf:Local', 'value': 'cantook-EDEN502344'},
{'type': 'bf:Local', 'source': 'cantook', 'value': 'record1'}
]
assert len(list(Holding.get_holdings_pid_by_document_pid(doc1.pid))) == 1
doc2 = Document.get_record_by_pid('2')
assert doc2.get('identifiedBy') == [
{'type': 'bf:Isbn', 'value': '9782811234157'},
{'type': 'bf:Local', 'value': 'cantook-immateriel.frO1006810'},
{'type': 'bf:Local', 'source': 'cantook', 'value': 'record2'}
]
assert len(list(Holding.get_holdings_pid_by_document_pid(doc2.pid))) == 1

# test update
publish_harvested_records(sender=None, records=records)
flush_index(DocumentsSearch.Meta.index)
flush_index(HoldingsSearch.Meta.index)
assert len(list(Holding.get_holdings_pid_by_document_pid(doc1.pid))) == 1
assert len(list(Holding.get_holdings_pid_by_document_pid(doc2.pid))) == 1

# test delete
records = []
del doc1['electronicLocator']
records.append(doc1)
doc2['electronicLocator'] = [{
"content": "coverImage",
"type": "relatedResource",
"url": "http://images.immateriel.fr/covers/DEQ2C5A.png"
}]
records.append(doc2)

create_records(records=records)
flush_index(DocumentsSearch.Meta.index)
flush_index(HoldingsSearch.Meta.index)
assert len(list(Holding.get_holdings_pid_by_document_pid(doc1.pid))) == 0
assert len(list(Holding.get_holdings_pid_by_document_pid(doc2.pid))) == 0

assert 2 == delete_records(records=records)
