diff --git a/.env.example.sh b/.env.example.sh new file mode 100644 index 0000000..b66b6bd --- /dev/null +++ b/.env.example.sh @@ -0,0 +1,3 @@ +export NCORE_USERNAME="" +export NCORE_PASSWORD="" +export RSS_URL="" diff --git a/.gitignore b/.gitignore index 2a2da90..a412646 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,7 @@ build/ dist/ ncoreparser.egg-info/ .venv -.vscode \ No newline at end of file +.vscode +.env.sh +*.torrent +.tox* diff --git a/.pylintrc b/.pylintrc index 16d7c8a..b741fe5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -93,7 +93,7 @@ prefer-stubs=no # Minimum Python version to use for version dependent checks. Will default to # the version used to run pylint. -py-version=3.9 +py-version=3.12 # Discover python modules and packages in the file system subtree. recursive=no diff --git a/Makefile b/Makefile index 68e89d7..042055f 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,10 @@ reqs: .venv lint: $(ACTIVATE) && pylint ncoreparser + $(ACTIVATE) && mypy ncoreparser + +format: + $(ACTIVATE) && black . test: $(ACTIVATE) && tox diff --git a/ncoreparser/__init__.py b/ncoreparser/__init__.py index 74ce626..b02e282 100644 --- a/ncoreparser/__init__.py +++ b/ncoreparser/__init__.py @@ -1,16 +1,6 @@ # flake8: noqa from .client import Client from .client_async import AsyncClient -from .data import ( - SearchParamType, - SearchParamWhere, - ParamSeq, - ParamSort -) -from .error import ( - NcoreDownloadError, - NcoreParserError, - NcoreCredentialError, - NcoreConnectionError -) +from .data import SearchParamType, SearchParamWhere, ParamSeq, ParamSort +from .error import NcoreDownloadError, NcoreParserError, NcoreCredentialError, NcoreConnectionError from .util import Size diff --git a/ncoreparser/client.py b/ncoreparser/client.py index 333d03f..8422d0d 100644 --- a/ncoreparser/client.py +++ b/ncoreparser/client.py @@ -1,33 +1,18 @@ import os import httpx -from ncoreparser.data import ( - URLs, - SearchParamType, - SearchParamWhere, - ParamSort, - ParamSeq -) -from ncoreparser.error import ( - NcoreConnectionError, - NcoreCredentialError, - NcoreDownloadError -) -from ncoreparser.parser import ( - TorrentsPageParser, - TorrenDetailParser, - RssParser, - ActivityParser, - RecommendedParser -) +from typing_extensions import Any, Generator, Union +from ncoreparser.data import URLs, SearchParamType, SearchParamWhere, ParamSort, ParamSeq +from ncoreparser.error import NcoreConnectionError, NcoreCredentialError, NcoreDownloadError +from ncoreparser.parser import TorrentsPageParser, TorrenDetailParser, RssParser, ActivityParser, RecommendedParser from ncoreparser.util import Size, check_login from ncoreparser.torrent import Torrent class Client: - def __init__(self, timeout=1): - self._client = httpx.Client(headers={'User-Agent': 'python ncoreparser'}, - timeout=timeout, - follow_redirects=True) + def __init__(self, timeout: int = 1) -> None: + self._client = httpx.Client( + headers={"User-Agent": "python ncoreparser"}, timeout=timeout, follow_redirects=True + ) self._logged_in = False self._page_parser = TorrentsPageParser() self._detailed_parser = TorrenDetailParser() @@ -35,53 +20,52 @@ def __init__(self, timeout=1): self._activity_parser = ActivityParser() self._recommended_parser = RecommendedParser() - def login(self, username, password): + def login(self, username: str, password: str) -> None: self._client.cookies.clear() try: - r = self._client.post(URLs.LOGIN.value, - data={"nev": username, "pass": password}) + r = self._client.post(URLs.LOGIN.value, data={"nev": username, "pass": password}) except Exception as e: - raise NcoreConnectionError(f"Error while perform post " - f"method to url '{URLs.LOGIN.value}'.") from e + raise NcoreConnectionError(f"Error while perform post " f"method to url '{URLs.LOGIN.value}'.") from e if r.url != URLs.INDEX.value: self.logout() - raise NcoreCredentialError(f"Error while login, check " - f"credentials for user: '{username}'") + raise NcoreCredentialError(f"Error while login, check " f"credentials for user: '{username}'") self._logged_in = True @check_login # pylint: disable=too-many-arguments, too-many-positional-arguments def search( self, - pattern, - type=SearchParamType.ALL_OWN, - where=SearchParamWhere.NAME, - sort_by=ParamSort.UPLOAD, - sort_order=ParamSeq.DECREASING, - number=None - ): + pattern: str, + type: SearchParamType = SearchParamType.ALL_OWN, + where: SearchParamWhere = SearchParamWhere.NAME, + sort_by: ParamSort = ParamSort.UPLOAD, + sort_order: ParamSeq = ParamSeq.DECREASING, + number: Union[int, None] = None, + ) -> list[Torrent]: page_count = 1 - torrents = [] + torrents: list[Torrent] = [] while number is None or len(torrents) < number: - url = URLs.DOWNLOAD_PATTERN.value.format(page=page_count, - t_type=type.value, - sort=sort_by.value, - seq=sort_order.value, - pattern=pattern, - where=where.value) + url = URLs.DOWNLOAD_PATTERN.value.format( + page=page_count, + t_type=type.value, + sort=sort_by.value, + seq=sort_order.value, + pattern=pattern, + where=where.value, + ) try: request = self._client.get(url) except Exception as e: raise NcoreConnectionError(f"Error while searhing torrents. {e}") from e new_torrents = [Torrent(**params) for params in self._page_parser.get_items(request.text)] + torrents.extend(new_torrents) if number is None or len(new_torrents) == 0: return torrents - torrents.extend(new_torrents) page_count += 1 return torrents[:number] @check_login - def get_torrent(self, id, **ext_params): + def get_torrent(self, id: str, **ext_params: Any) -> Torrent: url = URLs.DETAIL_PATTERN.value.format(id=id) try: content = self._client.get(url) @@ -93,7 +77,7 @@ def get_torrent(self, id, **ext_params): return Torrent(**params) @check_login - def get_by_rss(self, url): + def get_by_rss(self, url: str) -> Generator[Torrent, None, None]: try: content = self._client.get(url) except Exception as e: @@ -103,27 +87,32 @@ def get_by_rss(self, url): yield self.get_torrent(id) @check_login - def get_by_activity(self): + def get_by_activity(self) -> list[Torrent]: try: content = self._client.get(URLs.ACTIVITY.value) except Exception as e: raise NcoreConnectionError(f"Error while get activity. Url: '{URLs.ACTIVITY.value}'. {e}") from e torrents = [] - for id, start_t, updated_t, status, uploaded, downloaded, remaining_t, rate in \ - self._activity_parser.get_params(content.text): - torrents.append(self.get_torrent(id, - start=start_t, - updated=updated_t, - status=status, - uploaded=Size(uploaded), - downloaded=Size(downloaded), - remaining=remaining_t, - rate=float(rate))) + for id, start_t, updated_t, status, uploaded, downloaded, remaining_t, rate in self._activity_parser.get_params( + content.text + ): + torrents.append( + self.get_torrent( + id, + start=start_t, + updated=updated_t, + status=status, + uploaded=Size(uploaded), + downloaded=Size(downloaded), + remaining=remaining_t, + rate=float(rate), + ) + ) return torrents @check_login - def get_recommended(self, type=None): + def get_recommended(self, type: Union[SearchParamType, None] = None) -> Generator[Torrent, None, None]: try: content = self._client.get(URLs.RECOMMENDED.value) except Exception as e: @@ -131,11 +120,11 @@ def get_recommended(self, type=None): for id in self._recommended_parser.get_ids(content.text): torrent = self.get_torrent(id) - if not type or torrent['type'] == type: + if not type or torrent["type"] == type: yield torrent @check_login - def download(self, torrent, path, override=False): + def download(self, torrent: Torrent, path: str, override: bool = False) -> str: file_path, url = torrent.prepare_download(path) try: content = self._client.get(url) @@ -143,11 +132,11 @@ def download(self, torrent, path, override=False): raise NcoreConnectionError(f"Error while downloading torrent. Url: '{url}'. {e}") from e if not override and os.path.exists(file_path): raise NcoreDownloadError(f"Error while downloading file: '{file_path}'. It is already exists.") - with open(file_path, 'wb') as fh: + with open(file_path, "wb") as fh: fh.write(content.content) return file_path - def logout(self): + def logout(self) -> None: self._client.cookies.clear() self._client.close() self._logged_in = False diff --git a/ncoreparser/client_async.py b/ncoreparser/client_async.py index 9535b38..28f4f9b 100644 --- a/ncoreparser/client_async.py +++ b/ncoreparser/client_async.py @@ -2,34 +2,19 @@ import os import httpx -from ncoreparser.data import ( - URLs, - SearchParamType, - SearchParamWhere, - ParamSort, - ParamSeq -) -from ncoreparser.error import ( - NcoreConnectionError, - NcoreCredentialError, - NcoreDownloadError -) -from ncoreparser.parser import ( - TorrentsPageParser, - TorrenDetailParser, - RssParser, - ActivityParser, - RecommendedParser -) +from typing_extensions import Any, AsyncGenerator, Union +from ncoreparser.data import URLs, SearchParamType, SearchParamWhere, ParamSort, ParamSeq +from ncoreparser.error import NcoreConnectionError, NcoreCredentialError, NcoreDownloadError +from ncoreparser.parser import TorrentsPageParser, TorrenDetailParser, RssParser, ActivityParser, RecommendedParser from ncoreparser.util import Size, check_login from ncoreparser.torrent import Torrent class AsyncClient: - def __init__(self, timeout=1): - self._client = httpx.AsyncClient(headers={'User-Agent': 'python ncoreparser'}, - timeout=timeout, - follow_redirects=True) + def __init__(self, timeout: int = 1) -> None: + self._client = httpx.AsyncClient( + headers={"User-Agent": "python ncoreparser"}, timeout=timeout, follow_redirects=True + ) self._logged_in = False self._page_parser = TorrentsPageParser() self._detailed_parser = TorrenDetailParser() @@ -37,53 +22,52 @@ def __init__(self, timeout=1): self._activity_parser = ActivityParser() self._recommended_parser = RecommendedParser() - async def login(self, username, password): + async def login(self, username: str, password: str) -> None: self._client.cookies.clear() try: - r = await self._client.post(URLs.LOGIN.value, - data={"nev": username, "pass": password}) + r = await self._client.post(URLs.LOGIN.value, data={"nev": username, "pass": password}) except Exception as e: - raise NcoreConnectionError(f"Error while perform post " - f"method to url '{URLs.LOGIN.value}'.") from e + raise NcoreConnectionError(f"Error while perform post " f"method to url '{URLs.LOGIN.value}'.") from e if r.url != URLs.INDEX.value: await self.logout() - raise NcoreCredentialError(f"Error while login, check " - f"credentials for user: '{username}'") + raise NcoreCredentialError(f"Error while login, check " f"credentials for user: '{username}'") self._logged_in = True @check_login # pylint: disable=too-many-arguments, too-many-positional-arguments async def search( self, - pattern, - type=SearchParamType.ALL_OWN, - where=SearchParamWhere.NAME, - sort_by=ParamSort.UPLOAD, - sort_order=ParamSeq.DECREASING, - number=None - ): + pattern: str, + type: SearchParamType = SearchParamType.ALL_OWN, + where: SearchParamWhere = SearchParamWhere.NAME, + sort_by: ParamSort = ParamSort.UPLOAD, + sort_order: ParamSeq = ParamSeq.DECREASING, + number: Union[int, None] = None, + ) -> list[Torrent]: page_count = 1 - torrents = [] + torrents: list[Torrent] = [] while number is None or len(torrents) < number: - url = URLs.DOWNLOAD_PATTERN.value.format(page=page_count, - t_type=type.value, - sort=sort_by.value, - seq=sort_order.value, - pattern=pattern, - where=where.value) + url = URLs.DOWNLOAD_PATTERN.value.format( + page=page_count, + t_type=type.value, + sort=sort_by.value, + seq=sort_order.value, + pattern=pattern, + where=where.value, + ) try: request = await self._client.get(url) except Exception as e: raise NcoreConnectionError(f"Error while searhing torrents. {e}") from e new_torrents = [Torrent(**params) for params in self._page_parser.get_items(request.text)] + torrents.extend(new_torrents) if number is None or len(new_torrents) == 0: return torrents - torrents.extend(new_torrents) page_count += 1 return torrents[:number] @check_login - async def get_torrent(self, id, **ext_params): + async def get_torrent(self, id: str, **ext_params: Any) -> Torrent: url = URLs.DETAIL_PATTERN.value.format(id=id) try: content = await self._client.get(url) @@ -95,7 +79,7 @@ async def get_torrent(self, id, **ext_params): return Torrent(**params) @check_login - async def get_by_rss(self, url): + async def get_by_rss(self, url: str) -> AsyncGenerator[Torrent, None]: try: content = await self._client.get(url) except Exception as e: @@ -105,27 +89,32 @@ async def get_by_rss(self, url): yield await self.get_torrent(id) @check_login - async def get_by_activity(self): + async def get_by_activity(self) -> list[Torrent]: try: content = await self._client.get(URLs.ACTIVITY.value) except Exception as e: raise NcoreConnectionError(f"Error while get activity. Url: '{URLs.ACTIVITY.value}'. {e}") from e torrents = [] - for id, start_t, updated_t, status, uploaded, downloaded, remaining_t, rate in \ - self._activity_parser.get_params(content.text): - torrents.append(await self.get_torrent(id, - start=start_t, - updated=updated_t, - status=status, - uploaded=Size(uploaded), - downloaded=Size(downloaded), - remaining=remaining_t, - rate=float(rate))) + for id, start_t, updated_t, status, uploaded, downloaded, remaining_t, rate in self._activity_parser.get_params( + content.text + ): + torrents.append( + await self.get_torrent( + id, + start=start_t, + updated=updated_t, + status=status, + uploaded=Size(uploaded), + downloaded=Size(downloaded), + remaining=remaining_t, + rate=float(rate), + ) + ) return torrents @check_login - async def get_recommended(self, type=None): + async def get_recommended(self, type: Union[SearchParamType, None] = None) -> AsyncGenerator[Torrent, None]: try: content = await self._client.get(URLs.RECOMMENDED.value) except Exception as e: @@ -133,11 +122,11 @@ async def get_recommended(self, type=None): for id in self._recommended_parser.get_ids(content.text): torrent = await self.get_torrent(id) - if not type or torrent['type'] == type: + if not type or torrent["type"] == type: yield torrent @check_login - async def download(self, torrent, path, override=False): + async def download(self, torrent: Torrent, path: str, override: bool = False) -> str: file_path, url = torrent.prepare_download(path) try: content = await self._client.get(url) @@ -145,11 +134,11 @@ async def download(self, torrent, path, override=False): raise NcoreConnectionError(f"Error while downloading torrent. Url: '{url}'. {e}") from e if not override and os.path.exists(file_path): raise NcoreDownloadError(f"Error while downloading file: '{file_path}'. It is already exists.") - with open(file_path, 'wb') as fh: + with open(file_path, "wb") as fh: fh.write(content.content) return file_path - async def logout(self): + async def logout(self) -> None: self._client.cookies.clear() await self._client.aclose() self._logged_in = False diff --git a/ncoreparser/data.py b/ncoreparser/data.py index 343c50b..05e9b74 100644 --- a/ncoreparser/data.py +++ b/ncoreparser/data.py @@ -12,41 +12,41 @@ class ParamSort(Enum): class SearchParamType(Enum): - SD_HUN = 'xvid_hun' - SD = 'xvid' - DVD_HUN = 'dvd_hun' - DVD = 'dvd' - DVD9_HUN = 'dvd9_hun' - DVD9 = 'dvd9' - HD_HUN = 'hd_hun' - HD = 'hd' - SDSER_HUN = 'xvidser_hun' - SDSER = 'xvidser' - DVDSER_HUN = 'dvdser_hun' - DVDSER = 'dvdser' - HDSER_HUN = 'hdser_hun' - HDSER = 'hdser' - MP3_HUN = 'mp3_hun' - MP3 = 'mp3' - LOSSLESS_HUN = 'lossless_hun' - LOSSLESS = 'lossless' - CLIP = 'clip' - GAME_ISO = 'game_iso' - GAME_RIP = 'game_rip' - CONSOLE = 'console' - EBOOK_HUN = 'ebook_hun' - EBOOK = 'ebook' - ISO = 'iso' - MISC = 'misc' - MOBIL = 'mobil' - XXX_IMG = 'xxx_imageset' - XXX_SD = 'xxx_xvid' - XXX_DVD = 'xxx_dvd' - XXX_HD = 'xxx_hd' + SD_HUN = "xvid_hun" + SD = "xvid" + DVD_HUN = "dvd_hun" + DVD = "dvd" + DVD9_HUN = "dvd9_hun" + DVD9 = "dvd9" + HD_HUN = "hd_hun" + HD = "hd" + SDSER_HUN = "xvidser_hun" + SDSER = "xvidser" + DVDSER_HUN = "dvdser_hun" + DVDSER = "dvdser" + HDSER_HUN = "hdser_hun" + HDSER = "hdser" + MP3_HUN = "mp3_hun" + MP3 = "mp3" + LOSSLESS_HUN = "lossless_hun" + LOSSLESS = "lossless" + CLIP = "clip" + GAME_ISO = "game_iso" + GAME_RIP = "game_rip" + CONSOLE = "console" + EBOOK_HUN = "ebook_hun" + EBOOK = "ebook" + ISO = "iso" + MISC = "misc" + MOBIL = "mobil" + XXX_IMG = "xxx_imageset" + XXX_SD = "xxx_xvid" + XXX_DVD = "xxx_dvd" + XXX_HD = "xxx_hd" ALL_OWN = "all_own" -def get_detailed_param(category, type): +def get_detailed_param(category: str, type: str) -> SearchParamType: detailed = { "osszes_film_xvid_hun": SearchParamType.SD_HUN, "osszes_film_xvid": SearchParamType.SD, @@ -104,11 +104,13 @@ class URLs(Enum): ACTIVITY = "https://ncore.pro/hitnrun.php" RECOMMENDED = "https://ncore.pro/recommended.php" TORRENTS_BASE = "https://ncore.pro/torrents.php" - DOWNLOAD_PATTERN = TORRENTS_BASE + "?oldal={page}" \ - "&tipus={t_type}" \ - "&miszerint={sort}" \ - "&hogyan={seq}" \ - "&mire={pattern}" \ - "&miben={where}" + DOWNLOAD_PATTERN = ( + TORRENTS_BASE + "?oldal={page}" + "&tipus={t_type}" + "&miszerint={sort}" + "&hogyan={seq}" + "&mire={pattern}" + "&miben={where}" + ) DETAIL_PATTERN = TORRENTS_BASE + "?action=details&id={id}" DOWNLOAD_LINK = "https://ncore.pro/torrents.php?action=download&id={id}&key={key}" diff --git a/ncoreparser/parser.py b/ncoreparser/parser.py index 4f50f0d..9d420ec 100644 --- a/ncoreparser/parser.py +++ b/ncoreparser/parser.py @@ -1,14 +1,16 @@ import re import datetime +from typing_extensions import Generator, Any, Union from ncoreparser.error import NcoreParserError from ncoreparser.util import parse_datetime, Size from ncoreparser.data import SearchParamType, get_detailed_param class TorrentsPageParser: - def __init__(self): + def __init__(self) -> None: self.type_pattern = re.compile( - r'.*') + r'.*' + ) self.id_and_name_pattern = re.compile( r'' ) @@ -19,26 +21,24 @@ def __init__(self): self.leechers_pattern = re.compile(r'') @staticmethod - def get_key(data): + def get_key(data: str) -> Union[str, None]: key_pattern = r' Generator[dict, None, None]: types = self.type_pattern.findall(data) ids_and_names = self.id_and_name_pattern.findall(data) dates_and_times = self.date_and_time_pattern.findall(data) sizes = self.size_pattern.findall(data) seed = self.seeders_pattern.findall(data) leech = self.leechers_pattern.findall(data) - ids = [] - if ( - len(types) != 0 - and len(types) == len(ids_and_names) == len(dates_and_times) == len(sizes) == len(seed) == len(leech) - ): + ids: tuple[Any, ...] = () + if len(types) != 0 and len(types) == len(ids_and_names) == len(dates_and_times) == len(sizes) == len( + seed + ) == len(leech): ids, names = zip(*ids_and_names) dates, times = zip(*dates_and_times) key = self.get_key(data) @@ -46,49 +46,81 @@ def get_items(self, data): if not self.not_found_pattern.search(data): raise NcoreParserError(f"Error while parse download items in {self.__class__.__name__}.") for i, id in enumerate(ids): - yield {"id": id, "title": names[i], "key": key, "date": parse_datetime(dates[i], times[i]), - "size": Size(sizes[i]), "type": SearchParamType(types[i]), "seed": seed[i], "leech": leech[i]} + yield { + "id": id, + "title": names[i], + "key": key, + "date": parse_datetime(dates[i], times[i]), + "size": Size(sizes[i]), + "type": SearchParamType(types[i]), + "seed": seed[i], + "leech": leech[i], + } class TorrenDetailParser: - def __init__(self): - self.type_pattern = re.compile(r'') - self.date_pattern = re.compile(r'
(?P[0-9]{4}\-[0-9]{2}\-[0-9]{2}\ ' - r'[0-9]{2}\:[0-9]{2}\:[0-9]{2})
') + def __init__(self) -> None: + self.type_pattern = re.compile( + r'' + ) + self.date_pattern = re.compile( + r'
(?P[0-9]{4}\-[0-9]{2}\-[0-9]{2}\ [0-9]{2}\:[0-9]{2}\:[0-9]{2})
' + ) self.title_pattern = re.compile(r'
(?P.*?)</div>') self.size_pattern = re.compile(r'<div class="dd">(?P<size>[0-9,.]+\ [K,M,G,T]{1}iB)\ \(.*?\)</div>') - self.peers_pattern = re.compile(r'div class="dt">Seederek:</div>.*?<div class="dd"><a onclick=".*?">' - r'(?P<seed>[0-9]+)</a></div>.*?<div class="dt">Leecherek:</div>.*?<div ' - r'class="dd"><a onclick=".*?">(?P<leech>[0-9]+)</a></div>', re.DOTALL) + self.peers_pattern = re.compile( + r'div class="dt">Seederek:</div>.*?<div class="dd"><a onclick=".*?">' + r'(?P<seed>[0-9]+)</a></div>.*?<div class="dt">Leecherek:</div>.*?<div ' + r'class="dd"><a onclick=".*?">(?P<leech>[0-9]+)</a></div>', + re.DOTALL, + ) - def get_item(self, data): + def get_item(self, data: str) -> dict: try: - t_type = self.type_pattern.search(data) - t_type = get_detailed_param(t_type.group("category"), t_type.group("type")) - date = datetime.datetime.strptime(self.date_pattern.search(data).group("date"), "%Y-%m-%d %H:%M:%S") - title = self.title_pattern.search(data).group("title") + t_type_match = self.type_pattern.search(data) + if t_type_match: + t_type = get_detailed_param(t_type_match.group("category"), t_type_match.group("type")) + else: + raise NcoreParserError("Type pattern not found in data") + date_match = self.date_pattern.search(data) + if date_match: + date = datetime.datetime.strptime(date_match.group("date"), "%Y-%m-%d %H:%M:%S") + else: + raise NcoreParserError("Date pattern not found in data") + title_match = self.title_pattern.search(data) + if title_match: + title = title_match.group("title") + else: + raise NcoreParserError("Title pattern not found in data") key = TorrentsPageParser.get_key(data) - size = Size(self.size_pattern.search(data).group("size")) - peers = self.peers_pattern.search(data) - seed = peers.group('seed') - leech = peers.group('leech') + size_match = self.size_pattern.search(data) + if size_match: + size = Size(size_match.group("size")) + else: + raise NcoreParserError("Size pattern not found in data") + peers_match = self.peers_pattern.search(data) + if peers_match: + seed = peers_match.group("seed") + leech = peers_match.group("leech") + else: + raise NcoreParserError("Peers pattern not found in data") except AttributeError as e: raise NcoreParserError(f"Error while parsing by detailed page. {e}") from e - return {"title": title, "key": key, "date": date, "size": size, "type": t_type, 'seed': seed, 'leech': leech} + return {"title": title, "key": key, "date": date, "size": size, "type": t_type, "seed": seed, "leech": leech} class RssParser: - def __init__(self): + def __init__(self) -> None: self.id_pattern = re.compile(r'<source url=".*?\/rss_dl.php\/id=(?P<id>[0-9]+)\/key\=.[a-z,0-9]+">') - def get_ids(self, data): + def get_ids(self, data: str) -> list[str]: return self.id_pattern.findall(data) class ActivityParser: - def __init__(self): + def __init__(self) -> None: self.patterns = [ re.compile(r'onclick="torrent\((.*?)\);'), re.compile(r'<div class="hnr_tstart">(.*?)<\/div>'), @@ -97,10 +129,10 @@ def __init__(self): re.compile(r'<div class="hnr_tup">(.*?)<\/div>'), re.compile(r'<div class="hnr_tdown">(.*?)<\/div>'), re.compile(r'<div class="hnr_ttimespent"><span class=".*?">(.*?)<\/span><\/div>'), - re.compile(r'<div class="hnr_tratio"><span class=".*?">(.*?)<\/span><\/div>') + re.compile(r'<div class="hnr_tratio"><span class=".*?">(.*?)<\/span><\/div>'), ] - def get_params(self, data): + def get_params(self, data: str) -> tuple[tuple[Any, ...], ...]: out = [] for parser in self.patterns: out.append(parser.findall(data)) @@ -108,9 +140,11 @@ def get_params(self, data): class RecommendedParser: - def __init__(self): - self.recommended_pattern = re.compile(r'<a href=".*?torrents.php\?action=details\&id=(.*?)" target=".*?"><img' - r' src=".*?" width=".*?" height=".*?" border=".*?" title=".*?"\/><\/a>') + def __init__(self) -> None: + self.recommended_pattern = re.compile( + r'<a href=".*?torrents.php\?action=details\&id=(.*?)" target=".*?"><img' + r' src=".*?" width=".*?" height=".*?" border=".*?" title=".*?"\/><\/a>' + ) - def get_ids(self, data): + def get_ids(self, data: str) -> list[str]: return self.recommended_pattern.findall(data) diff --git a/ncoreparser/torrent.py b/ncoreparser/torrent.py index 18ca5ce..079bad1 100644 --- a/ncoreparser/torrent.py +++ b/ncoreparser/torrent.py @@ -1,47 +1,41 @@ import os +from typing_extensions import Any from ncoreparser.data import URLs +from ncoreparser.util import Size class Torrent: # pylint: disable=too-many-arguments, too-many-positional-arguments def __init__( - self, - id, - title, - key, - size, - type, - date, - seed, - leech, - **params - ): - self._details = {} - self._details["id"] = int(id) - self._details["title"] = title - self._details["key"] = key - self._details["size"] = size - self._details["type"] = type - self._details["date"] = date - self._details["seed"] = seed - self._details["leech"] = leech - self._details["download"] = URLs.DOWNLOAD_LINK.value.format(id=id, key=key) + self, id: str, title: str, key: str, size: Size, type: str, date: str, seed: str, leech: str, **params: Any + ) -> None: + self._details = { + "id": id, + "title": title, + "key": key, + "size": size, + "type": type, + "date": date, + "seed": seed, + "leech": leech, + "download": URLs.DOWNLOAD_LINK.value.format(id=id, key=key), + } self._details.update(params) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Any: return self._details[key] - def keys(self): - return self._details.keys() + def keys(self) -> list[str]: + return list(self._details.keys()) - def __str__(self): + def __str__(self) -> str: return f"<Torrent {self._details['id']}>" - def __repr__(self): + def __repr__(self) -> str: return f"<Torrent {self._details['id']}>" - def prepare_download(self, path): - filename = self._details['title'].replace(' ', '_') + '.torrent' + def prepare_download(self, path: str) -> tuple[str, str]: + filename = str(self._details["title"]).replace(" ", "_") + ".torrent" filepath = os.path.join(path, filename) - url = self._details['download'] + url = str(self._details["download"]) return filepath, url diff --git a/ncoreparser/util.py b/ncoreparser/util.py index a29e31f..e27630a 100644 --- a/ncoreparser/util.py +++ b/ncoreparser/util.py @@ -1,41 +1,36 @@ import datetime import functools +from typing_extensions import Self, Callable, Any, Union from ncoreparser.error import NcoreConnectionError class Size: - unit_size = { - "B": 1, - "KiB": 1024, - "MiB": 1024**2, - "GiB": 1024**3, - "TiB": 1024**4 - } - - def __init__(self, size, unit=None): - self._unit = unit - self._size = 0 # in bytes + unit_size = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4} + + def __init__(self, size: Union[str, int, float], unit: Union[str, None] = None) -> None: + self._unit: Union[str, None] = unit + self._size: Union[int, float] = 0 # in bytes if isinstance(size, str): self._parse_str(size) elif isinstance(size, (int, float)): self._size = size - def _parse_str(self, size): + def _parse_str(self, size: str) -> None: size, unit = size.split(" ") self._size = float(size) * self.unit_size[unit] self._unit = unit - def __str__(self): + def __str__(self) -> str: return f"{self.size:.2f} {self.unit}" - def __repr__(self): + def __repr__(self) -> str: return f"{self.size:.2f} {self.unit}" - def _check_obj(self, obj): + def _check_obj(self, obj: object) -> None: if not isinstance(obj, Size): raise ValueError(f"Error while perform operaton with Size and {type(obj)}") - def __add__(self, obj): + def __add__(self, obj: Self) -> object: self._check_obj(obj) size = self._size + obj._size unit = self._unit @@ -45,7 +40,7 @@ def __add__(self, obj): break return Size(size, unit) - def __iadd__(self, obj): + def __iadd__(self, obj: Self) -> object: self._check_obj(obj) self._size = self._size + obj._size for unit, multiplier in self.unit_size.items(): @@ -54,44 +49,46 @@ def __iadd__(self, obj): break return self - def __eq__(self, obj): - self._check_obj(obj) + def __eq__(self, obj: object) -> bool: + if not isinstance(obj, Size): + return NotImplemented return self._size == obj._size - def __ne__(self, obj): - self._check_obj(obj) + def __ne__(self, obj: object) -> bool: + if not isinstance(obj, Size): + return NotImplemented return self._size != obj._size - def __gt__(self, obj): + def __gt__(self, obj: Self) -> bool: self._check_obj(obj) return self._size > obj._size - def __ge__(self, obj): + def __ge__(self, obj: Self) -> bool: self._check_obj(obj) return self._size >= obj._size @property - def unit(self): + def unit(self) -> Union[str, None]: return self._unit @property - def size(self): - return self._size / self.unit_size[self._unit] + def size(self) -> float: + return self._size / float(self.unit_size[str(self._unit)]) @property - def bytes(self): - return self._size + def bytes(self) -> int: + return int(self._size) -def parse_datetime(date, time): +def parse_datetime(date: str, time: str) -> datetime.datetime: return datetime.datetime.strptime(f"{date}_{time}", "%Y-%m-%d_%H:%M:%S") -def check_login(func): +def check_login(func: Callable) -> Callable: @functools.wraps(func) - def wrapper(self, *args, **kwargs): + def wrapper(self: Any, *args: Any, **kwargs: Any) -> Callable: if not self._logged_in: # pylint: disable=protected-access - raise NcoreConnectionError("Cannot login to tracker. " - f"Please use {self.login.__name__} function first.") + raise NcoreConnectionError("Cannot login to tracker. " f"Please use {self.login.__name__} function first.") return func(self, *args, **kwargs) + return wrapper diff --git a/pyproject.toml b/pyproject.toml index 18acca4..143f02e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "hatchling.build" [project] name = "ncoreparser" -version = "3.0.0" +version = "3.1.0" description = "Package to download from ncore.pro" authors = [ { name="Aron Radics", email="aron.radics.jozsef@gmail.com" } @@ -15,7 +15,7 @@ readme = "Readme.md" dependencies = [ "httpx>=0.26.0", ] -requires-python = ">=3.9,<3.13" +requires-python = ">=3.9" [project.urls] @@ -26,6 +26,16 @@ Repository = "https://github.com/radaron/ncoreparser.git" dev = [ "pytest", "pylint", + "black", + "mypy", "tox", "tox-gh" ] + +[tool.black] +line-length = 120 + +[tool.mypy] +disallow_untyped_calls = true +disallow_incomplete_defs = true +disallow_untyped_defs = true diff --git a/tests/manual.py b/tests/manual.py index 0596a85..b3580c6 100644 --- a/tests/manual.py +++ b/tests/manual.py @@ -7,25 +7,41 @@ def print_category(msg): print("") print("*{:175}*".format("-" * 175)) print(f"|{msg:^175}|") - print("*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format("-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10)) + print( + "*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format( + "-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10 + ) + ) print("|{:^100}|{:^30}|{:^10}|{:^10}|{:^10}|{:^10}|".format("Title", "Type", "Size", "ID", "Seed", "Leech")) - print("*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format("-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10)) + print( + "*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format( + "-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10 + ) + ) def pretty_print(torrent): - print("|{:^100}|{:^30}|{:^10}|{:^10}|{:^10}|{:^10}|".format(torrent['title'], - torrent['type'], - str(torrent['size']), - str(torrent['id']), - torrent['seed'], - torrent['leech'])) - print("*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format("-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10)) + print( + "|{:^100}|{:^30}|{:^10}|{:^10}|{:^10}|{:^10}|".format( + torrent["title"], + torrent["type"], + str(torrent["size"]), + str(torrent["id"]), + torrent["seed"], + torrent["leech"], + ) + ) + print( + "*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format( + "-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10 + ) + ) parser = argparse.ArgumentParser() -parser.add_argument('--user', '-u', required=True, type=str) -parser.add_argument('--passw', '-p', required=True, type=str) -parser.add_argument('--rss-feed', '-r', required=True, type=str) +parser.add_argument("--user", "-u", required=True, type=str) +parser.add_argument("--passw", "-p", required=True, type=str) +parser.add_argument("--rss-feed", "-r", required=True, type=str) args = parser.parse_args() @@ -39,14 +55,20 @@ def pretty_print(torrent): print_category("Most seeded torrents/category") for t_type in SearchParamType: - torrent = client.search(pattern="", type=t_type, number=1, - sort_by=ParamSort.SEEDERS, sort_order=ParamSeq.DECREASING)[0] + torrent = client.search( + pattern="", type=t_type, number=1, sort_by=ParamSort.SEEDERS, sort_order=ParamSeq.DECREASING + )[0] pretty_print(torrent) print("") print("Donwnload torrent") - torrent = client.search(pattern="Forrest gump", type=SearchParamType.HD_HUN, number=1, - sort_by=ParamSort.SEEDERS, sort_order=ParamSeq.DECREASING)[0] + torrent = client.search( + pattern="Forrest gump", + type=SearchParamType.HD_HUN, + number=1, + sort_by=ParamSort.SEEDERS, + sort_order=ParamSeq.DECREASING, + )[0] client.download(torrent, "/tmp", override=True) diff --git a/tests/manual_async.py b/tests/manual_async.py index 07a596b..95f6d72 100644 --- a/tests/manual_async.py +++ b/tests/manual_async.py @@ -8,25 +8,41 @@ def print_category(msg): print("") print("*{:175}*".format("-" * 175)) print(f"|{msg:^175}|") - print("*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format("-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10)) + print( + "*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format( + "-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10 + ) + ) print("|{:^100}|{:^30}|{:^10}|{:^10}|{:^10}|{:^10}|".format("Title", "Type", "Size", "ID", "Seed", "Leech")) - print("*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format("-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10)) + print( + "*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format( + "-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10 + ) + ) def pretty_print(torrent): - print("|{:^100}|{:^30}|{:^10}|{:^10}|{:^10}|{:^10}|".format(torrent['title'], - torrent['type'], - str(torrent['size']), - str(torrent['id']), - torrent['seed'], - torrent['leech'])) - print("*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format("-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10)) + print( + "|{:^100}|{:^30}|{:^10}|{:^10}|{:^10}|{:^10}|".format( + torrent["title"], + torrent["type"], + str(torrent["size"]), + str(torrent["id"]), + torrent["seed"], + torrent["leech"], + ) + ) + print( + "*{:^100}*{:^30}*{:^10}*{:^10}*{:^10}*{:^10}*".format( + "-" * 100, "-" * 30, "-" * 10, "-" * 10, "-" * 10, "-" * 10 + ) + ) parser = argparse.ArgumentParser() -parser.add_argument('--user', '-u', required=True, type=str) -parser.add_argument('--passw', '-p', required=True, type=str) -parser.add_argument('--rss-feed', '-r', required=True, type=str) +parser.add_argument("--user", "-u", required=True, type=str) +parser.add_argument("--passw", "-p", required=True, type=str) +parser.add_argument("--rss-feed", "-r", required=True, type=str) args = parser.parse_args() @@ -39,14 +55,20 @@ async def main(): print_category("Most seeded torrents/category") for t_type in SearchParamType: - torrents = await client.search(pattern="", type=t_type, number=1, - sort_by=ParamSort.SEEDERS, sort_order=ParamSeq.DECREASING) + torrents = await client.search( + pattern="", type=t_type, number=1, sort_by=ParamSort.SEEDERS, sort_order=ParamSeq.DECREASING + ) pretty_print(torrents[0]) print("") print("Donwnload torrent") - torrents = await client.search(pattern="Forrest gump", type=SearchParamType.HD_HUN, number=1, - sort_by=ParamSort.SEEDERS, sort_order=ParamSeq.DECREASING) + torrents = await client.search( + pattern="Forrest gump", + type=SearchParamType.HD_HUN, + number=1, + sort_by=ParamSort.SEEDERS, + sort_order=ParamSeq.DECREASING, + ) await client.download(torrents[0], "/tmp", override=True) @@ -71,5 +93,6 @@ async def main(): diff = end - start print(f"\nElapsed time: {diff} sec.") + if __name__ == "__main__": asyncio.run(main()) diff --git a/tests/test_module/test.py b/tests/test_module/test.py index 5f30daf..86c8617 100644 --- a/tests/test_module/test.py +++ b/tests/test_module/test.py @@ -50,5 +50,5 @@ def test_download(self, client): for file in files: if file.endswith(".torrent"): path = os.path.join(root, file) - file_content = open(path, 'rb').read() + file_content = open(path, "rb").read() assert len(file_content) > 0 diff --git a/tests/test_unit/test_util.py b/tests/test_unit/test_util.py index c28663f..fef9be9 100644 --- a/tests/test_unit/test_util.py +++ b/tests/test_unit/test_util.py @@ -3,42 +3,36 @@ class TestSize: - @pytest.mark.parametrize("size1, size2", [("1024 MiB", "1 GiB"), - ("10 MiB", "10 MiB"), - ("2048 KiB", "2 MiB")]) + @pytest.mark.parametrize("size1, size2", [("1024 MiB", "1 GiB"), ("10 MiB", "10 MiB"), ("2048 KiB", "2 MiB")]) def test_equal(self, size1, size2): s1 = Size(size1) s2 = Size(size2) assert s1 == s2 - @pytest.mark.parametrize("size1, size2", [("1023 MiB", "1 GiB"), - ("10 MiB", "11 MiB"), - ("2049 KiB", "2 MiB")]) + @pytest.mark.parametrize("size1, size2", [("1023 MiB", "1 GiB"), ("10 MiB", "11 MiB"), ("2049 KiB", "2 MiB")]) def test_not_equal(self, size1, size2): s1 = Size(size1) s2 = Size(size2) assert s1 != s2 - @pytest.mark.parametrize("size1, size2", [("1025 MiB", "1 GiB"), - ("11 MiB", "10 MiB"), - ("2049 KiB", "2 MiB")]) + @pytest.mark.parametrize("size1, size2", [("1025 MiB", "1 GiB"), ("11 MiB", "10 MiB"), ("2049 KiB", "2 MiB")]) def test_greater_than(self, size1, size2): s1 = Size(size1) s2 = Size(size2) assert s1 > s2 - @pytest.mark.parametrize("size1, size2", [("1025 MiB", "1 GiB"), - ("10 MiB", "10 MiB"), - ("2049 KiB", "2 MiB"), - ("2048 KiB", "2 MiB")]) + @pytest.mark.parametrize( + "size1, size2", [("1025 MiB", "1 GiB"), ("10 MiB", "10 MiB"), ("2049 KiB", "2 MiB"), ("2048 KiB", "2 MiB")] + ) def test_greater_equal(self, size1, size2): s1 = Size(size1) s2 = Size(size2) assert s1 >= s2 - @pytest.mark.parametrize("size1, size2, expected", [("1024 MiB", "1 GiB", "2.00 GiB"), - ("10 MiB", "11 MiB", "21.00 MiB"), - ("2048 KiB", "2 MiB", "4.00 MiB")]) + @pytest.mark.parametrize( + "size1, size2, expected", + [("1024 MiB", "1 GiB", "2.00 GiB"), ("10 MiB", "11 MiB", "21.00 MiB"), ("2048 KiB", "2 MiB", "4.00 MiB")], + ) def test_add(self, size1, size2, expected): s = Size(size1) + Size(size2) assert str(s) == expected diff --git a/tox.ini b/tox.ini index 4e09c55..ae58186 100644 --- a/tox.ini +++ b/tox.ini @@ -4,6 +4,7 @@ envlist = py310 py311 py312 + py313 [testenv] deps = .[dev] @@ -15,3 +16,4 @@ python = 3.10 = 3.10 3.11 = 3.11 3.12 = 3.12 + 3.13 = 3.13