insert_pdf_recordthresher.py
import json
import time
from datetime import datetime
from threading import Thread

from sqlalchemy import text

from recordthresher.pdf_record import PDFRecord
from app import oa_db_engine, db
CHUNK_SIZE = 1000
INSERTED = 0

# Claim a chunk of queued DOIs: mark them in_progress and return them along
# with their parsed fields. SKIP LOCKED keeps concurrent workers from
# claiming the same rows.
stmnt = f'''
with queue as (
    SELECT q.doi, parsed.authors, parsed.abstract, parsed.references
    FROM public.tmp_pdf_recordthresher_queue q
    JOIN public.pdf_parsed parsed ON q.doi = parsed.doi
    WHERE in_progress is FALSE
    LIMIT {CHUNK_SIZE} FOR UPDATE SKIP LOCKED
)
update public.tmp_pdf_recordthresher_queue update_rows SET in_progress = TRUE
FROM queue WHERE update_rows.doi = queue.doi
RETURNING queue.*;
'''


def insert_pdf_records_loop():
    global INSERTED
    with oa_db_engine.connect() as conn:
        while True:
            # claim the next chunk of queue rows and commit so the
            # in_progress flags are visible to other workers
            rows = conn.execute(stmnt).fetchall()
            conn.connection.commit()
            if not rows:
                break
            # build a PDFRecord for each parsed PDF
            pdf_records = []
            for row in rows:
                doi, authors, abstract, references = row
                pdf_records.append(PDFRecord(doi=doi,
                                             authors=json.dumps(authors),
                                             abstract=abstract,
                                             citations=json.dumps(references)))
            db.session.bulk_save_objects(pdf_records)
            # drop the processed DOIs from the queue
            conn.execute(text(
                'DELETE FROM public.tmp_pdf_recordthresher_queue WHERE doi IN :dois'),
                dois=tuple(
                    [record.doi for record in pdf_records]))
            db.session.commit()
            conn.connection.commit()
            INSERTED += len(pdf_records)


def print_stats():
    # report the running total and hourly insert rate every 5 seconds
    start = datetime.now()
    while True:
        now = datetime.now()
        hrs_elapsed = (now - start).total_seconds() / (60 * 60)
        rate = round(INSERTED / hrs_elapsed, 2) if hrs_elapsed > 0 else 0
        print(f'Inserted - {INSERTED} | Rate - {rate}/hr')
        time.sleep(5)


if __name__ == '__main__':
    # the stats reporter runs as a daemon thread so it exits with the main loop
    Thread(target=print_stats, daemon=True).start()
    insert_pdf_records_loop()