-
Notifications
You must be signed in to change notification settings - Fork 1
/
api.py
180 lines (139 loc) · 5.44 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import abc
import asyncio
import dataclasses
import html
import logging
import typing as tp

import justwatch
@dataclasses.dataclass
class Rating:
    """A single score given to a title by one rating provider."""

    # Provider name with the ":score" suffix removed.
    name: str
    # Score exactly as found under the "value" key of the scoring entry.
    score: float

    @classmethod
    def from_json(cls, json: tp.Dict[str, tp.Any]) -> tp.Optional["Rating"]:
        """
        Parses one scoring entry.

        :param json: scoring entry; only 'provider_type' and 'value' are read
        :return: a Rating, or None when the entry lacks one of these fields
            or its provider type is not of the form '<name>:score'
        """
        suffix = ":score"
        if "provider_type" not in json or "value" not in json:
            return None
        provider = json["provider_type"]
        # Test for the exact suffix that is stripped below: matching on a bare
        # "score" would let e.g. "metascore" through and cut it down to "met".
        if not isinstance(provider, str) or not provider.endswith(suffix):
            return None
        return cls(name=provider[: -len(suffix)], score=json["value"])

    def __str__(self) -> str:
        """Formats the rating as '<Title-Cased Name>: <score>'."""
        return f"{self.name.title()}: {self.score}"
@dataclasses.dataclass
class BaseMovie:
    """Basic data about a title: identity, type, release year and ratings."""

    id: int
    title: str
    object_type: str
    original_release_year: tp.Optional[int]
    ratings: tp.List[Rating]

    def __init__(self, film_json: tp.Dict[str, tp.Any]) -> None:
        """
        Builds the movie from a json dict with base film data.

        :param film_json: json with 'id', 'title' and 'object_type' fields;
            'original_release_year' and 'scoring' are read when present
        :raise KeyError: if one of the three required fields is missing
        """
        self.id = film_json["id"]
        self.title = film_json["title"]
        self.object_type = film_json["object_type"]
        self.original_release_year = film_json.get("original_release_year")

        # Anything other than a list under 'scoring' is treated as "no ratings".
        scoring = film_json.get("scoring")
        if not isinstance(scoring, list):
            scoring = []
        parsed = (Rating.from_json(entry) for entry in scoring)
        # Entries that Rating.from_json rejects (returns None for) are dropped.
        self.ratings = [rating for rating in parsed if rating is not None]
@dataclasses.dataclass
class CinemaLink:
    """Link to a title on one provider, built from a single offer entry."""

    provider_id: int
    url: str

    def __init__(self, cinema_link_json: tp.Dict[str, tp.Any]) -> None:
        """
        Parses one offer entry.

        :param cinema_link_json: json with 'provider_id' and a 'urls' mapping
            that contains 'standard_web'
        :raise KeyError: if any of these keys is missing
        """
        provider_id = cinema_link_json["provider_id"]
        urls = cinema_link_json["urls"]
        self.provider_id, self.url = provider_id, urls["standard_web"]
@dataclasses.dataclass
class Movie(BaseMovie):
    """Detailed data about a title: base data plus description, poster and offers."""

    short_description: str
    # Poster path template; get_poster_url fills its 'profile' placeholder.
    poster: str
    # At most one link per provider id.
    offers: tp.List[CinemaLink]

    def __init__(self, film_json: tp.Dict[str, tp.Any]) -> None:
        """
        Parses json with detailed film data.

        :param film_json: json with the fields BaseMovie requires plus
            'short_description' and 'poster'; 'offers' is read when present
        :raise KeyError: if a required field is missing
        """
        super().__init__(film_json)
        self.short_description = film_json["short_description"]
        self.poster = film_json["poster"]

        # Keyed by provider id so every provider ends up with a single link
        # (the last offer seen for that provider wins).
        links_by_provider: tp.Dict[int, CinemaLink] = {}
        offers_json = film_json.get("offers")
        # Same guard as for 'scoring' in BaseMovie: a null / non-list value
        # means "no offers" instead of a TypeError while iterating.
        if isinstance(offers_json, list):
            for offer_json in offers_json:
                try:
                    link = CinemaLink(offer_json)
                except KeyError:
                    # Offer lacks a provider id or a web url; skip it.
                    continue
                links_by_provider[link.provider_id] = link
        self.offers = list(links_by_provider.values())

    def get_poster_url(self, profile: str = "s592") -> str:
        """
        Builds the absolute poster url.

        :param profile: value substituted for the 'profile' placeholder of the
            poster template; defaults to the previously hard-coded 's592'
        :return: poster url on images.justwatch.com
        """
        return "https://images.justwatch.com" + self.poster.format(profile=profile)
def format_base_movie(base_movie: BaseMovie) -> str:
    """
    Renders a one-line HTML caption: bold title, then the release year if known.

    :param base_movie: movie whose 'title' and 'original_release_year' are used
    :return: '<b>title</b> (year)' or '<b>title</b>' when the year is None
    """
    # The title is external data placed inside markup: escape '&', '<' and '>'
    # so it cannot break or inject tags.
    title = html.escape(str(base_movie.title), quote=False)
    year = base_movie.original_release_year
    if year is None:
        return f"<b>{title}</b>"
    return f"<b>{title}</b> ({year})"
