queue_repo.py
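# Queue worker for repository (OAI-PMH endpoint) harvesting: DbQueueRepo claims
# rows from the "endpoint" table and runs the requested method (default "harvest")
# on each Endpoint, using the shared DbQueue machinery from queue_main.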
import argparse
import logging
import os
from random import shuffle
from time import sleep
from time import time

from sqlalchemy import text

import endpoint  # magic
import pmh_record  # magic
import pub  # magic
from app import db
from app import logger
from endpoint import Endpoint
from queue_main import DbQueue
from util import safe_commit


class DbQueueRepo(DbQueue):
    def table_name(self, job_type):
        table_name = "endpoint"
        return table_name

    def process_name(self, job_type):
        process_name = "run_repo"  # formation name is from Procfile
        return process_name
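    # Runs Endpoint.run_diagnostics() on endpoints with no recorded Identify
    # response (or on a single endpoint when --id is supplied), merging and
    # committing each one as it goes.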
    def maint(self, **kwargs):
        # invoked via maint(**vars(parsed_args)), so the --id flag arrives in kwargs
        endpoint_id = kwargs.get("id", None)
        if endpoint_id:
            endpoints = Endpoint.query.filter(Endpoint.id == endpoint_id).all()
        else:
            # endpoints = Endpoint.query.filter(Endpoint.harvest_identify_response == None, Endpoint.error == None).all()
            endpoints = Endpoint.query.filter(Endpoint.harvest_identify_response == None).all()
            shuffle(endpoints)

        for my_endpoint in endpoints:
            my_endpoint.run_diagnostics()
            db.session.merge(my_endpoint)
            safe_commit(db)
            logger.info("merged and committed my_endpoint: {}".format(my_endpoint))
    def add_pmh_record(self, **kwargs):
        endpoint_id = kwargs.get("id", None)
        record_id = kwargs.get("recordid")
        my_repo = Endpoint.query.get(endpoint_id)
        print("my_repo", my_repo)
        my_pmh_record = my_repo.get_pmh_record(record_id)

        my_pmh_record.mint_pages()

        # for my_page in my_pmh_record.pages:
        #     print("my_page", my_page)
        #     my_page.scrape()

        my_pmh_record.delete_old_record()
        db.session.merge(my_pmh_record)
        db.session.flush()
        my_pmh_record.enqueue_representative_page()
        # print(my_pmh_record.pages)

        safe_commit(db)
    def worker_run(self, **kwargs):
        single_obj_id = kwargs.get("id", None)
        chunk = kwargs.get("chunk")
        queue_table = "endpoint"
        run_method = kwargs.get("method")
        run_class = Endpoint

        limit = 1  # just do one repo at a time

        if not single_obj_id:
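            # The query below claims up to {chunk} due endpoints with
            # SELECT ... FOR UPDATE SKIP LOCKED, so concurrent workers never grab
            # the same rows, and marks them as started in the same statement via
            # UPDATE ... RETURNING.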
            text_query_pattern = """WITH picked_from_queue AS (
                    SELECT id
                    FROM {queue_table}
                    WHERE (
                        most_recent_year_harvested is null
                        or (
                            most_recent_year_harvested + interval '1 day'
                            < now() at time zone 'utc'
                            - interval '1 day' -- wait until most_recent_year_harvested is over 1 day ago
                            - rand * interval '18 hours' -- plus an offset so we don't run everything at midnight
                        )
                    )
                    and (
                        last_harvest_started is null
                        or last_harvest_started < now() at time zone 'utc' - interval '8 hours'
                    )
                    and (
                        last_harvest_finished is null
                        or last_harvest_finished < now() at time zone 'utc' - interval '2 minutes'
                    )
                    and (
                        retry_at <= now()
                        or retry_at is null
                    )
                    and ready_to_run
                    ORDER BY random() -- not rand, because want it to be different every time
                    LIMIT {chunk}
                    FOR UPDATE SKIP LOCKED
                )
                UPDATE {queue_table} queue_rows_to_update
                SET last_harvest_started = now() at time zone 'utc', last_harvest_finished = null
                FROM picked_from_queue
                WHERE picked_from_queue.id = queue_rows_to_update.id
                RETURNING picked_from_queue.*;"""
            text_query = text_query_pattern.format(
                chunk=chunk,
                queue_table=queue_table
            )

        index = 0
        start_time = time()

        while True:
            new_loop_start_time = time()
            if single_obj_id:
                objects = [run_class.query.filter(run_class.id == single_obj_id).first()]
            else:
                endpoint_id_rows = db.engine.execute(text(text_query).execution_options(autocommit=True)).fetchall()
                endpoint_ids = [row[0] for row in endpoint_id_rows]
                objects = db.session.query(run_class).filter(run_class.id.in_(endpoint_ids)).all()
                db.session.commit()

            if not objects:
                logger.info("none ready, sleeping for 5 seconds, then going again")
                sleep(5)
                continue

            print("run_method", run_method)
            self.update_fn(run_class, run_method, objects, index=index)

            # finished is set in update_fn

            index += 1
            if single_obj_id:
                return
            else:
                self.print_update(new_loop_start_time, chunk, limit, start_time, index)
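    # Dispatches the parsed command-line flags to queue actions: scaling dynos,
    # printing status or logs, monitoring, kicking stalled jobs, adding a single
    # pmh record, running maintenance, or running the harvest queue itself.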
    def run_right_thing(self, parsed_args, job_type):
        if parsed_args.dynos is not None:  # to tell the difference from setting to 0
            self.scale_dyno(parsed_args.dynos, job_type)

        if parsed_args.status:
            self.print_status(job_type)

        if parsed_args.monitor:
            self.monitor_till_done(job_type)
            self.scale_dyno(0, job_type)

        if parsed_args.logs:
            self.print_logs(job_type)

        if parsed_args.kick:
            self.kick(job_type)

        if parsed_args.add:
            self.add_pmh_record(**vars(parsed_args))
        elif parsed_args.maint:
            self.maint(**vars(parsed_args))
        else:
            if parsed_args.id or parsed_args.run:
                self.run(parsed_args, job_type)
            if parsed_args.tilltoday:
                while True:
                    self.run(parsed_args, job_type)

# python queue_repo.py --hybrid --filename=data/dois_juan_accuracy.csv --dynos=40 --soup
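# Illustrative invocations (ids below are placeholders, not real values):
#   python queue_repo.py --run --chunk=5     # harvest endpoints claimed from the queue
#   python queue_repo.py --id=<endpoint_id>  # harvest a single endpoint
#   python queue_repo.py --status            # print the queue status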
if __name__ == "__main__":
    if os.getenv('OADOI_LOG_SQL'):
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
        db.session.configure()

    parser = argparse.ArgumentParser(description="Run the repository (endpoint) harvest queue.")
    parser.add_argument('--id', nargs="?", type=str, help="id of the one thing you want to update (case sensitive)")
    parser.add_argument('--recordid', nargs="?", type=str, help="id of the record you want to update (case sensitive)")
    parser.add_argument('--method', nargs="?", type=str, default="harvest", help="method name to run")
    parser.add_argument('--run', default=False, action='store_true', help="to run the queue")
    parser.add_argument('--status', default=False, action='store_true', help="print the status of the queue")
    parser.add_argument('--dynos', default=None, type=int, help="scale to this many dynos")
    parser.add_argument('--logs', default=False, action='store_true', help="print out the logs")
    parser.add_argument('--monitor', default=False, action='store_true', help="monitor till done, then turn off dynos")
    parser.add_argument('--kick', default=False, action='store_true', help="put started but unfinished dois back to unstarted so they are retried")
    parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many jobs to do")
    parser.add_argument('--chunk', "-ch", nargs="?", default=1, type=int, help="how many to take off db at once")
    parser.add_argument('--maint', default=False, action='store_true', help="run endpoint diagnostics")
    parser.add_argument('--tilltoday', default=False, action='store_true', help="run all the years till today")
    parser.add_argument('--add', default=False, action='store_true', help="add a single pmh record (use with --id and --recordid)")

    parsed_args = parser.parse_args()

    job_type = "normal"  # should be an object attribute
    my_queue = DbQueueRepo()
    my_queue.run_right_thing(parsed_args, job_type)
    print("finished")