-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmyblt.py
338 lines (248 loc) · 8.92 KB
/
myblt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import os
import hashlib
import binascii
import string
import random
import uuid
import sys
import math
from flask import Flask, request, send_from_directory, jsonify, redirect
from database import db_session, init_db, init_engine
from wand.image import Image
from models import Upload, User, Invite
# Flask application object: the route handlers below are registered on it and
# its configuration is loaded in the ``__main__`` block at the end of the file.
app = Flask(__name__)
def hash_exists(hash):
    """Tell whether an upload with the given content hash is already stored.

    Returns True when at least one ``Upload`` row has a matching ``hash``
    column, False otherwise.
    """
    matching_rows = Upload.query.filter(Upload.hash == hash).count()
    return matching_rows != 0
def short_url_exists(url):
    """Tell whether ``url`` is unavailable as a short URL.

    A falsy ``url`` (``None`` or the empty string) is reported as existing,
    which lets a caller start its generation loop with ``None``. Otherwise
    the answer is whether any ``Upload`` row already uses ``url``.
    """
    if url:
        taken = Upload.query.filter(Upload.short_url == url).count()
        return taken != 0
    return True
def get_random_short_url(length=7):
    """Generate a random string of ASCII letters and digits.

    With the default ``length`` of 7 there are 62**7 (on the order of 10^12)
    possible strings.

    The characters are drawn from the operating system's CSPRNG rather than
    the default Mersenne Twister: uploads are served by short URL without an
    authentication check, so the identifier should not be predictable.

    :param length: number of characters to generate (defaults to 7).
    :return: the generated string.
    """
    pool = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(pool) for _ in range(length))
def get_new_short_url():
    """Return a random short URL that no existing upload is using.

    Candidates are drawn repeatedly until ``short_url_exists`` reports one
    as free.
    """
    candidate = None
    while True:
        # ``None`` counts as "existing", so the first pass always draws.
        if not short_url_exists(candidate):
            return candidate
        candidate = get_random_short_url()
def generate_invite_code(user):
    """Create, persist and return a new invite code owned by ``user``.

    The code is 32 ASCII letters/digits. Because possession of a code is
    what authorises registration, it is drawn from the operating system's
    CSPRNG (``random.SystemRandom``) instead of the predictable default
    generator.

    :param user: the creating user; only ``user.id`` is read.
    :return: the generated code string.
    """
    pool = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    code = ''.join(rng.choice(pool) for _ in range(32))

    db_session.add(Invite(code, user.id))
    db_session.commit()
    return code
def verify_invite_code(invite_code):
    """Check that ``invite_code`` exists and has not been redeemed yet.

    Returns a truthy value only for a known, unredeemed code. When no such
    code exists the (falsy) lookup result itself is returned.
    """
    invite = Invite.query.filter(Invite.code == invite_code).first()
    if not invite:
        return invite
    return not invite.redeemed
def get_user_invite_codes(user):
    """Return every ``Invite`` created by ``user``.

    :param user: the creating user; only ``user.id`` is read.
    :return: list of ``Invite`` rows whose ``creator_id`` equals ``user.id``.
    """
    # The former ``user_id = user_id`` self-assignment raised
    # UnboundLocalError on every call and has been removed.
    return Invite.query.filter(Invite.creator_id == user.id).all()
def new_user(username, password):
    """Create and persist a user, returning the new ``User`` object.

    The password is hashed with ``get_hash`` using the salt taken from
    ``app.config['SALT']``; that same salt is stored on the user row.
    """
    site_salt = app.config['SALT']
    account = User(username, get_hash(password, site_salt), site_salt)
    db_session.add(account)
    db_session.commit()
    return account
def get_extension(filename):
    """Extract the extension of ``filename``, without the leading dot.

    Returns ``None`` when there is no dot or the only dot is the first
    character (e.g. ``.bashrc``). When the part between the last two dots is
    listed in ``app.config['DOUBLE_EXTS']`` the combined form is returned
    (e.g. ``tar.gz``); otherwise only the text after the last dot.
    """
    dot = filename.rfind('.')
    if dot < 1:
        return None

    simple_ext = filename[dot + 1:]

    # Look for an earlier dot, not directly adjacent to the last one.
    prev_dot = filename.rfind('.', 0, dot - 1)
    if prev_dot != -1:
        inner = filename[prev_dot + 1:dot]
        if inner in app.config['DOUBLE_EXTS']:
            return inner + '.' + simple_ext

    return simple_ext
