update_publisher_recordset.py
# must set PYTHONPATH environment variable to the top level prior to running this script
import logging
import re
import datetime
import dateutil.parser
import time
import os
import requests
import feedparser
assert feedparser.__version__ >= "5.2.0"
from idb import config
from idb.postgres_backend.db import PostgresDB, MediaObject, DictCursor
from idb.helpers.etags import calcFileHash
from idb.helpers.storage import IDigBioStorage
from idb.helpers.logging import idblogger
from idigbio_ingestion.lib.util import download_file
from idigbio_ingestion.lib.eml import parseEml
#### disabling warnings per https://urllib3.readthedocs.org/en/latest/security.html#disabling-warnings
## Would rather have warnings go to log but could not get logging.captureWarnings(True) to work.
## There is no urllib3.enable_warnings method. Is it possible to disable_warnings and then re-enable them later?
####### The disable_warnings method did not prevent warnings from being printed. Commenting out for now...
#import urllib3
#assert urllib3.__version__ >= "1.13"
#urllib3.disable_warnings()
####
# uuid '872733a2-67a3-4c54-aa76-862735a5f334' is the idigbio root entity,
# the parent of all publishers.
IDIGBIO_ROOT_UUID = "872733a2-67a3-4c54-aa76-862735a5f334"
logger = idblogger.getChild('upr')


def struct_to_datetime(s):
    """
    Convert a Struct representation of a time to a datetime
    timestamp.

    Parameters
    ----------
    s : struct
        Timestamp in Struct representation, a 9-tuple such as
        (2019, 2, 17, 17, 3, 38, 1, 48, 0)

    Returns
    -------
    datetime timestamp
    """
    return datetime.datetime.fromtimestamp(time.mktime(s))
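
# Illustrative use (hypothetical entry): feedparser exposes parsed dates as
# time.struct_time 9-tuples, so a feed entry's "published_parsed" value such as
# (2019, 2, 17, 17, 3, 38, 1, 48, 0) converts to the local-time datetime
# 2019-02-17 17:03:38 (assuming no DST ambiguity for that wall-clock time).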


def id_func(portal_url, e):
    """
    Given a portal url and an RSS feed entry (feedparser dictionary
    object), return something suitable to be used as a recordid
    for the published dataset entry. The portal_url is only used
    to help construct a recordid for Symbiota recordsets.

    Parameters
    ----------
    portal_url : url string
        A url to a data portal from the publishers table
    e : feedparser entry object (feedparser.FeedParserDict)
        An individual rss entry already processed into a feedparser dict.
    """
    id = None
    # feedparser magic maps various fields to "id" including "guid"
    if "id" in e:
        id = e["id"]
    # portal_url is used to help construct ids in Symbiota feeds
    elif "collid" in e:
        id = "{0}collections/misc/collprofiles.php?collid={1}".format(
            portal_url, e["collid"])
    if id is not None:
        # Strip trailing version info from ipt ids
        m = re.search(r'^(.*)/v[0-9]*(\.)?[0-9]*$', id)
        if m is not None:
            id = m.group(1)
        id = id.lower()
    logger.debug("id_func ready to return recordid '{0}' from portal url '{1}'".format(id, portal_url))
    return id
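
# Illustrative example (hypothetical IPT URL): an entry id such as
#   "https://ipt.example.org/resource?id=some_dataset/v2.3"
# has its trailing version suffix stripped, yielding the recordid
#   "https://ipt.example.org/resource?id=some_dataset"
# (lower-cased). Symbiota entries carry "collid" instead, so the recordid is
# built from portal_url + "collections/misc/collprofiles.php?collid=<collid>".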


def get_feed(rss_url):
    """
    Download contents of an RSS feed into a string.

    This is required since feedparser itself doesn't have a timeout parameter.

    Parameters
    ----------
    rss_url : str
        The URI of an rss feed

    Returns
    -------
    text or False
        Content of the RSS feed (body / text) if we are able to download it
        from the URI, otherwise return False.
    """
    feedtest = None
    try:
        feedtest = requests.get(rss_url, timeout=10)
        feedtest.raise_for_status()
    except requests.exceptions.SSLError:
        # Ignore urllib3 SSL issues on this check?
        # Most of the time, SSL issues are simply certificate errors at the
        # provider side and we feel ok skipping.
        #
        # However, "special" kinds of server errors such as documented in
        # redmine #2114 get skipped if we do nothing, hence the extra check below.
        pass
    except Exception as e:
        logger.error("Failed to read %r; reason: %s",
                     rss_url,
                     feedtest.reason if feedtest is not None else "non-http error")
        if feedtest is None:
            logger.error("Specific reason: %s", e)
        return False
    # At this point we could have feedtest = None coming out of the SSLError
    # exception above.
    if feedtest is None:
        logger.error("Feed error on rss_url = %r", rss_url)
        return False
    else:
        return feedtest.text
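
# get_feed() returns only the raw feed body; parsing happens downstream, as in
# update_db_from_rss() / _do_rss():
#   rsscontents = get_feed(rss_url)
#   if rsscontents:
#       feed = feedparser.parse(rsscontents)
# The 10-second timeout above is the reason the body is fetched with requests
# instead of letting feedparser fetch the URL itself.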


def update_db_from_rss():
    # existing_recordsets is a dict that holds mapping of recordids to DB id
    existing_recordsets = {}
    # file_links is a dict that holds mapping of file_links to DB id
    file_links = {}
    # recordsets is a dict that holds entire rows based on DB id (not recordid or uuid)
    recordsets = {}

    with PostgresDB() as db:
        logger.debug("Gathering existing recordsets...")
        for row in db.fetchall("SELECT * FROM recordsets"):
            recordsets[row["id"]] = row
            file_links[row["file_link"]] = row["id"]
            for recordid in row["recordids"]:
                logger.debug("id | recordid | file_link : '{0}' | '{1}' | '{2}'".format(
                    row["id"], recordid, row["file_link"]))
                if recordid in existing_recordsets:
                    logger.error("recordid '{0}' already in existing recordsets. This should never happen.".format(recordid))
                else:
                    existing_recordsets[recordid] = row["id"]

        logger.debug("Gathering existing publishers...")
        pub_recs = db.fetchall("SELECT * FROM publishers")
        logger.debug("Checking %d publishers", len(pub_recs))
        for row in pub_recs:
            uuid, rss_url = row['uuid'], row['rss_url']
            logger.info("Starting Publisher Feed: %s %s", uuid, rss_url)
            rsscontents = get_feed(rss_url)
            if rsscontents:
                # Each publisher's feed is processed in its own transaction:
                # commit on success, rollback if anything in _do_rss fails.
                try:
                    _do_rss(rsscontents, row, db, recordsets, existing_recordsets, file_links)
                    logger.debug('_do_rss returned, ready to COMMIT...')
                    db.commit()
                except Exception:
                    logger.exception("An exception occurred processing '{0}' in rss '{1}', will try ROLLBACK...".format(uuid, rss_url))
                    db.rollback()
                except:
                    logger.exception("Unknown exception occurred processing '{0}' in rss '{1}', will try ROLLBACK...".format(uuid, rss_url))
                    db.rollback()
                    raise

    logger.info("Finished processing all publisher RSS feeds")


def _do_rss_entry(entry, portal_url, db, recordsets, existing_recordsets, pub_uuid, file_links):
    """
    Do the recordset parts.

    Parameters
    ----------
    entry : feedparser entry object
        Each field in the feedparser object is accessible via dict notation
    portal_url : url string
        publisher portal url, needed for some id functions
    db : db object
        DB connection object
    recordsets : dict
        dict of existing known recordset DB ids with associated DB row data
    existing_recordsets : dict
        dict of existing known recordset recordids with associated DB ids
    pub_uuid : uuid
        Publisher's uuid
    file_links : dict
        dict of existing known file_links with associated DB ids
    """
    logger.debug("Dump of this feed entry: '{0}'".format(entry))
    # We pass in portal_url even though it is only needed for Symbiota portals
    recordid = id_func(portal_url, entry)
    rsid = None
    ingest = False  # any newly discovered recordsets default to False
    feed_recordids = [recordid]
    # recordset holds one row of recordset data
    recordset = None
    if recordid in existing_recordsets:
        logger.debug("Found recordid '{0}' in existing recordsets.".format(recordid))
        recordset = recordsets[existing_recordsets[recordid]]
        logger.debug("recordset = '{0}'".format(recordset))
        rsid = recordset["uuid"]
        ingest = recordset["ingest"]
        feed_recordids = list(set(feed_recordids + recordset["recordids"]))
        logger.debug("")
    else:
        logger.debug("recordid '{0}' NOT found in existing recordsets.".format(recordid))

    eml_link = None
    file_link = None
    date = None
    rs_name = None

    if "published_parsed" in entry and entry["published_parsed"] is not None:
        date = struct_to_datetime(entry["published_parsed"])
        logger.debug('pub_date struct via published_parsed: {0}'.format(date.isoformat()))
    elif "published" in entry and entry["published"] is not None:
        date = dateutil.parser.parse(entry["published"])
        logger.debug('pub_date via dateutil: {0}'.format(date.isoformat()))

    # Pick a time distinctly before now() to avoid data races
    fifteen_minutes_ago = datetime.datetime.now() - datetime.timedelta(minutes=15)
    if date is None or date > datetime.datetime.now():
        date = fifteen_minutes_ago

    for eml_prop in ["ipt_eml", "emllink"]:
        if eml_prop in entry:
            eml_link = entry[eml_prop]
            break
    else:
        if recordset is not None:
            eml_link = recordset["eml_link"]

    for link_prop in ["ipt_dwca", "link"]:
        if link_prop in entry:
            file_link = entry[link_prop]
            break
    else:
        if recordset is not None:
            file_link = recordset["file_link"]

    if "title" in entry:
        rs_name = entry['title']
    elif recordset is not None:
        rs_name = recordset["name"]
    else:
        rs_name = recordid

    if recordid is not None:
        logger.debug("Identified recordid: '{0}'".format(recordid))
    else:
        logger.debug("No recordid identified.")

    if recordset is None:
        logger.debug("Ready to INSERT: '{0}', '{1}'".format(feed_recordids, file_link))
        sql = (
            """INSERT INTO recordsets
                   (uuid, publisher_uuid, name, recordids, eml_link, file_link, ingest, pub_date)
               VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
               ON CONFLICT (file_link) DO UPDATE SET
                   recordids=array_append(recordsets.recordids,%s),
                   pub_date=%s,
                   last_seen=now()
            """,
            (rsid, pub_uuid, rs_name, feed_recordids, eml_link, file_link, ingest, date, recordid, date))
        db.execute(*sql)
        logger.info("Created Recordset for recordid:%s '%s'", recordid, rs_name)
    else:
        logger.debug("Ready to UPDATE: '{0}', '{1}', '{2}'".format(recordset["id"], feed_recordids, file_link))
        # The following check helps to identify dataset recordids that exist in multiple RSS feeds.
        # The DB id should match when doing a "reverse" look up by file_link.
        if file_link in file_links:
            if recordset["id"] != file_links[file_link]:
                logger.error("Skipping file_link: '{0}'. Found conflict or duplicate recordid. "
                             "Investigate db ids: '{1}' and '{2}'".format(
                                 file_link, recordset["id"], file_links[file_link]))
                return
        # Note: the SET clause only references a subset of the keys supplied in
        # the parameter dict below ("name" and "recordids" are not updated here).
        sql = ("""UPDATE recordsets
                  SET publisher_uuid=%(publisher_uuid)s,
                      eml_link=%(eml_link)s,
                      file_link=%(file_link)s,
                      last_seen=%(last_seen)s,
                      pub_date=%(pub_date)s
                  WHERE id=%(id)s""",
               {
                   "publisher_uuid": pub_uuid,
                   "name": rs_name,
                   "recordids": feed_recordids,
                   "eml_link": eml_link,
                   "file_link": file_link,
                   "last_seen": datetime.datetime.now(),
                   "pub_date": date,
                   "id": recordset["id"]
               })
        db.execute(*sql)
        logger.info("Updated Recordset id:%s %s %s '%s'",
                    recordset["id"], recordset["uuid"], file_link, rs_name)


def _do_rss(rsscontents, r, db, recordsets, existing_recordsets, file_links):
    """
    Process one RSS feed contents. Compares the recordsets we know
    about with the ones found in the feed.

    Parameters
    ----------
    rsscontents : text
        Content of an RSS feed
    r : row of publisher data
        A row of data from the publishers table that contains all columns,
        each column addressable as r["column_name"]
    db : database object
        A PostgresDB() database object
    recordsets : dict
        dict of existing known recordset DB ids with associated DB row data
    existing_recordsets : dict
        dict of existing known recordset recordids with associated DB ids
    file_links : dict
        dict of existing known file_links with associated DB ids
    """
    logger.debug("Start parsing results of %s", r['rss_url'])
    feed = feedparser.parse(rsscontents)
    logger.debug("Found {0} entries in feed to process.".format(len(feed['entries'])))

    pub_uuid = r["uuid"]
    if pub_uuid is None:
        pub_uuid, _, _ = db.get_uuid(r["recordids"])

    name = r["name"]
    if name is None or name == "":
        if "title" in feed["feed"]:
            name = feed["feed"]["title"]
            if name == "":
                name = r["rss_url"]
        else:
            name = r["rss_url"]
        if "\\x" in name:
            name = name.decode("utf8")

    logger.info("Update Publisher id:%s %s '%s'", r["id"], pub_uuid, name)

    auto_publish = False  # we never auto-publish anymore

    pub_date = None
    if "published_parsed" in feed["feed"]:
        pub_date = struct_to_datetime(feed["feed"]["published_parsed"])
    elif "updated_parsed" in feed:
        pub_date = struct_to_datetime(feed["updated_parsed"])
    elif "updated_parsed" in feed["feed"]:
        pub_date = struct_to_datetime(feed["feed"]["updated_parsed"])

    sql = ("""UPDATE publishers
              SET name=%(name)s,
                  last_seen=%(last_seen)s,
                  pub_date=%(pub_date)s,
                  uuid=%(uuid)s
              WHERE id=%(id)s""",
           {
               "id": r["id"],
               "name": name,
               "uuid": pub_uuid,
               "last_seen": datetime.datetime.now(),
               "pub_date": pub_date,
           })
    db.execute(*sql)

    logger.debug("Begin iteration over entries found in '{0}'".format(r['rss_url']))
    for e in feed['entries']:
        _do_rss_entry(e, r['portal_url'], db, recordsets,
                      existing_recordsets,
                      pub_uuid,
                      file_links)

    db.set_record(pub_uuid, "publisher", IDIGBIO_ROOT_UUID,
                  {
                      "rss_url": r["rss_url"],
                      "name": name,
                      "auto_publish": r["auto_publish"],
                      "base_url": r["portal_url"],
                      "publisher_type": r["pub_type"],
                      "recordsets": {}
                  },
                  r["recordids"], [])


def harvest_all_eml():
    sql = """SELECT *
             FROM recordsets
             WHERE eml_link IS NOT NULL
               AND ingest=true
               AND pub_date < now()
               AND (eml_harvest_date IS NULL OR eml_harvest_date < pub_date)"""
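
    # Only recordsets flagged for ingest that have an EML link and whose EML
    # has not been harvested since the most recent pub_date are selected.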
    with PostgresDB() as db:
        recs = db.fetchall(sql, cursor_factory=DictCursor)
        logger.info("Harvesting %d EML files", len(recs))
        for r in recs:
            try:
                harvest_eml(r, db)
                db.commit()
            except KeyboardInterrupt:
                db.rollback()
                raise
            except:
                db.rollback()
                logger.exception("failed Harvest EML %s %s", r["id"], r["name"])


def harvest_eml(r, db):
    logger.info("Harvest EML %s '%s' @ '%s'", r["id"], r["name"], r["eml_link"])
    fname = "{0}.eml".format(r["id"])
    if not download_file(r["eml_link"], fname):
        logger.error("failed Harvest EML %s '%s' @ '%s'", r["id"], r["name"], r["eml_link"])
        return
    try:
        etag = calcFileHash(fname)
        u = r["uuid"]
        if u is None:
            logger.debug("No uuid, using get_uuid on recordids")
            u, _, _ = db.get_uuid(r["recordids"])
        logger.debug("Using recordset UUID: {0}".format(u))
        desc = {}
        with open(fname, "rb") as inf:
            desc = parseEml(r["recordids"][0], inf.read())
        desc["ingest"] = r["ingest"]
        desc["link"] = r["file_link"]
        desc["eml_link"] = r["eml_link"]
        desc["update"] = r["pub_date"].isoformat()
        parent = r["publisher_uuid"]
        db.set_record(u, "recordset", parent, desc, r["recordids"], [])
        sql = ("""UPDATE recordsets
                  SET eml_harvest_etag=%s, eml_harvest_date=%s, uuid=%s
                  WHERE id=%s""",
               (etag, datetime.datetime.now(), u, r["id"]))
        db.execute(*sql)
    finally:
        if os.path.exists(fname):
            os.unlink(fname)


def harvest_all_file():
    sql = """SELECT *
             FROM recordsets
             WHERE file_link IS NOT NULL
               AND uuid IS NOT NULL
               AND ingest=true
               AND pub_date < now()
               AND (file_harvest_date IS NULL OR file_harvest_date < pub_date)"""

    with PostgresDB() as db:
        recs = db.fetchall(sql)
        logger.info("Harvesting %d files", len(recs))
        for r in recs:
            try:
                harvest_file(r, db)
                db.commit()
            except KeyboardInterrupt:
                db.rollback()
                raise
            except:
                logger.exception("Error processing id:%s url:%s", r['id'], r['file_link'])
                db.rollback()


def harvest_file(r, db):
    logger.info("Harvest File %s '%s' @ '%s'", r["id"], r["name"], r["file_link"])
    fname = "{0}.file".format(r["id"])
    if not download_file(r["file_link"], fname, timeout=5):
        logger.error("failed Harvest file %s '%s' @ '%s'", r["id"], r["name"], r["file_link"])
        return
    try:
        etag = upload_recordset(r["uuid"], fname, db)
        assert etag
        sql = ("""UPDATE recordsets
                  SET file_harvest_etag=%s, file_harvest_date=%s
                  WHERE id=%s""",
               (etag, datetime.datetime.now(), r["id"]))
        db.execute(*sql)
    finally:
        if os.path.exists(fname):
            os.unlink(fname)


def upload_recordset(rsid, fname, idbmodel):
    filereference = "http://api.idigbio.org/v1/recordsets/" + rsid
    logger.debug("Starting Upload of %r", rsid)
    stor = IDigBioStorage()
    with open(fname, 'rb') as fobj:
        mo = MediaObject.fromobj(
            fobj, url=filereference, type='datasets', owner=config.IDB_UUID)
        k = mo.get_key(stor)
        if k.exists():
            logger.debug("ETAG %s already present in Storage.", mo.etag)
        else:
            mo.upload(stor, fobj)
            logger.debug("ETAG %s uploading from %r", mo.etag, fname)
        mo.ensure_media(idbmodel)
        mo.ensure_object(idbmodel)
        mo.ensure_media_object(idbmodel)
    logger.debug("Finished Upload of %r, etag = %s", rsid, mo.etag)
    return mo.etag
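
# upload_recordset() is shared by the automated harvest (harvest_file) and the
# manual path (upload_recordset_from_file, below). Storage objects are keyed by
# the file's etag, so re-uploading an unchanged dataset skips the transfer (the
# k.exists() check above) and only re-runs the ensure_media / ensure_object /
# ensure_media_object bookkeeping against the database.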


def upload_recordset_from_file(rsid, fname):
    """
    Given a recordset uuid and a local dataset filename, upload the local
    dataset file as the "current" file for that uuid.

    Parameters
    ----------
    rsid : uuid
        An iDigBio recordset uuid
    fname : string
        Filename (full path or current directory only)

    Returns
    -------
    bool
        True if successful, False otherwise
    """
    # convert rsid uuid to string here because of either:
    #   psycopg2.ProgrammingError: can't adapt type 'UUID'
    # or
    #   TypeError: 'UUID' object does not support indexing
    rsuuid = str(rsid)

    logger.info("Manual upload of '{0}' from file '{1}' requested.".format(rsuuid, fname))

    # do some checks here
    try:
        f = open(fname)
        f.close()
    except:
        logger.error("Cannot access file: '{0}'. Aborting upload.".format(fname))
        raise

    db = PostgresDB()
    sql = ("""SELECT id FROM recordsets WHERE uuid=%s""", (rsuuid, ))
    idcount = db.execute(*sql)
    if idcount < 1:
        logger.error("Cannot find uuid '{0}' in db. Aborting upload.".format(rsuuid))
        db.rollback()
        return False

    # output the "before" state
    results = db.fetchall("""SELECT id,file_harvest_date,file_harvest_etag FROM recordsets WHERE uuid=%s""", (rsuuid, ))
    for each in results:
        logger.debug("{0}".format(each))

    try:
        etag = upload_recordset(rsuuid, fname, db)
        assert etag
        sql = ("""UPDATE recordsets
                  SET file_harvest_etag=%s, file_harvest_date=%s
                  WHERE uuid=%s""",
               (etag, datetime.datetime.now(), rsuuid))
        update_count = db.execute(*sql)
        db.commit()
        logger.info("UPDATED {0} rows.".format(update_count))
        logger.info("Finished manual upload of file '{0}', result etag = '{1}', saved to db.".format(fname, etag))
    except:
        logger.error("An exception occurred during upload of file or db update for '{0}'".format(fname))
        raise

    # output the "after" state
    results = db.fetchall("""SELECT id,file_harvest_date,file_harvest_etag FROM recordsets WHERE uuid=%s""", (rsuuid, ))
    for each in results:
        logger.debug("{0}".format(each))

    return True
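
# Example manual invocation (hypothetical uuid and path; assumes this module is
# importable as idigbio_ingestion.update_publisher_recordset with PYTHONPATH set
# as noted at the top of this file):
#   from idigbio_ingestion.update_publisher_recordset import upload_recordset_from_file
#   upload_recordset_from_file("00000000-0000-0000-0000-000000000000",
#                              "/tmp/dataset_dwca.zip")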


def create_tables():
    """
    This function is out-of-sync with the actual database and is unmaintained.
    All statements in this function are commented out; it will do nothing until
    it is modified again.
    """
    db = PostgresDB()
    logger.error('create_tables called but has no valid code to run.')
    # db.execute("""CREATE TABLE IF NOT EXISTS publishers (
    #     id BIGSERIAL NOT NULL PRIMARY KEY,
    #     uuid uuid UNIQUE,
    #     name text NOT NULL,
    #     recordids text[] NOT NULL DEFAULT '{}',
    #     pub_type varchar(20) NOT NULL DEFAULT 'rss',
    #     portal_url text,
    #     rss_url text NOT NULL,
    #     auto_publish boolean NOT NULL DEFAULT false,
    #     first_seen timestamp NOT NULL DEFAULT now(),
    #     last_seen timestamp NOT NULL DEFAULT now(),
    #     pub_date timestamp
    # )""")

    # # pubid, rsid, Ingest, rs_record_id, eml_link, file_link, First Seen Date,
    # # Last Seen Date, Feed Date, Harvest Date, Harvest Etag
    # db.execute("""CREATE TABLE IF NOT EXISTS recordsets (
    #     id BIGSERIAL NOT NULL PRIMARY KEY,
    #     uuid uuid UNIQUE,
    #     publisher_uuid uuid REFERENCES publishers(uuid),
    #     name text NOT NULL,
    #     recordids text[] NOT NULL DEFAULT '{}',
    #     eml_link text,
    #     file_link text NOT NULL,
    #     ingest boolean NOT NULL DEFAULT false,
    #     first_seen timestamp NOT NULL DEFAULT now(),
    #     last_seen timestamp NOT NULL DEFAULT now(),
    #     pub_date timestamp,
    #     harvest_date timestamp,
    #     harvest_etag varchar(41)
    # )""")
    # db.commit()
    db.close()


def main():
    # create_tables()
    # Re-work from canonical db
    logger.info("Begin update_publisher_recordset()")
    update_db_from_rss()
    logger.info("*** Begin harvest of eml files...")
    harvest_all_eml()
    logger.info("*** Begin harvest of dataset files...")
    harvest_all_file()
    logger.info("Finished all updates")