- Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathqueue_pmh_rt_record.py
127 lines (100 loc) · 4.79 KB
/
queue_pmh_rt_record.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
importargparse
fromtimeimportsleep
fromtimeimporttime
fromsqlalchemyimporttext
fromappimportdb
fromappimportlogger
frompmh_recordimportPmhRecord
fromrecordthresher.recordimportRecordthresherParentRecord
fromrecordthresher.record_makerimportPmhRecordMaker
fromutilimportelapsed
fromutilimportsafe_commit
importendpoint# magic
class QueuePmhRTRecord:
    """Worker that turns queued PMH records into RecordThresher records.

    Runs in one of two modes (see ``worker_run``):
      * single-id mode: rebuild the record for one ``pmh_id`` and exit;
      * queue mode: repeatedly claim chunks from
        ``recordthresher.pmh_record_queue``, build records, and delete the
        processed queue rows.
    """

    def worker_run(self, **kwargs):
        """Process queued PMH records.

        Keyword args:
            pmh_id (str | None): if given, process only this one PMH record.
            chunk (int): queue-mode batch size (default 100).
            limit (int | None): stop after roughly this many records;
                unbounded when None.
        """
        single_id = kwargs.get("pmh_id", None)
        chunk_size = kwargs.get("chunk", 100)
        limit = kwargs.get("limit", None)

        if limit is None:
            limit = float("inf")

        if single_id:
            # One-off mode: rebuild the RecordThresher record for a single PMH id.
            pmh = PmhRecord.query.filter(PmhRecord.id == single_id).scalar()
            if record := PmhRecordMaker.make_record(pmh):
                db.session.merge(record)
                secondary_records = PmhRecordMaker.make_secondary_repository_responses(record)
                for secondary_record in secondary_records:
                    db.session.merge(secondary_record)
                    # Link each secondary record back to its parent record.
                    db.session.merge(
                        RecordthresherParentRecord(
                            record_id=secondary_record.id,
                            parent_record_id=record.id
                        )
                    )
            safe_commit(db) or logger.info("COMMIT fail")
        else:
            # Queue mode: loop until we've processed roughly `limit` records.
            num_updated = 0

            while num_updated < limit:
                start_time = time()
                pmh_ids = self.fetch_queue_chunk(chunk_size)

                if not pmh_ids:
                    logger.info('no queued pmh records ready to update. waiting...')
                    sleep(5)
                    continue

                # Deduplicate secondary records (and their parent links) by id
                # across the whole chunk so each is merged exactly once.
                secondary_records = {}
                parent_relationships = {}

                for pmh_id in pmh_ids:
                    if pmh := PmhRecord.query.filter(PmhRecord.id == pmh_id).scalar():
                        if record := PmhRecordMaker.make_record(pmh):
                            db.session.merge(record)
                            record_secondary_records = PmhRecordMaker.make_secondary_repository_responses(record)
                            for record_secondary_record in record_secondary_records:
                                secondary_records[record_secondary_record.id] = record_secondary_record
                                parent_relationships[record_secondary_record.id] = RecordthresherParentRecord(
                                    record_id=record_secondary_record.id,
                                    parent_record_id=record.id
                                )

                # fix: removed a stray debug print(secondary_record) that was
                # left in this loop and spammed stdout once per secondary record.
                for secondary_record in secondary_records.values():
                    db.session.merge(secondary_record)

                for parent_relationship in parent_relationships.values():
                    db.session.merge(parent_relationship)

                # Remove the processed ids from the queue in the same commit.
                db.session.execute(
                    text('''
                        delete from recordthresher.pmh_record_queue q
                        where q.pmh_id = any(:pmh_ids)
                    ''').bindparams(pmh_ids=pmh_ids)
                )

                commit_start_time = time()
                safe_commit(db) or logger.info("commit fail")
                logger.info(f'commit took {elapsed(commit_start_time, 2)} seconds')

                # fix: count the records actually processed rather than always
                # adding chunk_size — a short final chunk previously overcounted,
                # making --limit stop early.
                num_updated += len(pmh_ids)
                logger.info(f'processed {len(pmh_ids)} PMH records in {elapsed(start_time, 2)} seconds')

    def fetch_queue_chunk(self, chunk_size):
        """Atomically claim up to ``chunk_size`` queued PMH ids.

        Uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers never claim the
        same queue rows, marking claimed rows with ``started = now()``.

        Returns:
            list: the claimed ``pmh_id`` values (possibly empty).
        """
        logger.info("looking for new jobs")

        # NOTE(review): `order by rand` presumably refers to a pre-populated
        # random-order column on the queue table — confirm against the schema.
        queue_query = text("""
            with queue_chunk as (
                select pmh_id
                from recordthresher.pmh_record_queue
                where started is null
                order by rand
                limit :chunk
                for update skip locked
            )
            update recordthresher.pmh_record_queue q
            set started = now()
            from queue_chunk
            where q.pmh_id = queue_chunk.pmh_id
            returning q.pmh_id;
        """).bindparams(chunk=chunk_size)

        job_time = time()
        # autocommit so the claim is visible immediately, independent of the
        # session transaction that later merges records.
        pmh_id_list = [row[0] for row in db.engine.execute(queue_query.execution_options(autocommit=True)).all()]
        logger.info(f'got {len(pmh_id_list)} ids, took {elapsed(job_time)} seconds')

        return pmh_id_list
if __name__ == "__main__":
    # CLI entry point: parse options and hand them straight to the worker.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--pmh_id', nargs="?", type=str, help="pmh_id you want to update the RT record for")
    arg_parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many records to update")
    arg_parser.add_argument('--chunk', "-ch", nargs="?", default=100, type=int, help="how many records to update at once")

    args = arg_parser.parse_args()
    QueuePmhRTRecord().worker_run(**vars(args))