def extension_blocked(file):
    """Tell whether the uploaded file's extension is blacklisted.

    The comparison is case-insensitive so that e.g. ``evil.EXE`` cannot slip
    past a blacklist entry written as ``exe``.

    :param file: uploaded file object; only ``file.filename`` is read.
    :return: True when the extension is in
        ``app.config['BLACKLIST_EXTENSIONS']``, False otherwise (including
        when the file has no extension).
    """
    extension = get_extension(file.filename)
    if extension is None:
        return False

    # NOTE(review): assumes BLACKLIST_EXTENSIONS is an iterable of strings
    # (list/tuple/set) -- confirm against the config files.
    blacklist = {entry.lower() for entry in app.config['BLACKLIST_EXTENSIONS']}
    return extension.lower() in blacklist
def new_upload(file, file_hash_bin):
    """Store an uploaded file on disk and record it in the database.

    The file is saved in ``app.config['UPLOAD_FOLDER']`` under the hex form
    of its hash. For image mimetypes a 125x125 JPEG thumbnail is also
    written to ``public/assets/thumbnails/``. A fresh short URL (plus the
    original extension, if any) is assigned to the new ``Upload`` row.

    :param file: uploaded file object (``stream``, ``save``, ``mimetype`` and
        ``filename`` are used).
    :param file_hash_bin: binary digest of the file content.
    :return: the committed ``Upload`` object.
    """
    file_hash_str = binascii.hexlify(file_hash_bin).decode('utf8')

    upload_folder = app.config['UPLOAD_FOLDER']
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(upload_folder, exist_ok=True)
    abs_file = os.path.join(upload_folder, file_hash_str)

    # The caller has already read the stream to hash it; rewind before saving.
    file.stream.seek(0)
    file.save(abs_file)

    # thumbnail generation
    if 'image' in file.mimetype:
        thumbnail_dir = 'public/assets/thumbnails/'
        os.makedirs(thumbnail_dir, exist_ok=True)
        with Image(filename=abs_file) as img:
            img.format = 'jpeg'
            img.transform(resize='125x125')
            img.save(filename=thumbnail_dir + file_hash_str + '.thumb.jpg')

    # Generate a short id and append extension
    short_id = get_new_short_url()
    extension = get_extension(file.filename)
    full_id = short_id + '.' + extension if extension else short_id

    # Add upload in DB
    upload = Upload(file_hash_bin, full_id, file.mimetype)
    db_session.add(upload)
    db_session.commit()

    return upload
def get_hash(password, salt):
    """Return the raw SHA-512 digest of ``salt`` followed by ``password``.

    Both arguments are text and are UTF-8 encoded before hashing; the result
    is the 64-byte binary digest.
    """
    salted = salt.encode('utf8') + password.encode('utf8')
    return hashlib.sha512(salted).digest()
def get_admin_status():
    """Return an error response unless the request comes from an admin.

    The user is identified by the ``token`` cookie.

    :return: ``None`` when the token belongs to an admin user, otherwise a
        ``(json_response, 403)`` tuple.
    """
    token = request.cookies.get('token')
    # Only query when a token was sent; an unknown token yields ``None``,
    # which must be rejected rather than dereferenced.
    user = User.query.filter(User.token == token).first() if token else None
    if user is None or not user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403
    return None
def get_auth_error(semi=False):
    """Return an error response when the request is not authenticated.

    :param semi: when true, the check is skipped entirely if the app is not
        configured as private (``app.config['IS_PRIVATE']`` is falsy).
    :return: ``None`` when access is allowed, otherwise a
        ``(json_response, 403)`` tuple.
    """
    # If app is not configured for private usage, ignore check
    if semi and not app.config['IS_PRIVATE']:
        return None

    token = request.cookies.get('token')
    known_user = token and User.query.filter(User.token == token).first()
    if known_user:
        return None
    return jsonify({'error': 'Unauthorized'}), 403
