diff --git a/resources/lib/kodiutils.py b/resources/lib/kodiutils.py index 47d7e53..5c35167 100644 --- a/resources/lib/kodiutils.py +++ b/resources/lib/kodiutils.py @@ -310,26 +310,6 @@ def get_addon_info(key, addon=None): return to_unicode(addon.getAddonInfo(key)) -def container_refresh(url=None): - """Refresh the current container or (re)load a container by URL""" - if url: - _LOGGER.debug('Execute: Container.Refresh(%s)', url) - xbmc.executebuiltin('Container.Refresh({url})'.format(url=url)) - else: - _LOGGER.debug('Execute: Container.Refresh') - xbmc.executebuiltin('Container.Refresh') - - -def container_update(url): - """Update the current container while respecting the path history.""" - if url: - _LOGGER.debug('Execute: Container.Update(%s)', url) - xbmc.executebuiltin('Container.Update({url})'.format(url=url)) - else: - # URL is a mandatory argument for Container.Update, use Container.Refresh instead - container_refresh() - - def jsonrpc(*args, **kwargs): """Perform JSONRPC calls""" from json import dumps, loads diff --git a/resources/lib/modules/addon.py b/resources/lib/modules/addon.py index c852c7a..7125b89 100644 --- a/resources/lib/modules/addon.py +++ b/resources/lib/modules/addon.py @@ -255,7 +255,7 @@ def _wait_for_data(self, sock, timeout=10): chunk = conn.recv(1024) if not chunk: break - buf += chunk + buf += chunk.decode() if not buf: # We got an empty reply, this means that something didn't go according to plan diff --git a/tests/__init__.py b/tests/__init__.py index 8272521..31cfeb1 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -5,4 +5,4 @@ import logging -logging.basicConfig() +logging.basicConfig(level=logging.DEBUG) diff --git a/tests/mocks/plugin.video.example/addon.xml b/tests/mocks/plugin.video.example/addon.xml new file mode 100644 index 0000000..05fd16b --- /dev/null +++ b/tests/mocks/plugin.video.example/addon.xml @@ -0,0 +1,6 @@ + + + + video + + diff --git a/tests/mocks/plugin.video.example/plugin.py b/tests/mocks/plugin.video.example/plugin.py new file mode 100644 index 0000000..960fe5d --- /dev/null +++ b/tests/mocks/plugin.video.example/plugin.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +""" This is a fake addon """ +from __future__ import absolute_import, division, print_function, unicode_literals + +import datetime +import logging +import sys + +import dateutil.parser +import dateutil.tz + +try: # Python 3 + from urllib.parse import parse_qsl, urlparse +except ImportError: # Python 2 + from urlparse import parse_qsl, urlparse + +logging.basicConfig() +_LOGGER = logging.getLogger() + + +class IPTVManager: + """Interface to IPTV Manager""" + + def __init__(self, port): + """Initialize IPTV Manager object""" + self.port = port + + def via_socket(func): # pylint: disable=no-self-argument + """Send the output of the wrapped function to socket""" + + def send(self): + """Decorator to send over a socket""" + import json + import socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', self.port)) + try: + sock.send(json.dumps(func()).encode()) + finally: + sock.close() + + return send + + @via_socket + def send_channels(): # pylint: disable=no-method-argument + """Return JSON-M3U formatted information to IPTV Manager""" + streams = [ + dict( + id='channel1.com', + name='Channel 1', + preset=1, + stream='plugin://plugin.video.example/play/1', + logo='https://example.com/channel1.png', + ), + dict( + id='channel2.com', + name='Channel 2', + preset=2, + stream='plugin://plugin.video.example/play/2', + logo='https://example.com/channel2.png', + ), + dict( + id='radio1.com', + name='Radio 1', + preset=901, + stream='plugin://plugin.video.example/play/901', + logo='https://example.com/radio1.png', + radio=True, + ), + ] + return dict(version=1, streams=streams) + + @via_socket + def send_epg(): # pylint: disable=no-method-argument + """Return JSONTV formatted information to IPTV Manager""" + now = datetime.datetime.now(tz=dateutil.tz.gettz('CET')) + + epg = { + 'channel1.com': [ + dict( + start=now.isoformat(), + stop=(now + datetime.timedelta(seconds=1800)).isoformat(), + title='This is a show', + description='This is the description of the show', + subtitle='Pilot episode', + episode='S01E01', + image='https://example.com/image.png', + date='1987-06-15', + ), + dict( + start=(now + datetime.timedelta(seconds=1800)).isoformat(), + stop=(now + datetime.timedelta(seconds=3600)).isoformat(), + title='This is a show 2', + description='This is the description of the show 2', + image=None, + ) + ], + 'channel2.com': [ + dict( + start=now.isoformat(), + stop=(now + datetime.timedelta(seconds=1800)).isoformat(), + title='This is a show 3', + description='This is the description of the show 3', + image=None, + ), + dict( + start=(now + datetime.timedelta(seconds=1800)).isoformat(), + stop=(now + datetime.timedelta(seconds=3600)).isoformat(), + title='This is a show 4', + description='This is the description of the show 4', + image=None, + ) + ], + } + return dict(version=1, epg=epg) + + +if __name__ == "__main__": + + if len(sys.argv) <= 1: + print('ERROR: Missing URL as first parameter') + exit(1) + + # Parse routing + url_parts = urlparse(sys.argv[1]) + route = url_parts.path + query = dict(parse_qsl(url_parts.query)) + + if route == '/iptv/channels': + IPTVManager(int(query['port'])).send_channels() + exit() + + if route == '/iptv/epg': + IPTVManager(int(query['port'])).send_epg() + exit() diff --git a/tests/mocks/plugin.video.example/resources/settings.xml b/tests/mocks/plugin.video.example/resources/settings.xml new file mode 100644 index 0000000..504a5f3 --- /dev/null +++ b/tests/mocks/plugin.video.example/resources/settings.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..93c8375 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +""" Tests for Integration """ + +# pylint: disable=invalid-name,missing-docstring,no-self-use + +from __future__ import absolute_import, division, print_function, unicode_literals + +import os +import unittest + +from resources.lib.modules.addon import Addon + + +class IntegrationTest(unittest.TestCase): + """ Integration Tests """ + + def test_refresh(self): + """ Test the refreshing of data """ + m3u_path = 'tests/userdata/playlist.m3u8' + epg_path = 'tests/userdata/epg.xml' + + # Remove existing files + for path in [m3u_path, epg_path]: + if os.path.exists(path): + os.unlink(path) + + # Do the refresh + Addon.refresh(True) + + # Check that the files now exist + for path in [m3u_path, epg_path]: + self.assertTrue(os.path.exists(path), '%s does not exist' % path) + + with open(m3u_path, 'r') as fdesc: + data = fdesc.read() + self.assertTrue('#EXTM3U' in data) + self.assertTrue('channel1.com' in data) + self.assertTrue('radio1.com' in data) + + with open(epg_path, 'r') as fdesc: + data = fdesc.read() + self.assertTrue('channel1.com' in data) + self.assertTrue('1987-06-15' in data) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_iptvsimple.py b/tests/test_iptvsimple.py new file mode 100644 index 0000000..b26ce7c --- /dev/null +++ b/tests/test_iptvsimple.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" Tests for IPTV Simpled """ + +# pylint: disable=invalid-name,missing-docstring,no-self-use + +from __future__ import absolute_import, division, print_function, unicode_literals + +import unittest + +from resources.lib.modules.iptvsimple import IptvSimple + + +class IptvSimpleTest(unittest.TestCase): + """ IPTV Simple Tests """ + + def test_setup(self): + """ Test the setup of IPTV Simple (this will be mocked) """ + self.assertTrue(IptvSimple.setup()) + + def test_restart(self): + """ Test the restart of IPTV Simple (this will be mocked) """ + IptvSimple.restart_required = True + IptvSimple.restart(force=True) + + self.assertFalse(IptvSimple.restart_required) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/userdata/addon_settings.json b/tests/userdata/addon_settings.json index ad60abd..c44e1f8 100644 --- a/tests/userdata/addon_settings.json +++ b/tests/userdata/addon_settings.json @@ -1,4 +1,12 @@ { "service.iptv.manager": { + "iptv_simple_restart": "true" + }, + "pvr.iptvsimple": { + }, + "plugin.video.example": { + "iptv.enabled": "true", + "iptv.channels_uri": "plugin://plugin.video.example/iptv/channels", + "iptv.epg_uri": "plugin://plugin.video.example/iptv/epg" } } diff --git a/tests/xbmc.py b/tests/xbmc.py index 78e0f48..c30ab4f 100644 --- a/tests/xbmc.py +++ b/tests/xbmc.py @@ -11,7 +11,7 @@ import os import time -from xbmcextra import global_settings, import_language +from xbmcextra import global_settings, import_language, read_addon_xml LOGDEBUG = 0 LOGERROR = 4 @@ -162,9 +162,21 @@ def getRating(self): return 0 -def executebuiltin(string, wait=False): # pylint: disable=unused-argument - """ A stub implementation of the xbmc executebuiltin() function """ - return +def executebuiltin(function): + """ A reimplementation of the xbmc executebuiltin() function """ + import re + try: + command, params = re.search(r'([A-Za-z]+)\(([^\)]+)\)', function).groups() + if command == 'RunPlugin': + addon, route = re.search(r'plugin://([^/]+)(.*)', params).groups() + if addon: + import subprocess + addon_info = read_addon_xml('tests/mocks/%s/addon.xml' % addon) + pluginsource = next(iter(list(addon_info.values()))).get('pluginsource') + subprocess.call(['python', 'tests/mocks/%s/%s' % (addon, pluginsource), route]) + + except AttributeError: + pass def executeJSONRPC(jsonrpccommand): @@ -182,7 +194,8 @@ def executeJSONRPC(jsonrpccommand): if command.get('method') == 'Textures.RemoveTexture': return json.dumps(dict(id=1, jsonrpc='2.0', result="OK")) if command.get('method') == 'Addons.GetAddons': - return json.dumps(dict(id=1, jsonrpc='2.0', result=dict(addons=[]))) + # TODO: generate a list of addons based on the folders in tests/mocks + return json.dumps(dict(id=1, jsonrpc='2.0', result=dict(addons=[dict(addonid='plugin.video.example')]))) log("executeJSONRPC does not implement method '{method}'".format(**command), 'Error') return json.dumps(dict(error=dict(code=-1, message='Not implemented'), id=1, jsonrpc='2.0')) diff --git a/tests/xbmcextra.py b/tests/xbmcextra.py index d617189..658a3c0 100644 --- a/tests/xbmcextra.py +++ b/tests/xbmcextra.py @@ -41,8 +41,8 @@ def uri_to_path(uri): def read_addon_xml(path): """Parse the addon.xml and return an info dictionary""" info = dict( - path='./', # '/storage/.kodi/addons/plugin.video.vrt.nu', - profile='special://userdata', # 'special://profile/addon_data/plugin.video.vrt.nu/', + path='./', + profile='special://userdata', type='xbmc.python.pluginsource', ) @@ -53,19 +53,23 @@ def read_addon_xml(path): info['author'] = info.pop('provider-name') for child in root: - if child.attrib.get('point') != 'xbmc.addon.metadata': + if child.attrib.get('point') == 'xbmc.python.pluginsource': + info['pluginsource'] = child.attrib.get('library') + continue + + if child.attrib.get('point') == 'xbmc.addon.metadata': + for grandchild in child: + # Handle assets differently + if grandchild.tag == 'assets': + for asset in grandchild: + info[asset.tag] = asset.text + continue + # Not in English ? Drop it + if grandchild.attrib.get('lang', 'en_GB') != 'en_GB': + continue + # Add metadata + info[grandchild.tag] = grandchild.text continue - for grandchild in child: - # Handle assets differently - if grandchild.tag == 'assets': - for asset in grandchild: - info[asset.tag] = asset.text - continue - # Not in English ? Drop it - if grandchild.attrib.get('lang', 'en_GB') != 'en_GB': - continue - # Add metadata - info[grandchild.tag] = grandchild.text return {info['name']: info} @@ -110,19 +114,6 @@ def addon_settings(addon_id=None): print("Error: Cannot use 'tests/userdata/addon_settings.json' : %s" % e) settings = {} - # Read credentials from environment or credentials.json - if 'ADDON_USERNAME' in os.environ and 'ADDON_PASSWORD' in os.environ: - # print('Using credentials from the environment variables ADDON_USERNAME and ADDON_PASSWORD') - settings[ADDON_ID]['username'] = os.environ.get('ADDON_USERNAME') - settings[ADDON_ID]['password'] = os.environ.get('ADDON_PASSWORD') - elif os.path.exists('tests/userdata/credentials.json'): - # print('Using credentials from tests/userdata/credentials.json') - with open('tests/userdata/credentials.json') as f: - credentials = json.load(f) - settings[ADDON_ID].update(credentials) - else: - print("Error: Cannot use 'tests/userdata/credentials.json'") - if addon_id: return settings[addon_id]