forked from miguelgrinberg/flask-celery-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
129 lines (108 loc) · 4.07 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os
import random
import time
from flask import Flask, request, render_template, session, flash, redirect, \
url_for, jsonify
from flask_mail import Mail, Message
from celery import Celery
app = Flask(__name__)
app.config['SECRET_KEY'] = 'top-secret!'
# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'
# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# Initialize extensions
mail = Mail(app)
# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def send_async_email(email_data):
"""Background task to send an email with Flask-Mail."""
msg = Message(email_data['subject'],
sender=app.config['MAIL_DEFAULT_SENDER'],
recipients=[email_data['to']])
msg.body = email_data['body']
with app.app_context():
mail.send(msg)
@celery.task(bind=True)
def long_task(self):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html', email=session.get('email', ''))
email = request.form['email']
session['email'] = email
# send the email
email_data = {
'subject': 'Hello from Flask',
'to': email,
'body': 'This is a test email sent from a background Celery task.'
}
if request.form['submit'] == 'Send':
# send right away
send_async_email.delay(email_data)
flash('Sending email to {0}'.format(email))
else:
# send in one minute
send_async_email.apply_async(args=[email_data], countdown=60)
flash('An email will be sent to {0} in one minute'.format(email))
return redirect(url_for('index'))
@app.route('/longtask', methods=['POST'])
def longtask():
task = long_task.apply_async()
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task.id)}
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return jsonify(response)
if __name__ == '__main__':
app.run(debug=True)