@app.route('/private', methods=['GET'])
def is_app_private():
    """Report the value of the ``IS_PRIVATE`` setting as JSON."""
    private_flag = app.config['IS_PRIVATE']
    return jsonify({'private': private_flag})
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept a file upload and answer with its short URL.

    Responds 400 when the ``file`` field is missing/empty or its extension
    is blacklisted, 403 when the app is private and the caller is not
    authenticated. Identical content (same SHA-1) is stored only once: the
    existing upload's short URL is returned instead of creating a new one.
    """
    # ``.get`` instead of ``[...]``: indexing a missing field aborts with a
    # generic non-JSON 400 before the JSON error below can be returned.
    file = request.files.get('file')
    if not file:
        return jsonify({'error': 'Bad request'}), 400

    if extension_blocked(file):
        return jsonify({'error': 'File type not allowed'}), 400

    err = get_auth_error(True)
    if err:
        return err

    # Get sha1 of uploaded file
    file_hash_bin = hashlib.sha1(file.read()).digest()

    # Reuse the old (identical) file's record if there is one; a single
    # lookup replaces the former exists-then-fetch pair of queries.
    upload = Upload.query.filter(Upload.hash == file_hash_bin).first()
    if not upload:
        upload = new_upload(file, file_hash_bin)

    if not upload:
        return jsonify({'error': 'An unknown error occurred'}), 500

    return jsonify(short_url=app.config['API_URL'] + upload.short_url)
@app.route('/<short_url>', methods=['GET'])
def get_upload(short_url):
    """Serve the uploaded file registered under ``short_url``.

    Responds with status 404 when no such upload exists, redirects to
    ``/#/blocked`` when the upload is blocked, and otherwise sends the
    stored file inline with its recorded mimetype.
    """
    upload = Upload.query.filter(Upload.short_url == short_url).first()
    if not upload:
        # Carry a real 404 status; the body alone was sent with 200 before.
        return '404 not found', 404

    if upload.blocked:
        # Temporary redirect: the blocked flag can be toggled back by
        # block_upload, and a 301 may be cached permanently by clients.
        return redirect("/#/blocked", code=302)

    hash_str = binascii.hexlify(upload.hash).decode('utf8')
    return send_from_directory(
        app.config['UPLOAD_FOLDER'],
        hash_str, mimetype=upload.mime_type, as_attachment=False)
@app.route('/uploads', methods=['GET'])
def get_uploads():
    """List every upload as JSON; restricted by ``get_admin_status``.

    Each entry carries the short URL, the blocked flag, the hex-encoded
    hash and the mimetype.
    """
    denied = get_admin_status()
    if denied:
        return denied

    listing = [
        {
            "short_url": item.short_url,
            "blocked": item.blocked,
            "hash": str(binascii.hexlify(item.hash).decode('utf8')),
            "mime_type": item.mime_type,
        }
        for item in Upload.query.all()
    ]
    return jsonify(uploads=listing)
@app.route('/block/<short_url>', methods=['GET'])
def block_upload(short_url):
    """Toggle the ``blocked`` flag of the upload registered under ``short_url``.

    Requires an authenticated caller (``get_auth_error``). Responds 404 when
    no upload has that short URL.
    """
    err = get_auth_error()
    if err:
        return err

    upload = Upload.query.filter(Upload.short_url == short_url).first()
    if not upload:
        # An unknown short URL used to crash on ``None.blocked``.
        return jsonify({'error': 'Not found'}), 404

    upload.blocked = not upload.blocked
    db_session.commit()
    return jsonify({'success': True}), 200
@app.route('/login', methods=['POST'])
def login():
    """Authenticate a user from a JSON body and set the ``token`` cookie.

    Expects a JSON object with string ``username`` and ``password`` fields.
    Responds 400 for a missing/malformed body, 401 for bad credentials. On
    success a fresh random token is stored on the user and sent back as the
    ``token`` cookie.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so every malformed request ends in the JSON 400 below.
    req = request.get_json(silent=True)
    if not isinstance(req, dict) \
            or 'username' not in req or 'password' not in req:
        return jsonify({'error': 'Bad request'}), 400

    username = req['username']
    password = req['password']
    # get_hash() calls .encode() on the password, so reject non-string values
    # up front rather than failing with a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Bad request'}), 400

    user = User.query.filter(User.username == username).first()
    if user and get_hash(password, user.salt) == user.password:
        token = uuid.uuid4().hex
        user.token = token
        db_session.query(User).filter_by(id=user.id) \
            .update({"token": user.token})
        db_session.commit()
        resp = jsonify({'success': True})
        resp.set_cookie('token', token)
        return resp

    return jsonify({'error': 'Bad login'}), 401
