-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
106 lines (86 loc) · 3.12 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import time
import yaml
import json
import requests
import socket
import pandas as pd
# CircuitPython Imports
import board
import busio
import digitalio
import adafruit_tlc5947
# Brightness value written to every LED channel that is switched on.
LED_BRIGHTNESS = 1000

# Load the configuration YAML file, which holds the MBTA API key and the
# Gotify settings. The encoding is given explicitly (matching the other
# data files this script opens) so parsing does not depend on the
# platform's default locale.
with open("mbta-map-config.yml", "r", encoding="utf-8") as ymlfile:
    config = yaml.safe_load(ymlfile)

# Set up the information necessary for MBTA API requests.
# This is specifically set to use the vehicles endpoint.
mbta_api_key = config["mbta"]["api_key"]
mbta_url = "https://api-v3.mbta.com/vehicles"
mbta_headers = {"X-API-Key": mbta_api_key}
MBTA_TIMEOUT = 5  # passed as the `timeout` argument of the API request

# Set up the Gotify notification system API information.
# The URL and token are only read from the config when Gotify is enabled,
# so `gotify_url` / `gotify_key` exist only in that case.
gotify_enabled = config["gotify"]["enabled"]
if gotify_enabled:
    gotify_url = config["gotify"]["url"]
    gotify_key = config["gotify"]["token"]
# Bring up the LED controller chain: seven TLC5947 driver boards on one SPI
# bus, with pin D6 used as the latch line. auto_write is disabled, so channel
# values only reach the hardware when tlc5947.write() is called.
SCK, MOSI = board.SCK, board.MOSI
LATCH = digitalio.DigitalInOut(board.D6)
spi = busio.SPI(clock=SCK, MOSI=MOSI)
tlc5947 = adafruit_tlc5947.TLC5947(
    spi,
    LATCH,
    num_drivers=7,
    auto_write=False,
)
# Routes to monitor: the top-level keys of lines.yml, converted to strings so
# they can be matched against the route ids taken from the API response.
with open("lines.yml", "r", encoding="utf-8") as linefile:
    lines = [str(route_key) for route_key in yaml.safe_load(linefile).keys()]

# Load the stop lookup table from CSV. Its stop_id, led_id and stop_name
# columns are what the rest of the script reads to map a stop to an LED.
with open("stop_data.csv", "r", encoding="utf-8") as stopfile:
    stop_to_led = pd.read_csv(stopfile)
# Seconds to wait between two refreshes of the LED map.
UPDATE_INTERVAL = 10

# led_id values that are never driven.
# NOTE(review): presumably placeholders in stop_data.csv for stops that have
# no LED on the board - confirm against the data file.
UNMAPPED_LED_IDS = (0, -1)


def _fetch_vehicles():
    """Return the vehicle records from the MBTA API, or None on failure.

    Network errors, timeouts, non-2xx responses, non-JSON bodies and
    payloads without a "data" entry are all reported and turned into None
    so that a transient API problem cannot kill the display loop.
    """
    try:
        response = requests.get(
            mbta_url, headers=mbta_headers, timeout=MBTA_TIMEOUT
        )
        response.raise_for_status()
        # Convert to JSON for easy indexing
        return response.json()["data"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
        print(f"MBTA API Error: {err}")
        return None


def _led_for_stop(stop_id):
    """Return the led_id of the first row matching *stop_id*, else None."""
    matches = stop_to_led.loc[stop_to_led["stop_id"] == stop_id, "led_id"]
    if matches.empty:
        return None
    return matches.iloc[0]


while True:
    vehicles = _fetch_vehicles()
    if vehicles is None:
        # Nothing new to show: leave the last written frame on the board
        # and try again after the usual interval.
        time.sleep(UPDATE_INTERVAL)
        continue

    # Loop through each vehicle returned from the API
    for item in vehicles:
        # Route and stop id are the only fields needed from a vehicle.
        # A vehicle whose relationship is missing or null (TypeError when
        # indexing None) cannot be placed on the map, so it is skipped.
        try:
            route = item["relationships"]["route"]["data"]["id"]
            stop_id = item["relationships"]["stop"]["data"]["id"]
        except (KeyError, TypeError):
            continue

        # Skip vehicles on routes that are not being tracked (this mainly
        # filters out bus lines we have no stop data for).
        if route not in lines:
            continue

        # Convert the stop_id into a led_id, and then turn that LED on
        led_id = _led_for_stop(stop_id)
        if led_id is None:
            print(f"Stop Error: no LED mapping for stop {stop_id}")
            continue
        if led_id not in UNMAPPED_LED_IDS:
            tlc5947[led_id] = LED_BRIGHTNESS

    # Write the current set of LEDs to be on
    tlc5947.write()

    # Wait before the next update
    time.sleep(UPDATE_INTERVAL)

    # Turn all LEDs back off in the buffer; this only becomes visible on the
    # next write(), i.e. together with the next set of vehicle positions.
    for led_id in stop_to_led["led_id"].tolist():
        if led_id not in UNMAPPED_LED_IDS:
            tlc5947[led_id] = 0