-
Notifications
You must be signed in to change notification settings - Fork 2
/
zenodouploader.py
496 lines (455 loc) · 21.2 KB
/
zenodouploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
#!/usr/bin/env python3
"""
Retrieve and disseminate files and metadata to Zenodo
"""
import logging
import sys
import re
import requests
from errors import DisseminationError
from uploader import Uploader, PUB_FORMATS, Location
class ZenodoUploader(Uploader):
    """Dissemination logic for Zenodo"""

    def __init__(self, work_id, export_url, client_url, version):
        """Instantiate class for accessing Zenodo API.

        The Zenodo API token is obtained through `get_variable_from_env`
        (not defined in this class - presumably inherited from Uploader).
        If that raises DisseminationError, the error is logged and the
        process exits with status 1.
        """
        super().__init__(work_id, export_url, client_url, version)
        try:
            api_token = self.get_variable_from_env(
                'zenodo_token', 'Zenodo')
        except DisseminationError as error:
            logging.error(error)
            sys.exit(1)
        self.api = ZenodoApi(api_token)

    def upload_to_platform(self):
        """Upload work in required format to Zenodo.

        Creates one Zenodo deposition for the Work, uploads every available
        publication file plus the full Thoth JSON metadata file to it, and
        publishes it.

        @return: list of Location objects (one per uploaded publication
                 file) to be entered as Thoth Locations.

        Exits the process with status 1 if a record for this Work ID already
        exists, if no publication file can be retrieved, or if an upload or
        publish step raises DisseminationError. Any other exception raised
        after the deposition has been created is re-raised once the
        deposition has been cleaned up.
        """
        # Test that no record representing this Work already exists in Zenodo.
        search_results = self.api.search_records(self.work_id)
        if search_results['total'] > 0:
            logging.error(
                'Cannot upload to Zenodo: an item with this Work ID already '
                'exists')
            sys.exit(1)

        # If any required metadata is missing, this step will fail, so do it
        # before attempting large file downloads.
        zenodo_metadata = self.parse_metadata()

        # Include full work metadata file in JSON format,
        # as a supplement to filling out Zenodo metadata fields.
        metadata_bytes = self.get_formatted_metadata('json::thoth')

        # Include all available publication files. Don't fail if
        # one is missing, but do fail if none are found at all.
        # (Any paywalled publications will not be retrieved.)
        publications = []
        for pub_format in PUB_FORMATS:
            try:
                publications.append(
                    self.get_publication_details(pub_format))
            except DisseminationError:
                # This format is not available: try the remaining ones.
                continue
        if not publications:
            logging.error(
                'Cannot upload to Zenodo: no suitable publication files found')
            sys.exit(1)

        # Create a deposition to represent the Work.
        (deposition_id, api_bucket) = self.api.create_deposition(
            zenodo_metadata)

        locations = []
        location_platform = 'OTHER'
        # Treat Zenodo deposition as a single "landing page" which may be
        # shared by multiple "publications".
        landing_page = 'https://zenodo.org/records/{}'.format(deposition_id)

        # Any failure after this point will leave incomplete data in
        # Zenodo storage which will need to be removed.
        try:
            filename = self.work_id
            for publication in publications:
                full_filename = '{}_book{}'.format(
                    filename, publication.file_ext)
                self.api.upload_file(
                    publication.bytes, full_filename, api_bucket)
                full_text_url = '{}/files/{}'.format(
                    landing_page, full_filename)
                locations.append(Location(
                    publication.id, location_platform, landing_page,
                    full_text_url))
            self.api.upload_file(
                metadata_bytes, '{}_metadata.json'.format(filename),
                api_bucket)
            published_url = self.api.publish_deposition(deposition_id)
        except DisseminationError as error:
            # Report failure, and remove any partially-created items
            # from Zenodo storage.
            logging.error(error)
            self.api.clean_up(deposition_id)
            sys.exit(1)
        except Exception:
            # Unexpected failure. Let program crash, but still need to tidy
            # Zenodo storage.
            self.api.clean_up(deposition_id)
            raise

        logging.info(
            'Successfully uploaded to Zenodo at {}'.format(published_url))

        # Return details of created uploads to be entered as Thoth Locations
        return locations

    def parse_metadata(self):
        """Convert work metadata into Zenodo format.

        @return: dict with a single 'metadata' key holding the Zenodo
                 deposition metadata built from `self.metadata`.

        Exits the process with status 1 if the Work has no Long Abstract or
        no DOI (the licence and creator helpers called from here apply their
        own mandatory-field checks in the same way).
        """
        work_metadata = self.metadata.get('data').get('work')

        long_abstract = work_metadata.get('longAbstract')
        if long_abstract is None:
            logging.error(
                'Cannot upload to Zenodo: Work must have a Long Abstract')
            sys.exit(1)

        doi = work_metadata.get('doi')
        if doi is None:
            logging.error(
                'Cannot upload to Zenodo: Work must have a DOI')
            sys.exit(1)

        zenodo_metadata = {
            'metadata': {
                # Mandatory fields which will prevent publication
                # if not set explicitly:
                'title': work_metadata['fullTitle'],  # mandatory in Thoth
                'upload_type': 'publication',
                # Mandatory when upload_type is publication
                'publication_type': 'book',
                'description': long_abstract,
                'creators': self.get_zenodo_creators(work_metadata),
                # Mandatory fields which will be defaulted
                # if not set explicitly:
                # Zenodo requires date in YYYY-MM-DD format, as output by Thoth
                'date': work_metadata.get('publicationDate'),
                'access_right': 'open',
                # Mandatory when access_right is open
                'license': self.get_zenodo_licence(work_metadata),
                # Optional fields:
                # If own DOI is not supplied, Zenodo will register one
                'doi': doi,
                'prereserve_doi': False,
                # Will be safely ignored if empty
                'keywords': [n['subjectCode']
                             for n in work_metadata.get('subjects')
                             if n.get('subjectType') == 'KEYWORD'],
                # Will be safely ignored if empty
                'related_identifiers': self.get_zenodo_relations(
                    work_metadata),
                # Will be safely ignored if empty
                'references': [
                    n['unstructuredCitation']
                    for n in work_metadata.get('references')
                    if n.get('unstructuredCitation') is not None],
                'communities': [{'identifier': 'thoth'}],
                'imprint_publisher': self.get_publisher_name(),
                # Will be safely ignored if None
                'imprint_isbn': next(
                    (n.get('isbn')
                     for n in work_metadata.get('publications')
                     if n.get('isbn') is not None
                     and n['publicationType'] == 'PDF'), None),
                # Requested in format `city, country` but seemingly not checked
                'imprint_place': work_metadata.get('place'),
                # search_records() looks records up by this exact note text
                'notes': 'thoth-work-id:{}'.format(self.work_id)
            }
        }

        # Only one language can be supplied, and must not be None
        language = next((n['languageCode']
                         for n in work_metadata.get('languages')), None)
        if language is not None:
            zenodo_metadata['metadata'].update({'language': language.lower()})

        return zenodo_metadata

    def get_zenodo_licence(self, metadata):
        """
        Find the Zenodo licence string corresponding to the Thoth licence URL.

        @param metadata: work metadata dict; its 'license' value is used.
        @return: the licence identifier returned by the Zenodo licence
                 search.

        Exits the process with status 1 if the Work has no licence, if the
        licence cannot be normalised, or if Zenodo has no matching licence.
        """
        thoth_licence_raw = metadata.get('license')
        if thoth_licence_raw is None:
            logging.error(
                'Cannot upload to Zenodo: Work must have a Licence')
            sys.exit(1)

        # Thoth licence field is unchecked free text. Retrieve a normalised
        # version of the Thoth licence, without http(s) or www prefixes,
        # optional final '/', or the `deed`/`legalcode` suffixes sometimes
        # given with CC licences. (IGNORECASE may be redundant here if Thoth
        # licences are lowercased on entry into database)
        try:
            thoth_licence = re.fullmatch(
                r'^(?:https?://)?(?:www\.)?(.*?)/?(?:(?:deed|legalcode)'
                r'(?:\.[a-zA-Z]{2})?)?$',
                thoth_licence_raw, re.IGNORECASE).group(1)
        except AttributeError:
            # fullmatch() returned None, i.e. the pattern did not match
            logging.error(
                'Work Licence {} not in expected URL format'
                .format(thoth_licence_raw))
            sys.exit(1)

        zenodo_licence = self.api.search_licences(thoth_licence)
        if zenodo_licence is None:
            logging.error(
                'Work Licence {} not supported by Zenodo'
                .format(thoth_licence_raw))
            sys.exit(1)

        return zenodo_licence

    @staticmethod
    def get_zenodo_creators(metadata):
        """
        Create a list of main contributors in the format required by Zenodo.

        @param metadata: work metadata dict; its 'contributions' list is
                         used, restricted to entries whose
                         'mainContribution' is True.
        @return: list of dicts with 'name', 'orcid' and 'affiliation' keys.

        Exits the process with status 1 if there is no main contribution.
        """
        zenodo_creators = []
        for contribution in [n for n in metadata.get('contributions')
                             if n['mainContribution'] is True]:
            first_name = contribution.get('firstName')
            # Zenodo requests author names in `Family name, Given name(s)`
            # format, but if we only have full name, supply that as a
            # workaround
            if first_name is not None:
                name = '{}, {}'.format(contribution['lastName'], first_name)
            else:
                name = contribution['fullName']
            # OK to submit in URL format - Zenodo will convert to ID-only
            # format (will also validate ORCID and prevent publication if
            # invalid). Will be safely ignored if None.
            orcid = contribution.get('contributor').get('orcid')
            # Treat a missing/None affiliations value as "no affiliations".
            # (A filter inside the generator cannot do this: iterating None
            # raises TypeError before any filter is evaluated.)
            affiliations = contribution.get('affiliations') or []
            # Will be safely ignored if None
            first_institution = next(
                (a.get('institution').get('institutionName')
                 for a in affiliations), None)
            zenodo_creators.append({
                'name': name,
                'orcid': orcid,
                'affiliation': first_institution})

        if not zenodo_creators:
            logging.error(
                'Cannot upload to Zenodo: Work must have at least one Main '
                'Contribution')
            sys.exit(1)

        return zenodo_creators

    def get_zenodo_relations(self, metadata):
        """
        Create a list of work relations in the format required by Zenodo.
        Relations must have a standard identifier (e.g. ISBN, DOI).
        Can be used to represent alternative format ISBNs, references,
        Thoth work relations (e.g. child, parent), and series.

        @param metadata: work metadata dict; its 'publications',
                         'references', 'relations', 'issues' and
                         'landingPage' values are used.
        @return: list of dicts with 'relation', 'identifier' and 'scheme'
                 keys (plus 'resource_type' where one applies).
        @raise NotImplementedError: if a relation with a DOI has a relation
                                    type with no Zenodo mapping.
        """
        zenodo_relations = []

        # ISBNs of all formats other than the PDF (the PDF ISBN is submitted
        # separately as the imprint ISBN)
        for isbn in [n.get('isbn') for n in metadata.get('publications')
                     if n.get('isbn') is not None
                     and n['publicationType'] != 'PDF']:
            zenodo_relations.append({
                'relation': 'isVariantFormOf',
                'identifier': isbn,
                # Resource type is optional but can be guaranteed here
                'resource_type': 'publication-book',
                # Scheme will be auto-detected if not submitted
                'scheme': 'isbn'})

        for reference in [n['doi'] for n in metadata.get('references')
                          if n.get('doi') is not None]:
            zenodo_relations.append({
                'relation': 'cites',
                'identifier': reference,
                'scheme': 'doi'})

        for (relation_type, relation_doi) in [
                (n.get('relationType'), n.get('relatedWork').get('doi'))
                for n in metadata.get('relations')
                if n.get('relatedWork').get('doi') is not None]:
            resource_type = 'publication-book'
            if relation_type in ('HAS_PART', 'HAS_CHILD'):
                zenodo_type = 'hasPart'
                # `section` in API displays as "Book chapter" in UI
                resource_type = 'publication-section'
            elif relation_type in ('IS_PART_OF', 'IS_CHILD_OF'):
                zenodo_type = 'isPartOf'
            elif relation_type == 'HAS_TRANSLATION':
                zenodo_type = 'isSourceOf'
            elif relation_type == 'IS_TRANSLATION_OF':
                zenodo_type = 'isDerivedFrom'
            elif relation_type == 'REPLACES':
                zenodo_type = 'obsoletes'
            elif relation_type == 'IS_REPLACED_BY':
                zenodo_type = 'isObsoletedBy'
            else:
                # Name the offending type so the failure can be diagnosed
                raise NotImplementedError(
                    'No Zenodo relation mapping for Thoth relation type {}'
                    .format(relation_type))
            zenodo_relations.append({
                'relation': zenodo_type,
                'identifier': relation_doi,
                'resource_type': resource_type,
                'scheme': 'doi'})

        for issn in [n.get('series').get('issnPrint')
                     for n in metadata.get('issues')
                     if n.get('series').get('issnPrint') is not None]:
            zenodo_relations.append({
                'relation': 'isPartOf',
                'identifier': issn,
                # No appropriate resource type for book series
                'scheme': 'issn'})

        for issn in [n.get('series').get('issnDigital')
                     for n in metadata.get('issues')
                     if n.get('series').get('issnDigital') is not None]:
            zenodo_relations.append({
                'relation': 'isPartOf',
                'identifier': issn,
                # No appropriate resource type for book series
                'scheme': 'eissn'})

        # Only one "alternate identifier" per scheme is permitted
        zenodo_relations.append({
            'relation': 'isAlternateIdentifier',
            'identifier': 'urn:uuid:{}'.format(self.work_id),
            # Resource type is ignored for type `isAlternateIdentifier`
            'scheme': 'urn'})

        landing_page = metadata.get('landingPage')
        if landing_page is not None:
            zenodo_relations.append({
                'relation': 'isAlternateIdentifier',
                'identifier': landing_page,
                # Resource type is ignored for type `isAlternateIdentifier`
                'scheme': 'url'})

        return zenodo_relations
