Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type annotations to examples #823

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
flake8 --exclude cast_channel_pb2.py,authority_keys_pb2.py,logging_pb2.py examples pychromecast
- name: Lint with mypy
run: |
mypy pychromecast
mypy examples pychromecast
- name: Lint with pylint
run: |
pylint examples pychromecast
6 changes: 3 additions & 3 deletions examples/custom_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
CAST_NAME = "Living Room"


def your_main_loop():
def your_main_loop() -> None:
"""
Main loop example.
Check for cast.socket_client.get_socket() and
Expand All @@ -29,7 +29,7 @@ def your_main_loop():
t = 1
cast = None

def callback(chromecast):
def callback(chromecast: pychromecast.Chromecast) -> None:
if chromecast.name == args.cast:
print("=> Discovered cast...")
chromecast.connect()
Expand Down Expand Up @@ -59,7 +59,7 @@ def callback(chromecast):
browser.stop_discovery()


def do_actions(cast, t):
def do_actions(cast: pychromecast.Chromecast, t: int) -> None:
"""Your code which is called by main loop."""
if t == 5:
print()
Expand Down
4 changes: 2 additions & 2 deletions examples/dashcast_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
if not cast.is_idle:
print("Killing current running app")
cast.quit_app()
t = 5
while cast.status.app_id is not None and t > 0:
t = 5.0
while cast.status.app_id is not None and t > 0: # type: ignore[union-attr]
time.sleep(0.1)
t = t - 0.1

Expand Down
10 changes: 6 additions & 4 deletions examples/discovery_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import argparse
import logging
import time
from uuid import UUID

import zeroconf

import pychromecast
from pychromecast import CastInfo

parser = argparse.ArgumentParser(
description="Example on how to receive updates on discovered chromecasts."
Expand Down Expand Up @@ -40,7 +42,7 @@
logging.getLogger("zeroconf").setLevel(logging.DEBUG)


def list_devices():
def list_devices() -> None:
"""Print a list of known devices."""
print("Currently known cast devices:")
for service in browser.services.values():
Expand All @@ -54,19 +56,19 @@ def list_devices():
class MyCastListener(pychromecast.discovery.AbstractCastListener):
"""Listener for discovering chromecasts."""

def add_cast(self, uuid, _service):
def add_cast(self, uuid: UUID, service: str) -> None:
"""Called when a new cast has beeen discovered."""
print(
f"Found cast device '{browser.services[uuid].friendly_name}' with UUID {uuid}"
)
list_devices()

def remove_cast(self, uuid, _service, cast_info):
def remove_cast(self, uuid: UUID, service: str, cast_info: CastInfo) -> None:
"""Called when a cast has beeen lost (MDNS info expired or host down)."""
print(f"Lost cast device '{cast_info.friendly_name}' with UUID {uuid}")
list_devices()

def update_cast(self, uuid, _service):
def update_cast(self, uuid: UUID, service: str) -> None:
"""Called when a cast has beeen updated (MDNS info renewed or changed)."""
print(
f"Updated cast device '{browser.services[uuid].friendly_name}' with UUID {uuid}"
Expand Down
2 changes: 1 addition & 1 deletion examples/media_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

# Wait for player_state PLAYING
player_state = None
t = 30
t = 30.0
has_played = False
while True:
try:
Expand Down
4 changes: 2 additions & 2 deletions examples/media_example2.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@
if not cast.is_idle:
print("Killing current running app")
cast.quit_app()
t = 5
while cast.status.app_id is not None and t > 0:
t = 5.0
while cast.status.app_id is not None and t > 0: # type: ignore[union-attr]
time.sleep(0.1)
t = t - 0.1

Expand Down
12 changes: 6 additions & 6 deletions examples/multizone_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
MultizoneController,
MultiZoneControllerListener,
)
from pychromecast.socket_client import ConnectionStatusListener
from pychromecast.socket_client import ConnectionStatus, ConnectionStatusListener

# Change to the name of your Chromecast
CAST_NAME = "Whole house"
Expand Down Expand Up @@ -48,24 +48,24 @@
class MyConnectionStatusListener(ConnectionStatusListener):
"""ConnectionStatusListener"""

def __init__(self, _mz):
def __init__(self, _mz: MultizoneController):
self._mz = _mz

def new_connection_status(self, status):
def new_connection_status(self, status: ConnectionStatus) -> None:
if status.status == "CONNECTED":
self._mz.update_members()


class MyMultiZoneControllerListener(MultiZoneControllerListener):
"""MultiZoneControllerListener"""

def multizone_member_added(self, group_uuid):
def multizone_member_added(self, group_uuid: str) -> None:
print(f"New member: {group_uuid}")

def multizone_member_removed(self, group_uuid):
def multizone_member_removed(self, group_uuid: str) -> None:
print(f"Removed member: {group_uuid}")

def multizone_status_received(self):
def multizone_status_received(self) -> None:
print(f"Members: {mz.members}")


Expand Down
12 changes: 6 additions & 6 deletions examples/simple_listener_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import zeroconf

import pychromecast
from pychromecast.controllers.media import MediaStatusListener
from pychromecast.controllers.media import MediaStatus, MediaStatusListener
from pychromecast.controllers.receiver import CastStatusListener

# Change to the friendly name of your Chromecast
Expand All @@ -21,27 +21,27 @@
class MyCastStatusListener(CastStatusListener):
"""Cast status listener"""

def __init__(self, name, cast):
def __init__(self, name: str | None, cast: pychromecast.Chromecast) -> None:
self.name = name
self.cast = cast

def new_cast_status(self, status):
def new_cast_status(self, status: pychromecast.CastStatus) -> None:
print("[", time.ctime(), " - ", self.name, "] status chromecast change:")
print(status)


class MyMediaStatusListener(MediaStatusListener):
"""Status media listener"""

def __init__(self, name, cast):
def __init__(self, name: str | None, cast: pychromecast.Chromecast) -> None:
self.name = name
self.cast = cast

def new_media_status(self, status):
def new_media_status(self, status: MediaStatus) -> None:
print("[", time.ctime(), " - ", self.name, "] status media change:")
print(status)

def load_media_failed(self, item, error_code):
def load_media_failed(self, item: int, error_code: int) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the item represent?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to identify which item of a queue failed to play, Google calls it itemId.

We should update the docstring for MediaStatusListener and maybe also rename the parameter to queue_item or queue_item_id to make it clearer what it is.

print(
"[",
time.ctime(),
Expand Down
2 changes: 1 addition & 1 deletion examples/supla_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

result = requests.get(f"https://www.supla.fi/ohjelmat/{PROGRAM}", timeout=10)
soup = BeautifulSoup(result.content)
MEDIA_ID = soup.select('a[title*="Koko Shitti"]')[0]["href"].split("/")[-1]
MEDIA_ID = soup.select('a[title*="Koko Shitti"]')[0]["href"].split("/")[-1] # type: ignore[union-attr]
print(MEDIA_ID)


Expand Down
7 changes: 4 additions & 3 deletions examples/yleareena_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Example on how to use the Yle Areena Controller

"""
# pylint: disable=invalid-name, import-outside-toplevel
# pylint: disable=invalid-name, import-outside-toplevel, too-many-locals

import argparse
import logging
Expand Down Expand Up @@ -46,7 +46,7 @@
logging.getLogger("zeroconf").setLevel(logging.DEBUG)


def get_kaltura_id(program_id):
def get_kaltura_id(program_id: str) -> str:
"""
Dive into the yledl internals and fetch the kaltura player id.
This can be used with Chromecast
Expand Down Expand Up @@ -74,7 +74,8 @@ def get_kaltura_id(program_id):

info = extractor.program_info_for_pid(pid, url, title_formatter, ffprobe)

return info.media_id.split("-")[-1]
kaltura_id: str = info.media_id.split("-")[-1]
return kaltura_id


chromecasts, browser = pychromecast.get_listed_chromecasts(
Expand Down
3 changes: 2 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
exclude = (?x)(
^pychromecast/generated/ # The protobuf autogenerated files are not annotated
^examples/plex_multi_example\.py$
| ^pychromecast/generated/ # The protobuf autogenerated files are not annotated
| ^pychromecast/controllers/plex\.py$
Comment on lines +23 to 25
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a separate PR, the plex controller and the example will be type annotated

)