-
Notifications
You must be signed in to change notification settings - Fork 0
/
flask_uploads.py
547 lines (437 loc) · 19.4 KB
/
flask_uploads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
# -*- coding: utf-8 -*-
"""
flaskext.uploads
================
This module provides upload support for Flask. The basic pattern is to set up
an `UploadSet` object and upload your files to it.
:copyright: 2010 Matthew "LeafStorm" Frazier
:license: MIT/X11, see LICENSE for details
"""
import sys
PY3 = sys.version_info[0] == 3
if PY3:
string_types = str,
else:
string_types = basestring,
import os.path
import posixpath
import itertools
from werkzeug import secure_filename, FileStorage
from flask import current_app, Blueprint, send_from_directory, abort, url_for
# Extension presets
#: This just contains plain text files (.txt).
TEXT = ('txt',)
#: This contains various office document formats (.rtf, .odf, .ods, .gnumeric,
#: .abw, .doc, .docx, .xls, and .xlsx). Note that the macro-enabled versions
#: of Microsoft Office 2007 files are not included.
DOCUMENTS = tuple('rtf odf ods gnumeric abw doc docx xls xlsx'.split())
#: This contains basic image types that are viewable from most browsers (.jpg,
#: .jpe, .jpeg, .png, .gif, .svg, and .bmp).
IMAGES = tuple('jpg jpe jpeg png gif svg bmp'.split())
#: This contains audio file types (.wav, .mp3, .aac, .ogg, .oga, and .flac).
AUDIO = tuple('wav mp3 aac ogg oga flac'.split())
#: This is for structured data files (.csv, .ini, .json, .plist, .xml, .yaml,
#: and .yml).
DATA = tuple('csv ini json plist xml yaml yml'.split())
#: This contains various types of scripts (.js, .php, .pl, .py .rb, and .sh).
#: If your Web server has PHP installed and set to auto-run, you might want to
#: add ``php`` to the DENY setting.
SCRIPTS = tuple('js php pl py rb sh'.split())
#: This contains archive and compression formats (.gz, .bz2, .zip, .tar,
#: .tgz, .txz, and .7z).
ARCHIVES = tuple('gz bz2 zip tar tgz txz 7z'.split())
#: This contains pdf, graphics and postsript formats (.pdf, .psd, .ai, .cdr,
#: .ps).
GRAPHICS = tuple('pdf psd ai cdr ps'.split())
#: This contains shared libraries and executable files (.so, .exe and .dll).
#: Most of the time, you will not want to allow this - it's better suited for
#: use with `AllExcept`.
EXECUTABLES = tuple('so exe dll'.split())
#: The default allowed extensions - `TEXT`, `DOCUMENTS`, `DATA`, and `IMAGES`.
DEFAULTS = TEXT + DOCUMENTS + IMAGES + DATA
class UploadNotAllowed(Exception):
"""
This exception is raised if the upload was not allowed. You should catch
it in your view code and display an appropriate message to the user.
"""
def tuple_from(*iters):
return tuple(itertools.chain(*iters))
def extension(filename):
ext = os.path.splitext(filename)[1]
if ext.startswith('.'):
# os.path.splitext retains . separator
ext = ext[1:]
return ext
def lowercase_ext(filename):
"""
This is a helper used by UploadSet.save to provide lowercase extensions for
all processed files, to compare with configured extensions in the same
case.
.. versionchanged:: 0.1.4
Filenames without extensions are no longer lowercased, only the
extension is returned in lowercase, if an extension exists.
:param filename: The filename to ensure has a lowercase extension.
"""
if '.' in filename:
main, ext = os.path.splitext(filename)
return main + ext.lower()
# For consistency with os.path.splitext,
# do not treat a filename without an extension as an extension.
# That is, do not return filename.lower().
return filename
def addslash(url):
if url.endswith('/'):
return url
return url + '/'
def patch_request_class(app, size=64 * 1024 * 1024):
"""
By default, Flask will accept uploads to an arbitrary size. While Werkzeug
switches uploads from memory to a temporary file when they hit 500 KiB,
it's still possible for someone to overload your disk space with a
gigantic file.
This patches the app's request class's
`~werkzeug.BaseRequest.max_content_length` attribute so that any upload
larger than the given size is rejected with an HTTP error.
.. note::
In Flask 0.6, you can do this by setting the `MAX_CONTENT_LENGTH`
setting, without patching the request class. To emulate this behavior,
you can pass `None` as the size (you must pass it explicitly). That is
the best way to call this function, as it won't break the Flask 0.6
functionality if it exists.
.. versionchanged:: 0.1.1
:param app: The app to patch the request class of.
:param size: The maximum size to accept, in bytes. The default is 64 MiB.
If it is `None`, the app's `MAX_CONTENT_LENGTH` configuration
setting will be used to patch.
"""
if size is None:
if isinstance(app.request_class.__dict__['max_content_length'],
property):
return
size = app.config.get('MAX_CONTENT_LENGTH')
reqclass = app.request_class
patched = type(reqclass.__name__, (reqclass,),
{'max_content_length': size})
app.request_class = patched
def config_for_set(uset, app, defaults=None):
"""
This is a helper function for `configure_uploads` that extracts the
configuration for a single set.
:param uset: The upload set.
:param app: The app to load the configuration from.
:param defaults: A dict with keys `url` and `dest` from the
`UPLOADS_DEFAULT_DEST` and `DEFAULT_UPLOADS_URL`
settings.
"""
config = app.config
prefix = 'UPLOADED_%s_' % uset.name.upper()
using_defaults = False
if defaults is None:
defaults = dict(dest=None, url=None)
allow_extns = tuple(config.get(prefix + 'ALLOW', ()))
deny_extns = tuple(config.get(prefix + 'DENY', ()))
destination = config.get(prefix + 'DEST')
base_url = config.get(prefix + 'URL')
if destination is None:
# the upload set's destination wasn't given
if uset.default_dest:
# use the "default_dest" callable
destination = uset.default_dest(app)
if destination is None: # still
# use the default dest from the config
if defaults['dest'] is not None:
using_defaults = True
destination = os.path.join(defaults['dest'], uset.name)
else:
raise RuntimeError("no destination for set %s" % uset.name)
if base_url is None and using_defaults and defaults['url']:
base_url = addslash(defaults['url']) + uset.name + '/'
return UploadConfiguration(destination, base_url, allow_extns, deny_extns)
def configure_uploads(app, upload_sets):
"""
Call this after the app has been configured. It will go through all the
upload sets, get their configuration, and store the configuration on the
app. It will also register the uploads module if it hasn't been set. This
can be called multiple times with different upload sets.
.. versionchanged:: 0.1.3
The uploads module/blueprint will only be registered if it is needed
to serve the upload sets.
:param app: The `~flask.Flask` instance to get the configuration from.
:param upload_sets: The `UploadSet` instances to configure.
"""
if isinstance(upload_sets, UploadSet):
upload_sets = (upload_sets,)
if not hasattr(app, 'upload_set_config'):
app.upload_set_config = {}
set_config = app.upload_set_config
defaults = dict(dest=app.config.get('UPLOADS_DEFAULT_DEST'),
url=app.config.get('UPLOADS_DEFAULT_URL'))
for uset in upload_sets:
config = config_for_set(uset, app, defaults)
set_config[uset.name] = config
should_serve = any(s.base_url is None for s in set_config.values())
if '_uploads' not in app.blueprints and should_serve:
app.register_blueprint(uploads_bp)
class UploadsManager(object):
""" Simple manager for fast sets registration
Usage:
# flaskapp.py
from flask import Flask
from flask.ext.uploads import UploadsManager, UploadSet, IMAGES
app = Flask(__name__)
sets = (
UploadSet('photos', IMAGES),
UploadSet('pdf', ('pdf',))
)
uploads = UploadsManager()
uploads.init_app(app, upload_sets=sets)
uploads.register(UploadSet('photos'))
"""
def __init__(self, app=None):
if app is not None:
self.init_app(app)
def init_app(self, app, upload_sets=None):
self.app = app
if upload_sets is not None:
configure_uploads(self.app, upload_sets)
if not hasattr(app, 'extensions'):
app.extensions = {}
app.extensions['uploads'] = self
def register(self, upload_sets):
configure_uploads(self.app, upload_sets)
class All(object):
"""
This type can be used to allow all extensions. There is a predefined
instance named `ALL`.
"""
def __contains__(self, item):
return True
#: This "contains" all items. You can use it to allow all extensions to be
#: uploaded.
ALL = All()
class AllExcept(object):
"""
This can be used to allow all file types except certain ones. For example,
to ban .exe and .iso files, pass::
AllExcept(('exe', 'iso'))
to the `UploadSet` constructor as `extensions`. You can use any container,
for example::
AllExcept(SCRIPTS + EXECUTABLES)
"""
def __init__(self, items):
self.items = items
def __contains__(self, item):
return item not in self.items
class UploadConfiguration(object):
"""
This holds the configuration for a single `UploadSet`. The constructor's
arguments are also the attributes.
:param destination: The directory to save files to.
:param base_url: The URL (ending with a /) that files can be downloaded
from. If this is `None`, Flask-Uploads will serve the
files itself.
:param allow: A list of extensions to allow, even if they're not in the
`UploadSet` extensions list.
:param deny: A list of extensions to deny, even if they are in the
`UploadSet` extensions list.
"""
def __init__(self, destination, base_url=None, allow=(), deny=()):
self.destination = destination
self.base_url = base_url
self.allow = allow
self.deny = deny
@property
def tuple(self):
return (self.destination, self.base_url, self.allow, self.deny)
def __eq__(self, other):
return self.tuple == other.tuple
class UploadSet(object):
"""
This represents a single set of uploaded files. Each upload set is
independent of the others. This can be reused across multiple application
instances, as all configuration is stored on the application object itself
and found with `flask.current_app`.
:param name: The name of this upload set. It defaults to ``files``, but
you can pick any alphanumeric name you want. (For simplicity,
it's best to use a plural noun.)
:param extensions: The extensions to allow uploading in this set. The
easiest way to do this is to add together the extension
presets (for example, ``TEXT + DOCUMENTS + IMAGES``).
It can be overridden by the configuration with the
`UPLOADED_X_ALLOW` and `UPLOADED_X_DENY` configuration
parameters. The default is `DEFAULTS`.
:param default_dest: If given, this should be a callable. If you call it
with the app, it should return the default upload
destination path for that app.
"""
def __init__(self, name='files', extensions=DEFAULTS, default_dest=None):
if not name.isalnum():
raise ValueError("Name must be alphanumeric (no underscores)")
self.name = name
self.extensions = extensions
self._config = None
self.default_dest = default_dest
@property
def config(self):
"""
This gets the current configuration. By default, it looks up the
current application and gets the configuration from there. But if you
don't want to go to the full effort of setting an application, or it's
otherwise outside of a request context, set the `_config` attribute to
an `UploadConfiguration` instance, then set it back to `None` when
you're done.
"""
if self._config is not None:
return self._config
try:
return current_app.upload_set_config[self.name]
except AttributeError:
raise RuntimeError("cannot access configuration outside request")
def url(self, filename):
"""
This function gets the URL a file uploaded to this set would be
accessed at. It doesn't check whether said file exists.
:param filename: The filename to return the URL for.
"""
base = self.config.base_url
if base is None:
return url_for('_uploads.uploaded_file', setname=self.name,
filename=filename, _external=True)
else:
return base + filename
def path(self, filename, folder=None):
"""
This returns the absolute path of a file uploaded to this set. It
doesn't actually check whether said file exists.
:param filename: The filename to return the path for.
:param folder: The subfolder within the upload set previously used
to save to.
"""
if folder is not None:
target_folder = os.path.join(self.config.destination, folder)
else:
target_folder = self.config.destination
return os.path.join(target_folder, filename)
def file_allowed(self, storage, basename):
"""
This tells whether a file is allowed. It should return `True` if the
given `werkzeug.FileStorage` object can be saved with the given
basename, and `False` if it can't. The default implementation just
checks the extension, so you can override this if you want.
:param storage: The `werkzeug.FileStorage` to check.
:param basename: The basename it will be saved under.
"""
return self.extension_allowed(extension(basename))
def extension_allowed(self, ext):
"""
This determines whether a specific extension is allowed. It is called
by `file_allowed`, so if you override that but still want to check
extensions, call back into this.
:param ext: The extension to check, without the dot.
"""
return ((ext in self.config.allow) or
(ext in self.extensions and ext not in self.config.deny))
def get_basename(self, filename):
"""
"""
return lowercase_ext(secure_filename(filename))
def save(self, storage, folder=None, name=None):
"""
This saves a `werkzeug.FileStorage` into this upload set. If the
upload is not allowed, an `UploadNotAllowed` error will be raised.
Otherwise, the file will be saved and its name (including the folder)
will be returned.
:param storage: The uploaded file to save.
:param folder: The subfolder within the upload set to save to.
:param name: The name to save the file as. If it ends with a dot, the
file's extension will be appended to the end. (If you
are using `name`, you can include the folder in the
`name` instead of explicitly using `folder`, i.e.
``uset.save(file, name="someguy/photo_123.")``
"""
if not isinstance(storage, FileStorage):
raise TypeError("storage must be a werkzeug.FileStorage")
if folder is None and name is not None and "/" in name:
folder, name = os.path.split(name)
basename = self.get_basename(storage.filename)
if name:
if name.endswith('.'):
basename = name + extension(basename)
else:
basename = name
if not self.file_allowed(storage, basename):
raise UploadNotAllowed()
if folder:
target_folder = os.path.join(self.config.destination, folder)
else:
target_folder = self.config.destination
if not os.path.exists(target_folder):
os.makedirs(target_folder)
if os.path.exists(os.path.join(target_folder, basename)):
basename = self.resolve_conflict(target_folder, basename)
target = os.path.join(target_folder, basename)
storage.save(target)
if folder:
return posixpath.join(folder, basename)
else:
return basename
def resolve_conflict(self, target_folder, basename):
"""
If a file with the selected name already exists in the target folder,
this method is called to resolve the conflict. It should return a new
basename for the file.
The default implementation splits the name and extension and adds a
suffix to the name consisting of an underscore and a number, and tries
that until it finds one that doesn't exist.
:param target_folder: The absolute path to the target.
:param basename: The file's original basename.
"""
name, ext = os.path.splitext(basename)
count = 0
while True:
count = count + 1
newname = '%s_%d%s' % (name, count, ext)
if not os.path.exists(os.path.join(target_folder, newname)):
return newname
uploads_bp = Blueprint('_uploads', __name__, url_prefix='/_uploads')
@uploads_bp.route('/<setname>/<path:filename>')
def uploaded_file(setname, filename):
config = current_app.upload_set_config.get(setname)
if config is None:
abort(404)
return send_from_directory(config.destination, filename)
class TestingFileStorage(FileStorage):
"""
This is a helper for testing upload behavior in your application. You
can manually create it, and its save method is overloaded to set `saved`
to the name of the file it was saved to. All of these parameters are
optional, so only bother setting the ones relevant to your application.
:param stream: A stream. The default is an empty stream.
:param filename: The filename uploaded from the client. The default is the
stream's name.
:param name: The name of the form field it was loaded from. The default is
`None`.
:param content_type: The content type it was uploaded as. The default is
``application/octet-stream``.
:param content_length: How long it is. The default is -1.
:param headers: Multipart headers as a `werkzeug.Headers`. The default is
`None`.
"""
def __init__(self, stream=None, filename=None, name=None,
content_type='application/octet-stream', content_length=-1,
headers=None):
FileStorage.__init__(self, stream, filename, name=name,
content_type=content_type, content_length=content_length,
headers=None)
self.saved = None
def save(self, dst, buffer_size=16384):
"""
This marks the file as saved by setting the `saved` attribute to the
name of the file it was saved to.
:param dst: The file to save to.
:param buffer_size: Ignored.
"""
if isinstance(dst, string_types):
self.saved = dst
else:
self.saved = dst.name