-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
140 lines (112 loc) · 4.7 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from flask import Flask, jsonify, g, request, make_response, send_from_directory
import json
import datetime
import requests
import src.database as database
import src.reports as reports
app = Flask(__name__)
# DB connection, stuff
def build_response (json_content=None):
response = make_response()
# copied this from http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate" # HTTP 1.1.
response.headers["Pragma"] = "no-cache" # HTTP 1.0.
response.headers["Expires"] = "0" # Proxies. r.status_code = 200
if json_content:
response.status_code = requests.codes.ok
response.headers['Content-type'] = 'application/json'
response.data = json.dumps(json_content)
else:
response.status_code = requests.codes.no_content
return response
@app.before_request
def before_request():
g.db = database.connect('trans-health.db')
@app.teardown_request
def teardown_request(exception):
db = getattr(g, 'db', None)
if db is not None:
db.commit()
db.close()
@app.route('/')
def index():
return send_from_directory('', 'index.html', cache_timeout=0)
@app.route('/app/<path:path>')
def js(path):
return send_from_directory('app', path, cache_timeout=0)
# Add some API routes
@app.route('/api/v1/experience', methods=['POST'])
def post_experience():
data = request.get_json()
# need to look up plan and company
plan = database.plan_by_company_name(g.db,
data['company'],
data['plan'],
data['state'])
if plan is None:
plan = database.create_plan(g.db, data['company'], data['plan'], data['state'])
def make_experience (service_data):
return database.Experience(
date=datetime.datetime.strptime(service_data['date'], '%Y-%m-%d'),
plan=plan,
documented_gender='U',
service=service_data['name'],
success=service_data['success'],
age=service_data['age'])
# there may be multiple services in one request
experiences = [make_experience(service_data) for service_data in data['services']]
g.db.add_all(experiences)
g.db.commit()
return build_response()
@app.route('/api/v1/coverage', methods=['POST'])
def post_coverage():
data = request.get_json()
# look up by plan and company
plan = database.plan_by_company_name(g.db,
data['company'],
data['plan'],
data['state'])
if plan is None:
plan = database.create_plan(g.db, data['company'], data['plan'], data['state'])
def make_coverage (service_type_data):
return database.CoverageStatement(
date=datetime.datetime.strptime(data['date'], '%Y-%m-%d'),
plan=plan,
service_type=service_type_data['name'],
covered=service_type_data['covered'])
# there may be multiple service_types in one request
coverage_reports = [make_coverage(service_type_data)
for service_type_data in data['service_types']]
g.db.add_all(coverage_reports)
g.db.commit()
return build_response()
@app.route('/api/v1/search')
def search_plan():
state = request.args.get('state')
exchange_code = request.args.get('exchange_code')
plan_name = request.args.get('plan_name')
def plan_filter (plan):
if exchange_code and plan.color_code != exchange_code:
return False
if plan_name and not re.search(plan_name, plan.name):
return False
return True
plans = g.db.query(database.Plan).filter(database.Plan.state == state).all()
matching_plans = filter(plan_filter, plans)
return build_response(json_content=[reports.plan_summary(p) for p in matching_plans])
@app.route('/api/v1/companies')
def company_list():
companies = g.db.query(database.Company).all()
return build_response(json_content={'companies': [c.name for c in companies]})
@app.route('/api/v1/plans')
def plans_list():
the_plans = g.db.query(database.Plan).all()
return build_response(json_content=[{'state': p.state,
'company': p.company.name,
'plan': p.name}
for p in the_plans])
@app.route('/api/v1/services')
def service_list():
return build_response(json_content=reports.service_types)
if __name__ == '__main__':
app.run(debug=True)