@app.route('/CreateInviteCode', methods=['GET'])
def create_invite_code():
    """Create an invite code for the authenticated caller and return it.

    Responds 501 when the site is not in private mode and with the
    ``get_auth_error`` response when the caller is not authenticated.
    """
    if not app.config['IS_PRIVATE']:
        return jsonify({'error': 'Site must be in private mode.'}), 501

    denied = get_auth_error()
    if denied:
        return denied

    # Resolve the caller from the token cookie.
    caller = User.query.filter(
        User.token == request.cookies.get('token')).first()
    return jsonify(invite_code=generate_invite_code(caller))
@app.route('/InviteCodes', methods=['GET'])
def get_invite_codes():
    """List the invite codes created by the authenticated caller.

    Responds 501 when the site is not in private mode and with the
    ``get_auth_error`` response when the caller is not authenticated.
    Each entry carries the code and its redeemed flag.
    """
    if not app.config['IS_PRIVATE']:
        return jsonify({'error': 'Site must be in private mode.'}), 501

    denied = get_auth_error()
    if denied:
        return denied

    # Resolve the caller from the token cookie.
    caller = User.query.filter(
        User.token == request.cookies.get('token')).first()
    invites = Invite.query.filter(Invite.creator_id == caller.id).all()

    listing = [
        {
            "code": invite.code,
            "redeemed": invite.redeemed,
        }
        for invite in invites
    ]
    return jsonify(codes=listing)
@app.route('/register', methods=['POST'])
def register():
    """Register a new user from a JSON body, consuming an invite code.

    Expects a JSON object with string ``username`` and ``password`` fields
    and an ``invite_code``. Responds 400 for a missing/malformed body or an
    invalid/already-redeemed invite code. On success the user is created and
    the invite is marked as redeemed.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so every malformed request ends in the JSON 400 below.
    req = request.get_json(silent=True)
    if not isinstance(req, dict) \
            or 'username' not in req or 'password' not in req \
            or 'invite_code' not in req:
        return jsonify({'error': 'Bad request'}), 400

    username = req['username']
    password = req['password']
    invite_code = req['invite_code']
    # get_hash() calls .encode() on the password, so reject non-string values
    # up front rather than failing with a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Bad request'}), 400

    if not verify_invite_code(invite_code):
        return jsonify({'error': 'Invalid invite code'}), 400

    new_user(username, password)
    Invite.query.filter(Invite.code == invite_code).first().redeemed = True
    db_session.commit()

    return jsonify({'success': True})
@app.teardown_appcontext
def shutdown_session(exception=None):
    """Remove the scoped database session when the app context is torn down.

    ``exception`` is supplied by Flask and is not used here.
    """
    db_session.remove()
if __name__ == "__main__":
    # Always load the defaults first; an optional single command-line
    # argument names an extra config (config/<name>_config.py) layered on top.
    app.config.from_pyfile('config/default_config.py')

    if len(sys.argv) == 2:
        conf = sys.argv[1]
        # Interpolate the name; passing it as a second print() argument left
        # the literal "%s" in the output.
        print('Loading additional config %s...' % conf)
        app.config.from_pyfile('config/' + conf + '_config.py')

    init_engine(app.config['DATABASE_URI'])
    init_db()
    app.run()