import argparse
import datetime

from threading import Thread
from time import sleep
from time import time

from sqlalchemy import text

from app import db
from app import logger
from pub import Pub
from recordthresher.record import RecordthresherParentRecord
from recordthresher.record_maker import CrossrefRecordMaker, PmhRecordMaker
from recordthresher.record_maker.parseland_record_maker import ParselandRecordMaker
from util import elapsed
from util import safe_commit

import endpoint  # magic
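
# Total records processed so far. Written by the worker loop and read by the
# stats thread, so a simple module-level counter is enough.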
PROCESSED = 0


def print_stats():
    """Log the cumulative processing rate every few seconds; runs in a daemon thread."""
    start = datetime.datetime.now()
    while True:
        now = datetime.datetime.now()
        hrs_running = (now - start).total_seconds() / (60 * 60)
        rate_hr = round(PROCESSED / hrs_running, 2)
        logger.info(f'[*] Processing rate: {rate_hr}/hr')
        sleep(5)
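

# QueueDoiRtRecord drains recordthresher.doi_record_queue: for each queued DOI
# it builds a Crossref-based RecordThresher record, plus any Parseland and
# secondary repository records, and merges them into the database.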
class QueueDoiRtRecord:
    def worker_run(self, **kwargs):
        global PROCESSED

        single_id = kwargs.get("doi", None)
        chunk_size = kwargs.get("chunk", 100)
        limit = kwargs.get("limit", None)

        if limit is None:
            limit = float("inf")

        if single_id:
            # one-off mode: build and merge the record for a single DOI, then exit
            pub = Pub.query.get(single_id)

            if record := CrossrefRecordMaker.make_record(pub):
                db.session.merge(record)
                PROCESSED += 1

            safe_commit(db) or logger.info("COMMIT fail")
        else:
            num_updated = 0

            while num_updated < limit:
                start_time = time()

                dois = self.fetch_queue_chunk(chunk_size)

                if not dois:
                    logger.info('no queued DOI records ready to update. waiting...')
                    sleep(5)
                    continue

                seen_record_ids = set()

                for doi in dois:
                    logger.info(f'making RecordThresher record for DOI {doi}')

                    if pub := Pub.query.get(doi):
                        if record := CrossrefRecordMaker.make_record(pub):
                            if record.id not in seen_record_ids:
                                db.session.merge(record)
                                seen_record_ids.add(record.id)

                            if pl_record := ParselandRecordMaker.make_record(pub):
                                if pl_record.id not in seen_record_ids:
                                    db.session.merge(pl_record)
                                    seen_record_ids.add(pl_record.id)

                            secondary_records = PmhRecordMaker.make_secondary_repository_responses(record)
                            for secondary_record in secondary_records:
                                if secondary_record.id not in seen_record_ids:
                                    db.session.merge(secondary_record)
                                    seen_record_ids.add(secondary_record.id)

                                    # link the repository response back to its parent record
                                    db.session.merge(
                                        RecordthresherParentRecord(
                                            record_id=secondary_record.id,
                                            parent_record_id=record.id
                                        )
                                    )

                    PROCESSED += 1

                # the whole chunk has been merged, so drop it from the queue
                db.session.execute(
                    text('''
                        delete from recordthresher.doi_record_queue q
                        where q.doi = any(:dois)
                    ''').bindparams(dois=dois)
                )

                commit_start_time = time()
                safe_commit(db) or logger.info("commit fail")
                logger.info(f'commit took {elapsed(commit_start_time, 2)} seconds')

                num_updated += chunk_size
                logger.info(f'processed {len(dois)} DOI records in {elapsed(start_time, 2)} seconds')
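
    # Claim a chunk of queued DOIs. "for update skip locked" lets multiple
    # workers poll the queue concurrently without claiming the same rows;
    # claimed rows are marked started = now() and handed back to the caller.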
    def fetch_queue_chunk(self, chunk_size):
        logger.info("looking for new jobs")

        queue_query = text("""
            with queue_chunk as (
                select doi
                from recordthresher.doi_record_queue
                where started is null
                order by updated desc nulls last, rand
                limit :chunk
                for update skip locked
            )
            update recordthresher.doi_record_queue q
            set started = now()
            from queue_chunk
            where q.doi = queue_chunk.doi
            returning q.doi;
        """).bindparams(chunk=chunk_size)

        job_time = time()
        doi_list = [row[0] for row in db.engine.execute(
            queue_query.execution_options(autocommit=True)).all()]
        logger.info(f'got {len(doi_list)} DOIs, took {elapsed(job_time)} seconds')
        return doi_list


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--doi', nargs="?", type=str,
                        help="doi you want to update the RT record for")
    parser.add_argument('--limit', "-l", nargs="?", type=int,
                        help="how many records to update")
    parser.add_argument('--chunk', "-ch", nargs="?", default=100, type=int,
                        help="how many records to update at once")
    parsed_args = parser.parse_args()

    my_queue = QueueDoiRtRecord()

    Thread(target=print_stats, daemon=True).start()
    my_queue.worker_run(**vars(parsed_args))
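
# Example invocations (a sketch based on the argparse flags above; the DOI
# value is a made-up placeholder):
#   python queue_doi_rt_record.py --doi 10.1234/example
#   python queue_doi_rt_record.py --limit 1000 --chunk 50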