diff --git a/pyskyqremote/channel.py b/pyskyqremote/channel.py index 8aaff9f..2f32658 100644 --- a/pyskyqremote/channel.py +++ b/pyskyqremote/channel.py @@ -1,17 +1,85 @@ """Structure of a standard EPG prorgramme.""" +from dataclasses import dataclass, field +from datetime import datetime +import json -class Programme: - """SkyQ Programme Class.""" - - def __init__( - self, programmeuuid, starttime, endtime, title, season, episode, imageUrl - ): - """Programme structure for SkyQ.""" - self.progammeuuid = programmeuuid - self.starttime = starttime - self.endtime = endtime - self.title = title - self.season = season - self.episode = episode - self.imageUrl = imageUrl +from .programme import Programme + + +@dataclass +class Channel: + """SkyQ Channel Class.""" + + sid: str = field( + init=True, repr=True, compare=False, + ) + channelno: str = field( + init=True, repr=True, compare=False, + ) + channelname: str = field( + init=True, repr=True, compare=False, + ) + channelImageUrl: str = field( + init=True, repr=True, compare=False, + ) + programmes: set = field( + init=True, repr=True, compare=False, + ) + + def as_json(self) -> str: + """Return a JSON string respenting this Channel.""" + return json.dumps(self, cls=_ChannelJSONEncoder) + + +def ChannelDecoder(obj): + """Decode channel object from json.""" + channel = json.loads(obj, object_hook=_json_decoder_hook) + if "__type__" in channel and channel["__type__"] == "__channel__": + return Channel(programmes=channel["programmes"], **channel["attributes"]) + return channel + + +def _json_decoder_hook(obj): + """Decode JSON into appropriate types used in this library.""" + if "starttime" in obj: + obj["starttime"] = datetime.strptime(obj["starttime"], "%Y-%m-%dT%H:%M:%SZ") + if "endtime" in obj: + obj["endtime"] = datetime.strptime(obj["endtime"], "%Y-%m-%dT%H:%M:%SZ") + if "__type__" in obj and obj["__type__"] == "__programme__": + obj = Programme(**obj["attributes"]) + return obj + + +class _ChannelJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, Channel): + type_ = "__channel__" + programmes = obj.programmes + attributes = {} + for k, v in vars(obj).items(): + if k not in {"programmes"}: + attributes.update({k: v}) + return { + "__type__": type_, + "attributes": attributes, + "programmes": programmes, + } + + if isinstance(obj, set): + return list(obj) + + if isinstance(obj, Programme): + attributes = {} + for k, v in vars(obj).items(): + if type(v) is datetime: + v = v.strftime("%Y-%m-%dT%H:%M:%SZ") + attributes.update({k: v}) + + result = { + "__type__": "__programme__", + "attributes": attributes, + } + return result + + json.JSONEncoder.default(self, obj) # pragma: no cover diff --git a/pyskyqremote/country/remote_gb.py b/pyskyqremote/country/remote_gb.py index 6b2307d..8e59599 100644 --- a/pyskyqremote/country/remote_gb.py +++ b/pyskyqremote/country/remote_gb.py @@ -3,9 +3,10 @@ import logging import requests -from pyskyqremote.channel import Programme - from ..const import RESPONSE_OK +from ..channel import Channel +from ..programme import Programme + from .const_gb import CHANNEL_IMAGE_URL, PVR_IMAGE_URL, SCHEDULE_URL, LIVE_IMAGE_URL _LOGGER = logging.getLogger(__name__) @@ -18,7 +19,7 @@ def __init__(self, host): """Initialise UK remote.""" self.channel_image_url = CHANNEL_IMAGE_URL self.pvr_image_url = PVR_IMAGE_URL - self.epgData = None + self.epgData = set() self._lastEpgUrl = None self._host = host @@ -38,13 +39,14 @@ def getEpgData(self, sid, channelno, epgDate): if epgData is None: return None + if len(epgData[0]["events"]) == 0: _LOGGER.warning( f"W0010UK - Programme data not found. Do you need to set 'live_tv' to False? {self._host}" ) return None - self.epgData = [] + programmes = set() for p in epgData[0]["events"]: starttime = datetime.utcfromtimestamp(p["st"]) endtime = datetime.utcfromtimestamp(p["st"] + p["d"]) @@ -63,11 +65,9 @@ def getEpgData(self, sid, channelno, epgDate): programmeuuid = str(p["programmeuuid"]) imageUrl = LIVE_IMAGE_URL.format(programmeuuid) - programme = vars( - Programme( - programmeuuid, starttime, endtime, title, season, episode, imageUrl - ) + programme = Programme( + programmeuuid, starttime, endtime, title, season, episode, imageUrl ) - - self.epgData.append(programme) + programmes.add(programme) + self.epgData = Channel(sid, channelno, None, None, sorted(programmes)) return self.epgData diff --git a/pyskyqremote/country/remote_it.py b/pyskyqremote/country/remote_it.py index 0484e66..c83e596 100644 --- a/pyskyqremote/country/remote_it.py +++ b/pyskyqremote/country/remote_it.py @@ -3,9 +3,10 @@ import logging import requests -from pyskyqremote.channel import Programme - from ..const import RESPONSE_OK +from ..channel import Channel +from ..programme import Programme + from .const_it import ( CHANNEL_IMAGE_URL, PVR_IMAGE_URL, @@ -24,7 +25,7 @@ def __init__(self, host): """Initialise Italy remote.""" self.channel_image_url = CHANNEL_IMAGE_URL self.pvr_image_url = PVR_IMAGE_URL - self.epgData = None + self.epgData = set() self._lastEpgUrl = None self._host = host @@ -60,7 +61,7 @@ def getEpgData(self, sid, channelno, epgDate): ) return None - self.epgData = [] + programmes = set() epgDataLen = len(epgData) - 1 for index, p in enumerate(epgData): starttime = datetime.strptime(p["starttime"], "%Y-%m-%dT%H:%M:%SZ") @@ -84,13 +85,11 @@ def getEpgData(self, sid, channelno, epgDate): programmeuuid = str(p["content"]["uuid"]) imageUrl = LIVE_IMAGE_URL.format(programmeuuid) - programme = vars( - Programme( - programmeuuid, starttime, endtime, title, season, episode, imageUrl - ) + programme = Programme( + programmeuuid, starttime, endtime, title, season, episode, imageUrl ) - - self.epgData.append(programme) + programmes.add(programme) + self.epgData = Channel(sid, channelno, None, None, sorted(programmes)) return self.epgData def _getChannels(self): diff --git a/pyskyqremote/media.py b/pyskyqremote/media.py new file mode 100644 index 0000000..e1c38db --- /dev/null +++ b/pyskyqremote/media.py @@ -0,0 +1,54 @@ +"""Structure of a media information.""" + +from dataclasses import dataclass, field +import json +from datetime import datetime + + +@dataclass +class Media: + """SkyQ Programme Class.""" + + channel: str = field( + init=True, repr=True, compare=False, + ) + imageUrl: str = field( + init=True, repr=True, compare=False, + ) + sid: str = field( + init=True, repr=True, compare=False, + ) + pvrId: str = field( + init=True, repr=True, compare=False, + ) + live: bool = field( + init=True, repr=True, compare=False, + ) + + def as_json(self) -> str: + """Return a JSON string respenting this media info.""" + return json.dumps(self, cls=_MediaJSONEncoder) + + +def MediaDecoder(obj): + """Decode programme object from json.""" + media = json.loads(obj) + if "__type__" in media and media["__type__"] == "__media__": + return Media(**media["attributes"]) + return media + + +class _MediaJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, Media): + attributes = {} + for k, v in vars(obj).items(): + if type(v) is datetime: + v = v.strftime("%Y-%m-%dT%H:%M:%SZ") + attributes.update({k: v}) + + result = { + "__type__": "__media__", + "attributes": attributes, + } + return result diff --git a/pyskyqremote/programme.py b/pyskyqremote/programme.py new file mode 100644 index 0000000..1c853cc --- /dev/null +++ b/pyskyqremote/programme.py @@ -0,0 +1,110 @@ +"""Structure of a standard EPG prorgramme.""" + +from dataclasses import dataclass, field +from datetime import datetime +import json + + +@dataclass(order=True) +class Programme: + """SkyQ Programme Class.""" + + programmeuuid: str = field( + init=True, repr=True, compare=False, + ) + starttime: datetime = field( + init=True, repr=True, compare=True, + ) + endtime: datetime = field( + init=True, repr=True, compare=False, + ) + title: str = field( + init=True, repr=True, compare=False, + ) + season: str = field( + init=True, repr=True, compare=False, + ) + episode: str = field( + init=True, repr=True, compare=False, + ) + imageUrl: str = field( + init=True, repr=True, compare=False, + ) + + def __hash__(self): + """Calculate the hash of this object.""" + return hash(self.starttime) + + def as_json(self) -> str: + """Return a JSON string respenting this Programmel.""" + return json.dumps(self, cls=_ProgrammeJSONEncoder) + + +@dataclass +class RecordedProgramme(Programme): + """SkyQ Programme Class.""" + + channel: str = field( + init=True, repr=True, compare=False, + ) + + def as_json(self) -> str: + """Return a JSON string respenting this recording.""" + return json.dumps(self, cls=_RecordingJSONEncoder) + + +def ProgrammeDecoder(obj): + """Decode programme object from json.""" + programme = json.loads(obj, object_hook=_json_decoder_hook) + if "__type__" in programme and programme["__type__"] == "__programme__": + return Programme(**programme["attributes"]) + return programme + + +def RecordedProgrammeDecoder(obj): + """Decode recorded programme object from json.""" + recording = json.loads(obj, object_hook=_json_decoder_hook) + if "__type__" in recording and recording["__type__"] == "__recording__": + return RecordedProgramme(**recording["attributes"]) + return recording + + +def _json_decoder_hook(obj): + """Decode JSON into appropriate types used in this library.""" + if "starttime" in obj: + obj["starttime"] = datetime.strptime(obj["starttime"], "%Y-%m-%dT%H:%M:%SZ") + if "endtime" in obj: + obj["endtime"] = datetime.strptime(obj["endtime"], "%Y-%m-%dT%H:%M:%SZ") + return obj + + +class _ProgrammeJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, Programme): + attributes = {} + for k, v in vars(obj).items(): + if type(v) is datetime: + v = v.strftime("%Y-%m-%dT%H:%M:%SZ") + attributes.update({k: v}) + + result = { + "__type__": "__programme__", + "attributes": attributes, + } + return result + + +class _RecordingJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, Programme): + attributes = {} + for k, v in vars(obj).items(): + if type(v) is datetime: + v = v.strftime("%Y-%m-%dT%H:%M:%SZ") + attributes.update({k: v}) + + result = { + "__type__": "__recording__", + "attributes": attributes, + } + return result diff --git a/pyskyqremote/skyq_remote.py b/pyskyqremote/skyq_remote.py index 594eea2..e44e557 100644 --- a/pyskyqremote/skyq_remote.py +++ b/pyskyqremote/skyq_remote.py @@ -49,6 +49,8 @@ APP_EPG, ) from .const import TEST_CHANNEL_LIST +from .programme import RecordedProgramme +from .media import Media _LOGGER = logging.getLogger(__name__) @@ -104,6 +106,8 @@ def powerStatus(self) -> str: def getCurrentState(self): """Get current state of the SkyQ box.""" + if self.powerStatus() == SKY_STATE_OFF: + return SKY_STATE_OFF if self.powerStatus() == SKY_STATE_STANDBY: return SKY_STATE_STANDBY response = self._callSkySOAPService(UPNP_GET_TRANSPORT_INFO) @@ -132,15 +136,22 @@ def getActiveApplication(self): except Exception: return result + def getCurrentMediaJSON(self): + """Get the currently playing media on the SkyQ box as json.""" + media = self.getCurrentMedia() + if media: + return media.as_json() + else: + return None + def getCurrentMedia(self): """Get the currently playing media on the SkyQ box.""" - result = { - "channel": None, - "imageUrl": None, - "sid": None, - "pvrId": None, - "live": False, - } + channel = None + imageUrl = None + sid = None + pvrId = None + live = False + response = self._callSkySOAPService(UPNP_GET_MEDIA_INFO) if response is not None: currentURI = response[CURRENT_URI] @@ -152,22 +163,42 @@ def getCurrentMedia(self): if self._test_channel: sid = self._test_channel + live = True channel = self._getChannelNode(sid)["channel"] - result.update({"sid": sid, "live": True}) - result.update({"channel": channel}) - chid = "".join(e for e in channel.casefold() if e.isalnum()) - result.update({"imageUrl": self._buildChannelUrl(sid, chid)}) + imageUrl = self._buildChannelUrl(sid, channel) elif PVR in currentURI: # Recorded content pvrId = "P" + currentURI[11:] - result.update({"pvrId": pvrId, "live": False}) + live = False + media = Media(channel, imageUrl, sid, pvrId, live) + + return media - return _objectview(result) + def getEpgDataJSON(self, sid, epgDate): + """Get EPG data for the specified channel as json.""" + channel = self.getEpgData(sid, epgDate) + if channel: + return channel.as_json() + else: + return None def getEpgData(self, sid, epgDate): """Get EPG data for the specified channel.""" channelno = self._getChannelNode(sid)["channelno"] - return self._remoteCountry.getEpgData(sid, channelno, epgDate) + channelname = self._getChannelNode(sid)["channel"] + channelImageUrl = self._buildChannelUrl(sid, channelname) + channel = self._remoteCountry.getEpgData(sid, channelno, epgDate) + channel.channelname = channelname + channel.channelImageUrl = channelImageUrl + return channel + + def getProgrammeFromEpgJSON(self, sid, epgDate, queryDate): + """Get programme from EPG for specfied time and channel as json.""" + programme = self.getProgrammeFromEpg(sid, epgDate, queryDate) + if programme: + return programme.as_json() + else: + return None def getProgrammeFromEpg(self, sid, epgDate, queryDate): """Get programme from EPG for specfied time and channel.""" @@ -178,60 +209,81 @@ def getProgrammeFromEpg(self, sid, epgDate, queryDate): try: programme = next( p - for p in epgData - if p["starttime"] <= queryDate and p["endtime"] >= queryDate + for p in epgData.programmes + if p.starttime <= queryDate and p.endtime >= queryDate ) return programme except StopIteration: return PAST_END_OF_EPG + def getCurrentLiveTVProgrammeJSON(self, sid): + """Get current live programme on the specified channel as json.""" + programme = self.getCurrentLiveTVProgramme(sid) + if programme: + return programme.as_json() + else: + return None + def getCurrentLiveTVProgramme(self, sid): """Get current live programme on the specified channel.""" try: - result = {"title": None, "season": None, "episode": None, "imageUrl": None} queryDate = datetime.utcnow() programme = self.getProgrammeFromEpg(sid, queryDate, queryDate) if programme is None: - return result + return None if programme == PAST_END_OF_EPG: programme = self.getProgrammeFromEpg( sid, queryDate + timedelta(days=1), queryDate ) - result.update({"title": programme["title"]}) - result.update({"episode": programme["episode"]}) - result.update({"season": programme["season"]}) - result.update({"imageUrl": programme["imageUrl"]}) - return _objectview(result) + return programme except Exception as err: _LOGGER.exception(f"X0030 - Error occurred: {self._host} : {sid} : {err}") - return _objectview(result) + return None + + def getRecordingJSON(self, pvrId): + """Get the recording details as json.""" + recording = self.getRecording(pvrId) + if recording: + return recording.as_json() + else: + return None def getRecording(self, pvrId): """Get the recording details.""" - result = { - "channel": None, - "imageUrl": None, - "title": None, - "season": None, - "episode": None, - } - - recording = self._http_json(REST_RECORDING_DETAILS.format(pvrId)) - result.update({"channel": recording["details"]["cn"]}) - result.update({"title": recording["details"]["t"]}) - if ( - "seasonnumber" in recording["details"] - and "episodenumber" in recording["details"] - ): - result.update({"season": recording["details"]["seasonnumber"]}) - result.update({"episode": recording["details"]["episodenumber"]}) - if "programmeuuid" in recording["details"]: - programmeuuid = recording["details"]["programmeuuid"] + season = None + episode = None + starttime = None + endtime = None + programmeuuid = None + + recording = self._http_json(REST_RECORDING_DETAILS.format(pvrId))["details"] + + channel = recording["cn"] + title = recording["t"] + if "seasonnumber" in recording and "episodenumber" in recording: + season = recording["seasonnumber"] + episode = recording["episodenumber"] + if "programmeuuid" in recording: + programmeuuid = recording["programmeuuid"] imageUrl = self._remoteCountry.pvr_image_url.format(str(programmeuuid)) - result.update({"imageUrl": imageUrl}) + elif "osid" in recording: + sid = str(recording["osid"]) + imageUrl = self._buildChannelUrl(sid) + + starttime = datetime.utcfromtimestamp(recording["ast"]) + if "finald" in recording: + endtime = datetime.utcfromtimestamp(recording["ast"] + recording["finald"]) + elif "schd" in recording: + endtime = datetime.utcfromtimestamp(recording["ast"] + recording["schd"]) + else: + endtime = starttime - return _objectview(result) + programme = RecordedProgramme( + programmeuuid, starttime, endtime, title, season, episode, imageUrl, channel + ) + + return programme def press(self, sequence): """Issue the specified sequence of commands to SkyQ box.""" @@ -392,7 +444,8 @@ def _sendCommand(self, code): ) break - def _buildChannelUrl(self, sid, chid): + def _buildChannelUrl(self, sid, channel): + chid = "".join(e for e in channel.casefold() if e.isalnum()) channel_image_url = self._remoteCountry.channel_image_url return channel_image_url.format(sid, chid) @@ -452,8 +505,8 @@ def _setupDevice(self): self.deviceSetup = True return "ok" - except Exception: - _LOGGER.error(f"E0040 - Invalid country: {self._host} : {alpha3}") + except Exception as err: + _LOGGER.error(f"E0040 - Invalid country: {self._host} : {alpha3} : {err}") return None @@ -464,8 +517,3 @@ def __init__(self, url): def received_message(self, message): self.data = message.data - - -class _objectview(object): - def __init__(self, d): - self.__dict__ = d diff --git a/setup.py b/setup.py index bf7d3d3..f297933 100644 --- a/setup.py +++ b/setup.py @@ -22,5 +22,6 @@ install_requires=['requests', 'ws4py==0.5.1', 'xmltodict==0.12.0', 'pycountry==19.8.18'], keywords='SKYQ Remote', include_package_data=True, - zip_safe=False + zip_safe=False, + python_requires='>=3.7' ) diff --git a/tests/bash_sky_test.py b/tests/bash_sky_test.py index 71808b4..159fb78 100644 --- a/tests/bash_sky_test.py +++ b/tests/bash_sky_test.py @@ -1,42 +1,47 @@ #!/usr/bin/env python -import requests +"""Test script.""" import sys from datetime import datetime from pyskyqremote.skyq_remote import SkyQRemote +from pyskyqremote.const import APP_EPG +from pyskyqremote.media import MediaDecoder + +# from pyskyqremote.channel import ChannelDecoder +# from pyskyqremote.programme import ProgrammeDecoder, RecordedProgrammeDecoder + # Run ./bash_sky.py # example: ./bash_sky_test.py 192.168.0.9 # Note: you may need to modify top line change python3 to python, depending on OS/setup. this is works for me on my mac -country = "UK" +country = None if len(sys.argv) == 3: country = sys.argv[2] -sky = SkyQRemote(sys.argv[1], country=country) +sky = SkyQRemote(sys.argv[1], overrideCountry=country) print("----------- Power status") print(sky.powerStatus()) -print("----------- Current Media") -currentMedia = sky.getCurrentMedia() -print(currentMedia) -if currentMedia["live"]: - queryDate = datetime.utcnow() - # print("----------- Today's EPG") - # print(sky.getEpgData(currentMedia["sid"], queryDate)) - print("----------- Programme from Epg - Now") - print(sky.getProgrammeFromEpg(currentMedia["sid"], queryDate, queryDate)) - print("----------- Current Live TV") - print(sky.getCurrentLiveTVProgramme(currentMedia["sid"])) - print("----------- Active Application") -print(str(sky.getActiveApplication())) +app = sky.getActiveApplication() +print(str(app)) +if app == APP_EPG: + print("----------- Current Media") + currentMedia = sky.getCurrentMediaJSON() + print(currentMedia) + + media = MediaDecoder(currentMedia) + sid = media.sid + if not media.live: + print("----------- Recording") + print(sky.getRecordingJSON(media.pvrId)) + sid = 2153 -# print("----------- Testing Description 0") -# print(sky._getSoapControlURL(0)) -# print("----------- Testing Description 1") -# print(sky._getSoapControlURL(1)) -# print("----------- Testing Description 2") -# print(sky._getSoapControlURL(2)) + queryDate = datetime.utcnow() + print(f"----------- Programme from Epg - Now - {sid}") + print(sky.getProgrammeFromEpgJSON(sid, queryDate, queryDate)) + print(f"----------- Current Live TV - {sid}") + print(sky.getCurrentLiveTVProgrammeJSON(sid)) -# print("----------- Transport Info") -# print(sky._callSkySOAPService('GetTransportInfo')) + print("----------- Today's EPG") + print(sky.getEpgDataJSON(sid, queryDate))