- Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathuploader.py
executable file
·259 lines (211 loc) · 9.86 KB
/
uploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/python3
"""uploader.py: upload WARC files toward the Internet Archive
rsync mode (RSYNC_URL set): upload everything to an rsync endpoint
such as fos.
s3 mode (S3_URL set): upload everything directly to the Internet Archive
"""
from __future__ import print_function

import datetime
import hashlib
import json
import os
import re
import subprocess
import sys
import time

import requests
class Params:
    """Encapsulation of global parameters from environment and derivation.

    Attributes set by ``__init__``:
        directory: directory to scan, from ``sys.argv[1]`` or, failing that,
            the ``FINISHED_WARCS_DIR`` environment variable.
        url: upload target, from ``RSYNC_URL`` or otherwise ``S3_URL``.
        mode: ``'rsync'`` or ``'s3'``, matching the variable ``url`` came from.
        wait: seconds to pause between polls (``WAIT``, default 5), always
            stored as a number.
        ia_collection, ia_item_title, ia_auth, ia_item_prefix, ia_access:
            set only in s3 mode, from the matching ``IA_*`` variables.

    Raises:
        RuntimeError: if a required setting is missing or invalid.
    """

    # (attribute, environment variable, hint) for every option that is
    # mandatory in s3 mode, listed in the order they are validated.
    _S3_REQUIRED = (
        ('ia_collection', 'IA_COLLECTION', 'ArchiveBot'),
        ('ia_item_title', 'IA_ITEM_TITLE',
         '"Archiveteam: Archivebot $pipeline_name GO Pack"'),
        ('ia_auth', 'IA_AUTH', 'access_key:secret_key'),
        ('ia_item_prefix', 'IA_ITEM_PREFIX',
         'archiveteam_archivebot_go_$pipeline_name_'),
        ('ia_access', 'IA_ACCESS', 'your access key'),
    )

    def __init__(self):
        # A command-line argument takes precedence over the environment.
        if len(sys.argv) > 1:
            self.directory = sys.argv[1]
        elif os.environ.get('FINISHED_WARCS_DIR') is not None:
            self.directory = os.environ['FINISHED_WARCS_DIR']
        else:
            raise RuntimeError('No directory specified (set FINISHED_WARCS_DIR '
                               'or specify directory on command line)')

        self.url = os.environ.get('RSYNC_URL')
        if self.url is not None:
            if '/localhost' in self.url or '/127.' in self.url:
                raise RuntimeError('Won\'t let you upload to localhost because I '
                                   'remove files after uploading them, and you '
                                   'might be uploading to the same directory')
            if not str(self.url).endswith('/'):
                print(str(self.url))
                raise RuntimeError('Won\'t let you run without a trailing slash on '
                                   'rsync directory')
            self.mode = 'rsync'

        # S3 is only considered when no rsync target was configured.
        if self.url is None:
            self.url = os.environ.get('S3_URL')
            if self.url is not None:
                self.mode = 's3'

        if self.url is None:
            raise RuntimeError('Neither RSYNC_URL nor S3_URL are set - nowhere to '
                               'upload to. Hint: use '
                               'S3_URL=https://s3.us.archive.org')

        if self.mode == 's3':  # parse IA-S3-specific options
            for attr, variable, hint in self._S3_REQUIRED:
                value = os.environ.get(variable)
                if value is None:
                    raise RuntimeError(
                        'Must specify {} if using IA S3 (hint: {})'.format(
                            variable, hint))
                setattr(self, attr, value)

        self.wait = self._read_wait()

    @staticmethod
    def _read_wait(default=5):
        """Return the WAIT environment variable as a non-negative number."""
        raw = os.environ.get('WAIT')
        if raw is None:
            return default
        # Environment values are always strings; time.sleep() needs a number.
        try:
            seconds = int(raw)
        except ValueError:
            try:
                seconds = float(raw)
            except ValueError:
                raise RuntimeError(
                    'WAIT must be a number of seconds, got {!r}'.format(raw))
        if seconds < 0:
            raise RuntimeError(
                'WAIT must not be negative, got {!r}'.format(raw))
        return seconds
def try_mkdir(path):
    """Create the directory *path*, tolerating one that already exists.

    Raises:
        OSError: if the directory cannot be created for any other reason
            (for example missing permissions), or if *path* exists but is
            not a directory. Failing here gives a clear error instead of a
            confusing one later when the directory is first used.
    """
    try:
        os.mkdir(path)
    except FileExistsError:
        # Only an existing *directory* satisfies the caller.
        if not os.path.isdir(path):
            raise