class ZenodoApi:
    """
    Methods for interacting with Zenodo API.
    See documentation at https://developers.zenodo.org/#rest-api.
    """

    # Production instance. Test instance is 'https://sandbox.zenodo.org/api'
    API_ROOT = 'https://zenodo.org/api'

    def __init__(self, api_token):
        """Set up API connection."""
        self.api_token = api_token

    def issue_request(self, method, url, expected_status, data_body=None,
                      json_body=None, return_json=False):
        """
        Issue a request to the API, with optional request body, and handle
        the response.
        @param method: HTTP method name, passed through to `requests`.
        @param url: Full URL to send the request to.
        @param expected_status: HTTP status code expected for response.
        @param data_body: Optional request body, as bytes.
        @param json_body: Optional request body, as JSON.
        @param return_json: True if caller expects JSON in the response
                            and wants it returned.
        @return: the decoded JSON response if `return_json` is True,
                 otherwise None.
        @raise DisseminationError: if the response status differs from
                                   `expected_status`, or if JSON was
                                   requested but the body cannot be decoded.
        """
        headers = {'Authorization': 'Bearer ' + self.api_token}
        # NOTE(review): no `timeout` is passed here, so a stalled connection
        # may block indefinitely - confirm whether that is intended.
        response = requests.request(
            method, url, headers=headers, data=data_body, json=json_body)

        if response.status_code != expected_status:
            error_message = 'Zenodo API error {}'.format(
                response.status_code)
            try:
                error_body = response.json()
                try:
                    # Per-error messages are the most useful, but only provide
                    # the first one so as not to overload the user
                    error_message += ' - {}'.format(
                        error_body['errors'][0]['messages'][0])
                except (KeyError, IndexError, TypeError):
                    # Fall back to main message if no per-error messages
                    error_message += ' - {}'.format(error_body['message'])
            except (requests.exceptions.JSONDecodeError, KeyError,
                    TypeError):
                # If JSON response body is empty, calling .json() will trigger
                # a JSONDecodeError; a body of an unexpected shape triggers
                # KeyError/TypeError. Either way this just means no additional
                # error details are available
                pass
            raise DisseminationError(error_message)

        if return_json:
            try:
                return response.json()
            # If JSON response body is empty, calling .json() will trigger
            # a JSONDecodeError
            except requests.exceptions.JSONDecodeError:
                raise DisseminationError(
                    'Zenodo API returned unexpected response')
        return None

    def search_records(self, thoth_work_id):
        """
        Search Zenodo for published records containing the supplied Thoth ID.
        Note that login is not required to perform this query.

        @return: the 'hits' object of the search response.

        Exits the process with status 1 if the request fails or the response
        does not have the expected structure.
        """
        query = 'q=notes:"thoth-work-id:{}"'.format(thoth_work_id)
        url = '{}/records/?{}'.format(self.API_ROOT, query)
        try:
            response = self.issue_request('GET', url, 200, return_json=True)
        except DisseminationError as error:
            logging.error('Records search failed: {}'.format(error))
            sys.exit(1)
        try:
            return response['hits']
        except (KeyError, TypeError):
            # TypeError covers a JSON body which is not an object at all
            logging.error(
                'Records search failed: Zenodo API returned unexpected '
                'response')
            sys.exit(1)

    def search_licences(self, licence_url):
        """
        Search the Zenodo licences endpoint for ones matching the supplied URL.
        @param licence_url: normalised licence URL, without prefixes/suffixes.
        @return: the matching Zenodo licence ID, or None if there is no
                 single/exact match.

        Exits the process with status 1 if the request fails or the response
        does not have the expected structure.
        """
        url = '{}/licenses/?q="{}"'.format(self.API_ROOT, licence_url)
        try:
            response = self.issue_request('GET', url, 200, return_json=True)
        except DisseminationError as error:
            logging.error('Searching for licence failed: {}'.format(error))
            sys.exit(1)
        try:
            hits = response['hits']
            if hits['total'] == 1:
                return hits['hits'][0]['id']
            # If there are multiple matches, it might be because the
            # specified URL also appears as a substring of other licence
            # URLs (e.g. CC `by/3.0/` will also match `by/3.0/us/`). Zenodo
            # lists CC URLs in their `https://[...]/legalcode` format, so
            # see if any of the matches has the exact URL we're looking for
            # (in this format).
            exact_url = 'https://{}/legalcode'.format(licence_url)
            return next(
                (n['id'] for n in hits['hits']
                 if n['props']['url'] == exact_url), None)
        except (KeyError, IndexError, TypeError):
            # IndexError: reported total of 1 but empty hits list.
            # TypeError: a value which should be an object/list is not.
            logging.error(
                'Searching for licence failed: Zenodo API returned unexpected '
                'response')
            sys.exit(1)

    def create_deposition(self, metadata):
        """
        Create a deposition with the specified metadata.

        @return: tuple of (deposition ID, bucket URL for file uploads).

        Exits the process with status 1 if the request fails or the response
        does not have the expected structure.
        """
        url = '{}/deposit/depositions'.format(self.API_ROOT)
        try:
            response = self.issue_request('POST', url, 201, json_body=metadata,
                                          return_json=True)
        except DisseminationError as error:
            logging.error('Creating deposition failed: {}'.format(error))
            sys.exit(1)
        try:
            return (response['id'], response['links']['bucket'])
        except (KeyError, TypeError):
            logging.error(
                'Creating deposition failed: Zenodo API returned unexpected '
                'response')
            sys.exit(1)

    def upload_file(self, file_bytes, file_name, api_bucket):
        """
        Upload the supplied file under the specified API bucket.

        @raise DisseminationError: if the request fails.
        """
        url = '{}/{}'.format(api_bucket, file_name)
        try:
            self.issue_request('PUT', url, 201, data_body=file_bytes)
        except DisseminationError as error:
            raise DisseminationError('Uploading file failed: {}'.format(error))

    def publish_deposition(self, deposition_id):
        """
        Publish the specified deposition.

        @return: the 'html' link from the publish response.
        @raise DisseminationError: if the request fails or the response does
                                   not have the expected structure.
        """
        url = '{}/deposit/depositions/{}/actions/publish'.format(
            self.API_ROOT, deposition_id)
        try:
            response = self.issue_request('POST', url, 202, return_json=True)
        except DisseminationError as error:
            raise DisseminationError(
                'Publishing deposition failed: {}'.format(error))
        try:
            return response['links']['html']
        except (KeyError, TypeError):
            raise DisseminationError(
                'Publishing deposition failed: Zenodo API returned unexpected '
                'response')

    def clean_up(self, deposition_id):
        """
        Remove any items created during the upload process if it fails partway
        through.
        Deleting a deposition should delete any files under it.
        This will fail with a 403 error if the deposition is already published.

        A DisseminationError from the delete request is logged rather than
        raised.
        """
        url = '{}/deposit/depositions/{}'.format(
            self.API_ROOT, deposition_id)
        try:
            self.issue_request('DELETE', url, 204)
        except DisseminationError as error:
            # Can't do anything about this. Calling function will exit.
            logging.error(
                'Failed to delete incomplete deposition {}: {}'
                .format(deposition_id, error))