- Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathqueue_pub_refresh_aux.py
143 lines (111 loc) · 4.96 KB
/
queue_pub_refresh_aux.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import argparse
import logging
import os
from time import sleep
from time import time

from sqlalchemy import orm, text

from app import db
from app import logger
from pub import Pub
from queue_main import DbQueue
from util import elapsed
from util import safe_commit

import endpoint  # magic
import pmh_record  # magic
class DbQueuePubRefreshAux(DbQueue):
    """Queue worker that refreshes ``Pub`` rows listed in ``pub_refresh_queue_aux``.

    Jobs are claimed in chunks by stamping ``started`` on queue rows. The
    matching ``Pub`` objects are loaded and refreshed, and the claimed rows are
    then marked finished and released.
    """

    def table_name(self, job_type):
        """Return the queue table name (``job_type`` is accepted but ignored)."""
        return 'pub_refresh_queue_aux'

    def process_name(self, job_type):
        """Return this queue's process name (``job_type`` is accepted but ignored)."""
        return 'run_aux_pub_refresh'

    def worker_run(self, **kwargs):
        """Claim, refresh and finish queue jobs until ``limit`` pubs are processed.

        Keyword args:
            chunk: number of jobs to claim per batch; ``None``/missing means 100.
            limit: stop once at least this many pubs were refreshed;
                ``None``/missing means run forever.
            queue: ``queue_no`` to pull jobs from (default 0).

        When no jobs are available the worker sleeps 5 seconds and polls
        again, so without a reachable ``limit`` this method never returns.
        """
        chunk_size = kwargs.get("chunk")
        if chunk_size is None:
            # argparse yields None for a bare "-ch"; fall back to the default
            # instead of sending an invalid LIMIT to the database.
            chunk_size = 100
        limit = kwargs.get("limit", None)
        queue_no = kwargs.get("queue", 0)

        if limit is None:
            limit = float("inf")

        index = 0
        num_updated = 0
        start_time = time()

        while num_updated < limit:
            new_loop_start_time = time()

            objects = self.fetch_queue_chunk(chunk_size, queue_no)

            if not objects:
                sleep(5)
                continue

            for o in objects:
                o.refresh()

            self._finish_batch([o.id for o in objects])

            index += 1
            # Count what was actually processed: the last chunk can be smaller
            # than chunk_size, and a fixed increment would overshoot `limit`.
            num_updated += len(objects)
            self.print_update(new_loop_start_time, len(objects), limit, start_time, index)

    def _finish_batch(self, ids):
        """Mark queue rows ``ids`` finished, clear started/priority, and commit."""
        finish_batch_text = '''
            update {queue_table}
            set finished = now(), started = null, priority = null
            where id = any(:ids)'''.format(queue_table=self.table_name(None))

        finish_batch_command = text(finish_batch_text).bindparams(ids=ids)
        db.session.execute(finish_batch_command)

        commit_start_time = time()
        # safe_commit's falsy return is treated as a failed commit.
        safe_commit(db) or logger.info("COMMIT fail")
        logger.info("commit took {} seconds".format(elapsed(commit_start_time, 2)))

    def fetch_queue_chunk(self, chunk_size, queue_no):
        """Claim up to ``chunk_size`` unstarted jobs from queue ``queue_no``.

        The claim sets ``started = now()`` on the selected queue rows in one
        statement executed with ``autocommit=True``. ``for update skip locked``
        lets concurrent workers skip rows another worker has locked. Rows are
        ordered by priority (highest first, nulls last), then ``finished``
        (nulls first), then the table's ``rand`` column.

        Returns:
            list of ``Pub`` objects (all columns undeferred) whose ids were
            claimed; an empty list when nothing was claimed.

        NOTE(review): claimed ids with no matching ``Pub`` row are not returned,
        so the caller never finishes them and they stay ``started``.
        """
        logger.info("looking for new jobs")

        text_query_pattern = '''
            with refresh_queue as (
                select id
                from {queue_table}
                where
                    queue_no = :queue_no
                    and started is null
                order by
                    priority desc nulls last,
                    finished asc nulls first,
                    rand
                limit :chunk_size
                for update skip locked
            )
            update {queue_table} queue_rows_to_update
            set started = now()
            from refresh_queue
            where refresh_queue.id = queue_rows_to_update.id
            returning refresh_queue.id;
        '''

        text_query = text_query_pattern.format(queue_table=self.table_name(None))
        claim_command = text(text_query).bindparams(
            # int() rejects None up front: a NULL bound into LIMIT would mean
            # "no limit" and claim the entire queue.
            queue_no=int(queue_no),
            chunk_size=int(chunk_size),
        ).execution_options(autocommit=True)

        job_time = time()
        row_list = db.engine.execute(claim_command).fetchall()
        object_ids = [row[0] for row in row_list]
        logger.info("got {} ids, took {} seconds".format(len(object_ids), elapsed(job_time)))

        if not object_ids:
            # Nothing claimed; skip an empty IN () query against pub.
            return []

        job_time = time()
        q = db.session.query(Pub).options(
            orm.undefer('*')
        ).filter(Pub.id.in_(object_ids))
        objects = q.all()
        logger.info("got pub objects in {} seconds".format(elapsed(job_time)))

        return objects
if __name__ == "__main__":
    # Opt-in SQL echo for debugging, controlled by an environment variable.
    if os.getenv('OADOI_LOG_SQL'):
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
        db.session.configure()

    parser = argparse.ArgumentParser(description="Run stuff.")

    # Options that drive this worker directly.
    parser.add_argument('--run', default=False, action='store_true', help="to run the queue")
    parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many jobs to do")
    parser.add_argument('--chunk', "-ch", nargs="?", default=1, type=int, help="how many to take off db at once")
    parser.add_argument('--queue', "-q", nargs="?", default=0, type=int, help="which queue to run")

    # Options accepted for the shared queue tooling; registered in the same
    # order as before so --help output is unchanged.
    internal_help = "don't use this option"
    parser.add_argument('--dynos', default=None, type=int, help=internal_help)
    for switch in ('--reset', '--status', '--logs', '--monitor', '--kick'):
        parser.add_argument(switch, default=False, action='store_true', help=internal_help)
    for str_option in ('--id', '--doi'):
        parser.add_argument(str_option, nargs="?", type=str, help=internal_help)
    parser.add_argument('--method', nargs="?", type=str, default="update", help=internal_help)

    cli_args = parser.parse_args()

    # TODO: job_type should live on the queue object rather than here.
    job_type = "normal"
    refresh_queue = DbQueuePubRefreshAux()
    refresh_queue.parsed_vars = vars(cli_args)
    refresh_queue.run_right_thing(cli_args, job_type)