def format_description(movie: Movie) -> str:
    """
    Renders a multi-line HTML description of a movie.

    Lines, in order: bold title (with the release year when known),
    comma-separated ratings, an empty line, the short description, and a
    final empty line (so the result ends with a newline).

    :param movie: movie to describe
    :return: the description text
    """

    def escape(value: tp.Any) -> str:
        # Values come from external data and are placed inside markup:
        # escape '&', '<' and '>' so they cannot break or inject tags.
        return html.escape(str(value), quote=False)

    name_line = escape(movie.title)
    if movie.original_release_year is not None:
        name_line += f" ({movie.original_release_year})"
    lines = (
        f"<b>{name_line}</b>",
        escape(", ".join(str(rating) for rating in movie.ratings)),
        "",
        escape(movie.short_description),
        "",
    )
    return "\n".join(lines)
class SearchMovieAPI(abc.ABC):
    """Interface of a movie search backend."""

    @abc.abstractmethod
    def provider_name(self, provider_id: int) -> tp.Optional[str]:
        """
        Returns a name for the given provider id.

        :param provider_id: provider id, e.g. CinemaLink.provider_id
        :return: the name, or None when the implementation has none
        """

    @abc.abstractmethod
    def base_search(self, query: str) -> tp.AsyncIterable[BaseMovie]:
        """
        Searches titles by a text query.

        :param query: search text
        :return: async iterable of base search results
        """

    @abc.abstractmethod
    async def movie_details(self, movie_id: int, object_type: str) -> Movie:
        """
        Loads detailed data for one title.

        :param movie_id: id of the title, as in BaseMovie.id
        :param object_type: type of the title, as in BaseMovie.object_type
        :return: the detailed movie
        """

    async def search_for_item(self, query: str) -> tp.Optional[Movie]:
        """
        Returns details of the first search result whose details can be loaded.

        Results are tried in the order base_search yields them; a result whose
        movie_details call raises KeyError is skipped.

        :param query: search text
        :return: the detailed movie, or None if no result could be loaded
        """
        async for base_result in self.base_search(query):
            try:
                return await self.movie_details(base_result.id, base_result.object_type)
            except KeyError as error:
                # Best effort: leave a trace of the skip, then try the next result.
                logging.warning(
                    "skipping search result id=%r (%r): missing key %s",
                    base_result.id,
                    base_result.object_type,
                    error,
                )
        return None
class JustWatchSearchMovieAPI(SearchMovieAPI):
    """SearchMovieAPI implementation backed by the justwatch client."""

    def __init__(self, country: str = "RU") -> None:
        """
        Creates the client and loads the provider list.

        :param country: country passed to justwatch.JustWatch
        """
        self.jw = justwatch.JustWatch(country=country)
        # Provider json objects indexed by their 'id' for provider_name lookups.
        self.providers = {provider["id"]: provider for provider in self.jw.get_providers()}

    def provider_name(self, provider_id: int) -> tp.Optional[str]:
        """
        Returns the 'clear_name' of a provider.

        An unknown id is logged at error level together with the list of known
        providers.

        :param provider_id: provider id to look up
        :return: the name, or None if the id is unknown or the provider has
            no 'clear_name'
        """
        provider = self.providers.get(provider_id)
        if provider is None:
            logging.error(f"no provider with id: '{provider_id}' was found.")
            logging.error(f" known providers:")
            for known_id, known_provider in self.providers.items():
                # .get(): a provider without 'clear_name' must not turn this
                # diagnostic dump into a KeyError.
                logging.error(f" id: '{known_id}', clear_name: '{known_provider.get('clear_name')}'")
            return None
        return provider.get("clear_name")

    async def base_search(self, query: str) -> tp.AsyncIterable[BaseMovie]:
        """
        Searches titles by a text query.

        The client call is run in the loop's default executor.

        :param query: search text
        :return: async generator of results; items that BaseMovie cannot
            parse (KeyError) are skipped
        """
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, lambda: self.jw.search_for_item(query=query))
        if ("items" not in results) or (not results["items"]):
            return
        for result_json in results["items"]:
            try:
                movie = BaseMovie(result_json)
            except KeyError:
                # Item lacks a required base field; skip it.
                continue
            yield movie

    async def movie_details(self, movie_id: int, object_type: str) -> Movie:
        """
        Loads detailed data for one title.

        The client call is run in the loop's default executor.

        :param movie_id: id of the title
        :param object_type: passed to the client as content_type
        :return: the parsed movie
        :raise KeyError: if the returned json lacks a field Movie requires
        """
        loop = asyncio.get_running_loop()
        film_json = await loop.run_in_executor(
            None, lambda: self.jw.get_title(movie_id, content_type=object_type)
        )
        return Movie(film_json)
# Module-level API instance, created when this module is imported.
# The constructor runs with its default country ("RU") and calls
# jw.get_providers() to fill the provider table.
# NOTE(review): get_providers() presumably performs a network request, which
# would make importing this module block on I/O -- confirm.
api = JustWatchSearchMovieAPI()