-
Notifications
You must be signed in to change notification settings - Fork 0
/
planning_center.py
74 lines (62 loc) · 2.56 KB
/
planning_center.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import json
import requests
from requests.auth import HTTPBasicAuth
class PlanningCenterWrapper(object):
def __init__(self, config):
self.config = config
self.URL_ENDPOINT = "https://api.planningcenteronline.com/services/v2/"
self.SERVICE_TYPE = "287665"
self.USER_ID = "3878409"
def auth(self):
key = self.config.get("PLANNING_CENTER_ID")
secret = self.config.get("PLANNING_CENTER_SECRET")
if not key or not secret:
raise Exception("Could not find planning center credentials.")
return HTTPBasicAuth(key, secret)
def _make_request(self, url, params=None):
return requests.get(
self.URL_ENDPOINT + url, params=params, auth=self.auth()
).json()
def get_songs(self, offset=0):
params = {"per_page": 100, "offset": offset, "order": "-last_scheduled_at"}
return self._make_request("songs", params)
def get_person_plans(self):
person_plans = self._make_request("people/{user_id}/plan_people".format(user_id=self.USER_ID))
plan_ids = [item['relationships']['plan']['data']['id'] for item in person_plans['data']]
return plan_ids
def get_plan(self, plan_id):
url = "service_types/{service_type}/plans/{plan_id}".format(
service_type=self.SERVICE_TYPE, plan_id=str(plan_id)
)
data = self._make_request(url)
return data['data']
def get_service_songs(self, plan_id):
url = "service_types/{service_type}/plans/{plan_id}/items".format(
service_type=self.SERVICE_TYPE, plan_id=str(plan_id)
)
data = self._make_request(url)
songs = [
item["attributes"]["title"]
for item in data["data"]
if item["attributes"]["item_type"] == "song"
]
return songs
def output_song_data(self):
song_response = self.get_songs()
song_data = song_response["data"]
while song_response["meta"].get("next"):
song_response = self.get_songs(
offset=song_response["meta"]["next"]["offset"]
)
song_data.extend(song_response["data"])
output_data = {"songs": []}
for song in song_data:
if song["attributes"]:
output_data["songs"].append(
{
"title": song["attributes"]["title"],
"author": song["attributes"]["author"],
}
)
with open("songs.json", "w") as outfile:
json.dump(output_data, outfile)