def should_upload(basename):
    """Tell whether the file called *basename* is one this script uploads.

    *basename* must be a bare file name without any ``/``. Names starting
    with a dot are never uploaded; otherwise the name must end in
    ``.warc.gz``, ``.json``, ``.txt`` or ``.log.gz``.
    """
    assert '/' not in basename, basename
    if basename.startswith('.'):
        return False
    return basename.endswith(('.warc.gz', '.json', '.txt', '.log.gz'))
def parse_name(basename):
    """Extract the domain name and date from a WARC file name.

    Recognised names look like
    ``<dns>-<word>-<YYYYMMDD>-<HHMMSS>-<suffix>.warc.gz``.

    Returns:
        A dict with keys ``'dns'`` and ``'date'`` (``YYYYMMDD``). When the
        name does not follow the pattern, ``'dns'`` is ``'UNKNOWN'`` and
        ``'date'`` is today's local date.
    """
    # Every dot of the ".warc.gz" suffix is escaped so that only a literal
    # ".warc.gz" is accepted.
    found = re.match(r'(.*)-\w+-(\d{8})-\d{6}-[^.]*\.warc\.gz', basename)
    if found is None:
        return {'dns': 'UNKNOWN',
                'date': datetime.datetime.now().strftime("%Y%m%d")}
    return {'dns': found.group(1), 'date': found.group(2)}
def ia_upload_allowed(s3_url, accesskey, bucket=''):
    """Ask the IA S3 endpoint whether another upload should be started now.

    Sends a ``check_limit`` query to *s3_url* for *accesskey* and *bucket*
    and inspects the JSON reply.

    Returns:
        False when the status cannot be fetched or decoded, when the reply
        reports ``over_limit``, or when rationing is engaged and the
        remaining quota is small (or cannot be computed); True otherwise.
    """
    quota_url = '{}/?check_limit=1&accesskey={}&bucket={}'.format(
        s3_url, accesskey, bucket)
    try:
        # Without a timeout a stalled connection would block the uploader
        # indefinitely; a timeout surfaces as a RequestException below.
        resp = requests.get(url=quota_url, timeout=60)
        data = json.loads(resp.text)
    except (requests.RequestException, json.JSONDecodeError) as err:
        print('Could not get throttling status - assuming IA is down')
        print('Exception: {}'.format(err))
        return False

    if 'over_limit' in data and data['over_limit'] != 0:
        print('IA S3 API notifies us we are being throttled (over_limit)')
        return False

    if 'detail' in data and 'rationing_engaged' in data['detail'] \
            and data['detail']['rationing_engaged'] != 0:
        detail = data['detail']
        try:
            quota_our_remaining = detail['accesskey_ration'] - \
                detail['accesskey_tasks_queued']
            quota_global_remaining = detail['total_global_limit'] - \
                detail['total_tasks_queued']
            quota_bucket_remaining = detail['bucket_ration'] - \
                detail['bucket_tasks_queued']
        except (KeyError, TypeError) as err:
            # Rationing is on but the figures are unusable: hold back rather
            # than crash or upload blindly.
            print('IA S3 API rationing details are incomplete - assuming '
                  'there is no room for new work')
            print('Exception: {!r}'.format(err))
            return False

        if quota_our_remaining < 10 or quota_global_remaining < 10 \
                or quota_bucket_remaining < 5:
            print('IA S3 API notifies us rationing is engaged with little room for new work!')
            print('Our outstanding jobs: {}'.format(detail['accesskey_tasks_queued']))
            print('Our remaining quota: {}'.format(quota_our_remaining))
            print('Global remaining quota: {}'.format(quota_global_remaining))
            print('Limit reason given: {}'.format(detail.get('limit_reason')))
            return False
        print('IA S3 API notifies us rationing is engaged but we have '
              'room for another job.')

    return True
