-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
108 lines (89 loc) · 2.96 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import requests
import json
import re
from netifaces import ifaddresses
from sh import grep, netstat
from urlparse import urlparse
from datetime import timedelta
from settings import settings
# This will only work on the Raspberry Pi,
# so let's wrap it in a try/except so that
# Travis can run.
try:
from sh import omxplayer
except:
pass
def validate_url(string):
    """Tell whether `string` looks like a URL this module can handle.

    A URL is accepted when its scheme is one of rtsp, rtmp, http or
    https and it carries a non-empty network location (host part).

    >>> validate_url("hello")
    False
    >>> validate_url("ftp://example.com")
    False
    >>> validate_url("http://")
    False
    >>> validate_url("http://wireload.net/logo.png")
    True
    >>> validate_url("https://wireload.net/logo.png")
    True
    """
    parsed = urlparse(string)
    # Reject unsupported schemes first; only then does the host matter.
    if parsed.scheme not in ('rtsp', 'rtmp', 'http', 'https'):
        return False
    return bool(parsed.netloc)
def get_node_ip():
    """Return the node's IP for the interface used as the default gateway.

    The routing table (`netstat -nr`) is filtered for lines starting with
    `default` or `0.0.0.0`, and the last whitespace-separated token of
    that output is taken as the interface name. This should work on both
    MacOS X and Linux.

    Returns the address string, or None when the lookup fails for any
    reason (no default route, unknown interface, no address, missing
    tools, ...).
    """
    try:
        default_routes = grep(
            netstat('-nr'), '-e', '^default', '-e', '^0.0.0.0'
        )
        default_interface = default_routes.split()[-1]
        # NOTE(review): key 2 is presumably the IPv4 address family
        # (AF_INET == 2 on Linux and MacOS X) -- confirm against netifaces.
        return ifaddresses(default_interface)[2][0]['addr']
    except Exception:
        # Best-effort lookup: any failure means "IP unknown". Unlike a bare
        # `except:`, this lets KeyboardInterrupt/SystemExit propagate.
        return None
def get_video_duration(file):
    """
    Return the duration of a video file as a timedelta.

    Runs omxplayer in info mode on `file` and scans its combined
    stdout/stderr output for the first line containing 'Duration' with an
    `H:M:S.fraction` timestamp.

    Returns None when no such timestamp is found or when omxplayer cannot
    be run at all (for instance when it could not be imported because
    this is not running on a Raspberry Pi).
    """
    duration = None
    try:
        player_output = omxplayer(file, info=True, _err_to_out=True)
        for line in player_output.split('\n'):
            if 'Duration' not in line:
                continue
            match = re.search(r'([0-9]+):([0-9]+):([0-9]+\.[0-9]+)', line)
            if match:
                hours, minutes, seconds = match.groups()
                duration = timedelta(
                    hours=int(hours),
                    minutes=int(minutes),
                    seconds=float(seconds),
                )
                break
    except Exception:
        # Best-effort probe: a failing or missing player means "duration
        # unknown". Unlike a bare `except:`, this lets
        # KeyboardInterrupt/SystemExit propagate.
        pass
    return duration
def handler(obj):
    """Fallback encoder for objects the json module cannot serialize.

    Objects exposing an `isoformat` attribute are rendered by calling it;
    anything else raises TypeError.
    """
    if not hasattr(obj, 'isoformat'):
        details = (type(obj), repr(obj))
        raise TypeError('Object of type %s with value of %s is not JSON serializable' % details)
    return obj.isoformat()
def json_dump(obj):
    """Serialize `obj` to a JSON string.

    Values the json module cannot encode natively are passed to
    `handler` as the fallback encoder.
    """
    encoded = json.dumps(obj, default=handler)
    return encoded
def url_fails(url):
    """
    Return True when `url` is considered unreachable, False otherwise.

    Accept 200 and 405 as 'OK' statuses for URLs.
    Some hosting providers (like Google App Engine) throw a 405 at
    `requests`.

    RTSP and RTMP URLs cannot be checked, so they are assumed to be there
    (if not, OMX Player will terminate and the next asset is shown -> no
    ugly error messages).

    A URL that does not pass `validate_url`, a connection error, or a
    timeout all count as a failure.
    """
    if url.startswith(('rtsp://', 'rtmp://')):
        return False

    if not validate_url(url):
        return True

    try:
        response = requests.head(
            url,
            allow_redirects=True,
            timeout=10,
            verify=settings['verify_ssl'],
        )
    except (requests.ConnectionError, requests.exceptions.Timeout):
        return True

    # Explicit comparison instead of `assert`: assertions are stripped when
    # Python runs with -O, which would make every status code look fine.
    return response.status_code not in (200, 405)