- Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathenqueue_doi_record_queue.py
56 lines (44 loc) · 2.26 KB
/
enqueue_doi_record_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
import sys
import traceback
from datetime import timedelta

import pandas as pd
from sqlalchemy import bindparam, create_engine, text
def _database_url(env_var):
    """Return the database URL stored in *env_var*, normalised for SQLAlchemy.

    A leading ``postgres://`` scheme is rewritten to ``postgresql://`` because
    SQLAlchemy 1.4+ only recognises the ``postgresql`` dialect name. Only the
    scheme prefix is rewritten, so the rest of the URL is never altered.

    Raises:
        RuntimeError: if *env_var* is unset or empty. The error names the
            variable, instead of surfacing as an AttributeError on None.
    """
    url = os.getenv(env_var)
    if not url:
        raise RuntimeError(f'environment variable {env_var} is not set')
    if url.startswith('postgres://'):
        url = 'postgresql://' + url[len('postgres://'):]
    return url


# OpenAlex database. AUTOCOMMIT means each statement commits on its own.
OPENALEX_DB_URL = _database_url('OPENALEX_DATABASE_URL')
# OADOI database (holds the recordthresher queue table used by SELECT_CMD).
# Default transactional isolation, so main() commits explicitly.
OADOI_DB_URL = _database_url('DATABASE_URL')
OPENALEX_DB_ENGINE = create_engine(OPENALEX_DB_URL).execution_options(
    isolation_level='AUTOCOMMIT')
OADOI_DB_ENGINE = create_engine(OADOI_DB_URL)

# Flags every row of recordthresher.doi_record_add_everything_queue that has
# enqueued_add_everything = false as true, and returns (doi, real_updated)
# for exactly those rows.
# NOTE(review): despite the CTE name "chunk" there is no LIMIT, so every
# pending row is claimed in a single statement.
SELECT_CMD = (
    "WITH chunk as (SELECT * FROM recordthresher.doi_record_add_everything_queue "
    "WHERE enqueued_add_everything = false) "
    "UPDATE recordthresher.doi_record_add_everything_queue tbl "
    "SET enqueued_add_everything = true "
    "FROM chunk WHERE tbl.doi = chunk.doi "
    "RETURNING chunk.doi, chunk.real_updated;"
)
def date_transform_func(real_updated):
    """Shift *real_updated* forward by 30 * 365 days and render it as text.

    The result is ``isoformat()`` output with the date/time separator ``T``
    swapped for a space (e.g. ``'2049-12-24 12:30:00'``). Leap days are not
    counted, so the result lands a few days short of the same calendar date
    30 years later.

    NOTE(review): the reason for the 30-year offset is not visible in this
    file; presumably it makes these rows compare as newer than real updates.
    Confirm before changing it.
    """
    offset = timedelta(days=30 * 365)
    iso_text = (real_updated + offset).isoformat()
    return ' '.join(iso_text.split('T'))
def _claim_queued_dois(oadoi_conn):
    """Flag pending queue rows as enqueued and return their (doi, real_updated).

    Executes SELECT_CMD on *oadoi_conn* but does not commit; the caller decides
    whether the flag change is kept. Explicit column names keep the frame
    well-formed (and mergeable on ``doi``) even when no rows come back.
    """
    claimed = oadoi_conn.execute(text(SELECT_CMD)).fetchall()
    return pd.DataFrame(data=claimed, columns=['doi', 'real_updated'])


def _lookup_work_ids(oa_conn, dois):
    """Return a (work_id, doi) DataFrame from ins.recordthresher_record for *dois*."""
    work_ids_stmnt = text(
        'SELECT work_id, doi FROM ins.recordthresher_record WHERE doi IN :dois'
    ).bindparams(bindparam('dois', expanding=True))
    matches = oa_conn.execute(work_ids_stmnt, {'dois': tuple(dois)}).fetchall()
    return pd.DataFrame(data=matches, columns=['work_id', 'doi'])


def _insert_run_once_queue(oa_conn, oa_cur, rows):
    """Insert (work_id, work_updated) tuples into queue.run_once_work_add_everything.

    *rows* must be non-empty, because an empty VALUES list is a SQL syntax
    error. Each tuple is escaped by the DB-API cursor's ``mogrify``, so the
    statement is built from quoted literals rather than raw interpolation.
    """
    values_sql = ', '.join(
        oa_cur.mogrify('(%s, %s)', row).decode() for row in rows)
    insert_stmnt = text(
        'INSERT INTO queue.run_once_work_add_everything (work_id, work_updated) '
        f'VALUES {values_sql} ON CONFLICT DO NOTHING;')
    oa_conn.execute(insert_stmnt)
    oa_conn.connection.commit()


def main():
    """Move pending DOIs from the OADOI queue into the OpenAlex run-once queue.

    Steps:
      1. On the OADOI database, flag every pending row of
         recordthresher.doi_record_add_everything_queue as enqueued and
         collect its (doi, real_updated).
      2. On the OpenAlex database, map those DOIs to work_ids via
         ins.recordthresher_record. DOIs with no match are dropped by the
         inner merge.
      3. Insert (work_id, date_transform_func(real_updated)) into
         queue.run_once_work_add_everything, ignoring conflicts.
      4. Only then commit the OADOI flag update.

    Committing the flags last means a failure in steps 2-3 rolls them back,
    so a later run retries those DOIs instead of silently losing them.
    DOIs that have no recordthresher record are still flagged, as before.

    NOTE(review): a NULL real_updated is not handled here;
    date_transform_func would produce an invalid timestamp string for it.
    """
    with OADOI_DB_ENGINE.connect() as oadoi_conn:
        queued = _claim_queued_dois(oadoi_conn)
        if queued.empty:
            # Nothing was flagged, so the implicit rollback on close is harmless.
            print('No DOIs pending in recordthresher.doi_record_add_everything_queue')
            return

        with OPENALEX_DB_ENGINE.connect() as oa_conn, \
                oa_conn.connection.cursor() as oa_cur:
            work_ids = _lookup_work_ids(oa_conn, queued['doi'].tolist())
            merged = queued.merge(work_ids, on=['doi'])
            if merged.empty:
                print(f'None of the {len(queued)} claimed DOIs have a '
                      'recordthresher record; nothing to insert')
            else:
                merged['real_updated'] = merged['real_updated'].apply(
                    date_transform_func)
                rows = list(merged[['work_id', 'real_updated']].itertuples(
                    index=False, name=None))
                _insert_run_once_queue(oa_conn, oa_cur, rows)

        # Persist the "enqueued" flags only after the OpenAlex side succeeded.
        oadoi_conn.connection.commit()
if __name__ == '__main__':
    # Script entry point: run the transfer once and report any failure.
    try:
        main()
    except Exception as e:
        print(traceback.format_exc())
        print(e)
        # Exit non-zero so whatever schedules this script sees the failure.
        # Previously the error was printed and the process still exited 0.
        sys.exit(1)