def file_md5(fname):
    """Return the hexadecimal MD5 digest of the file at *fname*.

    The file is read in 16 KiB chunks, so it never has to fit in memory.
    """
    digest = hashlib.md5()
    with open(fname, "rb") as stream:
        while True:
            chunk = stream.read(16384)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def ia_s3_ship(fname, basename, item, params: Params):
    """Upload one file to the IA S3 endpoint by running ``curl``.

    Args:
        fname: path of the file to upload.
        basename: file name, used to derive the remote file name.
        item: dict with ``'dns'`` and ``'date'`` (``YYYYMMDD``) entries.
        params: settings providing ``url`` and the ``ia_*`` options.

    Returns:
        1 when ``ia_upload_allowed`` refuses the upload, otherwise the exit
        status of ``curl``.
    """
    raw_bucket = '_'.join(
        (params.ia_item_prefix, item['dns'][-64:], item['date']))
    ia_upload_bucket = re.sub(r'[^0-9a-zA-Z-]+', '_', raw_bucket)

    if not ia_upload_allowed(params.url, params.ia_access, ia_upload_bucket):
        # IA is throttling
        # At some point, an ambitious person could try a file belonging
        # in a different bucket if ia_upload_allowed denied this one
        return 1

    size_hint = str(os.stat(fname).st_size)

    compat_filename = re.sub(r'[^0-9a-zA-Z-.]+', '_', basename)[-64:]
    if not compat_filename or compat_filename.startswith('_'):
        # IA filenames cannot be empty or start with underscore
        compat_filename = 'z' + compat_filename[1:]

    target = '/'.join((params.url, ia_upload_bucket, compat_filename))
    md5sum = file_md5(fname)

    stamp = item['date']
    headers = [
        "Content-MD5: " + md5sum,
        "x-archive-queue-derive:1",
        "x-amz-auto-make-bucket:1",
        "x-archive-meta-sponsor:Internet Archive",
        "x-archive-meta-collection:" + params.ia_collection,
        "x-archive-meta-mediatype:web",
        "x-archive-meta-subject:archivebot",
        "x-archive-meta-title:" + params.ia_item_title
        + ' ' + item['dns'] + ' ' + item['date'],
        "x-archive-meta-date:" + '-'.join((stamp[0:4], stamp[4:6], stamp[6:8])),
        "x-archive-size-hint:" + size_hint,
        "authorization: LOW " + params.ia_auth,
    ]

    command = ["curl", "-v", "--location", "--fail",
               "--speed-limit", "1", "--speed-time", "900"]
    for header in headers:
        command += ["--header", header]
    command += ["-o", "/dev/stdout", "--upload-file", fname, target]
    return subprocess.call(command)
def main():
    """Poll the configured directory forever, uploading one file per pass.

    Each pass picks the first uploadable file in sorted order, moves it into
    the ``_uploading`` subdirectory to claim it, uploads it via rsync or IA
    S3 according to ``params.mode``, and then either deletes it (exit code
    0) or moves it back so it can be retried. The loop sleeps
    ``params.wait`` seconds between passes, except right after a successful
    upload.
    """
    params = Params()
    print("CHECK THE UPLOAD TARGET: %s as %s endpoint" % (params.url, params.mode))
    print()
    print("Upload target must reliably store data")
    print("Each local file will be removed after upload")
    print("Hit CTRL-C immediately if upload target is incorrect")
    print()

    uploading_dir = os.path.join(params.directory, "_uploading")
    try_mkdir(uploading_dir)

    need_wait = True
    while True:
        if need_wait:
            print("Waiting {} seconds".format(params.wait))
            # float() keeps this working when the value arrived as a string
            # from the environment.
            time.sleep(float(params.wait))
        need_wait = True

        fnames = sorted(
            f for f in os.listdir(params.directory) if should_upload(f))
        if not fnames:
            print("Nothing to upload")
            continue

        basename = fnames[0]
        fname_d = os.path.join(params.directory, basename)
        fname_u = os.path.join(uploading_dir, basename)
        if os.path.exists(fname_u):
            print("%r already exists - another uploader probably grabbed it" % (fname_u,))
            continue

        # The rename is the claim: it fails if someone else took the file.
        try:
            os.rename(fname_d, fname_u)
        except OSError:
            print("Could not rename %r - another uploader probably grabbed it" % (fname_d,))
            continue

        print("Uploading %r" % (fname_u,))
        item = parse_name(basename)
        try:
            if params.mode == 'rsync':
                exit_code = subprocess.call([
                    "rsync", "-tv", "--timeout=300", "--contimeout=300",
                    "--progress", fname_u, params.url])
            elif params.mode == 's3':
                exit_code = ia_s3_ship(fname_u, basename, item, params)
            else:  # no upload mechanism available
                exit_code = 1
        except BaseException:
            # Do not strand the file in _uploading when the upload step
            # blows up (missing binary, CTRL-C, ...): put it back, then let
            # the error propagate.
            os.rename(fname_u, fname_d)
            raise

        if exit_code == 0:
            print("Removing %r" % (fname_u,))
            os.remove(fname_u)
            need_wait = False
        else:
            # Move it out of the _uploading directory so that this
            # uploader (or another one) can try again.
            os.rename(fname_u, fname_d)


if __name__ == '__main__':
    main()
# vim:ts=4:sw=4:et:tw=78