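# Queue worker for repository (OAI-PMH endpoint) harvesting.
# It pulls Endpoint rows that are due for harvesting off the "endpoint" table,
# claims them with FOR UPDATE SKIP LOCKED so concurrent workers don't grab the
# same row, and runs the requested method (harvest by default) on each one.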
import argparse
import logging
import os
from random import shuffle
from time import sleep
from time import time

from sqlalchemy import text

import endpoint  # magic
import pmh_record  # magic
import pub  # magic
from app import db
from app import logger
from endpoint import Endpoint
from queue_main import DbQueue
from util import safe_commit


class DbQueueRepo(DbQueue):

    def table_name(self, job_type):
        table_name = "endpoint"
        return table_name

    def process_name(self, job_type):
        process_name = "run_repo"  # formation name is from Procfile
        return process_name
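    # Maintenance pass: run diagnostics on endpoints with no recorded
    # harvest_identify_response (or on a single endpoint given via --id),
    # merging and committing each one as it finishes.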
    def maint(self, **kwargs):
        # use the id passed in through kwargs rather than the module-level
        # parsed_args, so this also works when not invoked as a script
        endpoint_id = kwargs.get("id", None)
        if endpoint_id:
            endpoints = Endpoint.query.filter(Endpoint.id == endpoint_id).all()
        else:
            # endpoints = Endpoint.query.filter(Endpoint.harvest_identify_response==None, Endpoint.error==None).all()
            endpoints = Endpoint.query.filter(Endpoint.harvest_identify_response == None).all()
            shuffle(endpoints)

        for my_endpoint in endpoints:
            my_endpoint.run_diagnostics()
            db.session.merge(my_endpoint)
            safe_commit(db)
            logger.info("merged and committed my_endpoint: {}".format(my_endpoint))
    def add_pmh_record(self, **kwargs):
        endpoint_id = kwargs.get("id", None)
        record_id = kwargs.get("recordid")
        my_repo = Endpoint.query.get(endpoint_id)
        print("my_repo", my_repo)
        my_pmh_record = my_repo.get_pmh_record(record_id)

        my_pmh_record.mint_pages()

        # for my_page in my_pmh_record.pages:
        #     print("my_page", my_page)
        #     my_page.scrape()

        my_pmh_record.delete_old_record()
        db.session.merge(my_pmh_record)
        db.session.flush()
        my_pmh_record.enqueue_representative_page()
        # print(my_pmh_record.pages)

        safe_commit(db)
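    # Main worker loop. Either processes a single endpoint (--id) or repeatedly
    # claims a chunk of due endpoints from the queue. The claim happens in one
    # statement: a CTE selects candidate rows with FOR UPDATE SKIP LOCKED (so
    # concurrent workers never pick the same row), then the UPDATE stamps
    # last_harvest_started and returns the claimed ids.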
    def worker_run(self, **kwargs):
        single_obj_id = kwargs.get("id", None)
        chunk = kwargs.get("chunk")
        queue_table = "endpoint"
        run_method = kwargs.get("method")
        run_class = Endpoint

        limit = 1  # just do one repo at a time

        if not single_obj_id:
            text_query_pattern = """WITH picked_from_queue AS (
                    SELECT id
                    FROM {queue_table}
                    WHERE (
                        most_recent_year_harvested is null
                        or (
                            most_recent_year_harvested + interval '1 day'
                            < now() at time zone 'utc'
                            - interval '1 day' -- wait until most_recent_year_harvested is over 1 day ago
                            - rand * interval '18 hours' -- plus an offset so we don't run everything at midnight
                        )
                    )
                    and (
                        last_harvest_started is null
                        or last_harvest_started < now() at time zone 'utc' - interval '8 hours'
                    )
                    and (
                        last_harvest_finished is null
                        or last_harvest_finished < now() at time zone 'utc' - interval '2 minutes'
                    )
                    and (
                        retry_at <= now()
                        or retry_at is null
                    )
                    and ready_to_run
                    ORDER BY random() -- not rand, because want it to be different every time
                    LIMIT {chunk}
                    FOR UPDATE SKIP LOCKED
                )
                UPDATE {queue_table} queue_rows_to_update
                SET last_harvest_started = now() at time zone 'utc', last_harvest_finished = null
                FROM picked_from_queue
                WHERE picked_from_queue.id = queue_rows_to_update.id
                RETURNING picked_from_queue.*;"""
            text_query = text_query_pattern.format(
                chunk=chunk,
                queue_table=queue_table
            )

        index = 0
        start_time = time()

        while True:
            new_loop_start_time = time()

            if single_obj_id:
                objects = [run_class.query.filter(run_class.id == single_obj_id).first()]
            else:
                endpoint_id_rows = db.engine.execute(text(text_query).execution_options(autocommit=True)).fetchall()
                endpoint_ids = [row[0] for row in endpoint_id_rows]
                objects = db.session.query(run_class).filter(run_class.id.in_(endpoint_ids)).all()
                db.session.commit()

            if not objects:
                logger.info("none ready, sleeping for 5 seconds, then going again")
                sleep(5)
                continue

            print("run_method", run_method)
            self.update_fn(run_class, run_method, objects, index=index)

            # finished is set in update_fn

            index += 1
            if single_obj_id:
                return
            else:
                self.print_update(new_loop_start_time, chunk, limit, start_time, index)
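    # Dispatch on the parsed command-line flags: scale dynos, print status or
    # logs, kick stuck jobs, add a single PMH record, run maintenance, or run
    # the queue itself.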
    def run_right_thing(self, parsed_args, job_type):
        if parsed_args.dynos is not None:  # to tell the difference from setting to 0
            self.scale_dyno(parsed_args.dynos, job_type)

        if parsed_args.status:
            self.print_status(job_type)

        if parsed_args.monitor:
            self.monitor_till_done(job_type)
            self.scale_dyno(0, job_type)

        if parsed_args.logs:
            self.print_logs(job_type)

        if parsed_args.kick:
            self.kick(job_type)

        if parsed_args.add:
            self.add_pmh_record(**vars(parsed_args))
        elif parsed_args.maint:
            self.maint(**vars(parsed_args))
        else:
            if parsed_args.id or parsed_args.run:
                self.run(parsed_args, job_type)
                if parsed_args.tilltoday:
                    while True:
                        self.run(parsed_args, job_type)

# python queue_repo.py --hybrid --filename=data/dois_juan_accuracy.csv --dynos=40 --soup

if __name__ == "__main__":
    if os.getenv('OADOI_LOG_SQL'):
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
        db.session.configure()

    parser = argparse.ArgumentParser(description="Run stuff.")
    parser.add_argument('--id', nargs="?", type=str, help="id of the one thing you want to update (case sensitive)")
    parser.add_argument('--recordid', nargs="?", type=str, help="id of the record you want to update (case sensitive)")
    parser.add_argument('--method', nargs="?", type=str, default="harvest", help="method name to run")

    parser.add_argument('--run', default=False, action='store_true', help="run the queue")
    parser.add_argument('--status', default=False, action='store_true', help="print the queue status")
    parser.add_argument('--dynos', default=None, type=int, help="scale to this many dynos")
    parser.add_argument('--logs', default=False, action='store_true', help="print out logs")
    parser.add_argument('--monitor', default=False, action='store_true', help="monitor till done, then turn off dynos")
    parser.add_argument('--kick', default=False, action='store_true', help="put started but unfinished dois back to unstarted so they are retried")
    parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many jobs to do")
    parser.add_argument('--chunk', "-ch", nargs="?", default=1, type=int, help="how many to take off the db at once")

    parser.add_argument('--maint', default=False, action='store_true', help="run endpoint diagnostics (maintenance)")
    parser.add_argument('--tilltoday', default=False, action='store_true', help="run all the years till today")
    parser.add_argument('--add', default=False, action='store_true', help="add a single pmh record (use with --id and --recordid)")

    parsed_args = parser.parse_args()

    job_type = "normal"  # should be an object attribute
    my_queue = DbQueueRepo()
    my_queue.run_right_thing(parsed_args, job_type)
    print("finished")