From 27262f106dc3098a0444bbc3ac039d18a083bda2 Mon Sep 17 00:00:00 2001 From: Youjung Kim <126618609+ykim-1@users.noreply.github.com> Date: Wed, 13 Mar 2024 15:22:12 -0700 Subject: [PATCH 1/2] test: remove all deprecation warnings when for e2e framework (#34) * adding ansible.cfg file for test framework * clean up config file --- ansible.cfg | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 ansible.cfg diff --git a/ansible.cfg b/ansible.cfg new file mode 100644 index 0000000..f9b8694 --- /dev/null +++ b/ansible.cfg @@ -0,0 +1,2 @@ +[defaults] +deprecation_warnings=False \ No newline at end of file From 81ecc7470f8d2f98aee84ba778abe1643c17ba57 Mon Sep 17 00:00:00 2001 From: Zhiwei Liang <121905282+zliang-akamai@users.noreply.github.com> Date: Thu, 21 Mar 2024 00:38:06 -0400 Subject: [PATCH 2/2] Implement Sync and Async Watcher (#35) * Implement the sync and async watcher * Add examples of the async client and watcher README * Make default_poll_interval as an optional param in get_watcher * simplified example * Remove unnecessary client closures in watchers * Add return type hint for `get_watcher` function Co-authored-by: Lena Garber <114949949+lgarber-akamai@users.noreply.github.com> * Add return type docstring for get_watcher func --------- Co-authored-by: Lena Garber <114949949+lgarber-akamai@users.noreply.github.com> --- README.md | 56 ++- linode_metadata/__init__.py | 3 +- linode_metadata/constants.py | 5 + linode_metadata/metadata_client.py | 152 ++++++-- linode_metadata/objects/response_base.py | 4 +- linode_metadata/watcher.py | 451 +++++++++++++++++++++++ test/integration/conftest.py | 29 +- test/integration/helpers.py | 64 ++++ test/integration/test_instance.py | 25 +- test/integration/test_network.py | 30 +- test/integration/test_ssh.py | 29 +- test/integration/test_token.py | 20 +- test/integration/test_watcher.py | 42 +++ 13 files changed, 775 insertions(+), 135 deletions(-) create mode 100644 linode_metadata/constants.py create mode 100644 linode_metadata/watcher.py create mode 100644 test/integration/helpers.py create mode 100644 test/integration/test_watcher.py diff --git a/README.md b/README.md index 6e2ef9a..6ff0f03 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This package allows Python projects to easily interact with the [Linode Metadata ## Getting Started -### Prerequisites +### Prerequisites - Python >= 3.8 - A running [Linode Instance](https://www.linode.com/docs/api/linode-instances/) @@ -16,24 +16,24 @@ pip install linode_metadata ``` ### Building from Source + To build and install this package: - Clone this repository - `make install` -### Basic Example +## Examples -The following sample shows a simple Python project that initializes a new metadata client and retrieves various information -about the current Linode. +The following code snippets show multiple different ways to use the metadata +client and retrieves various metadata of the current Linode. +### Basic Usage ```python from linode_metadata import MetadataClient client = MetadataClient() -# All of these responses are handled as DataClasses, -# allowing IDEs to properly use completions. instance_info = client.get_instance() network_info = client.get_network() ssh_info = client.get_ssh_keys() @@ -45,15 +45,49 @@ print("SSH Keys:", "; ".join(ssh_info.users.root)) print("User Data:", user_data) ``` +### Asynchronous I/O and Context Manager Support + +You can also use the context manager to ensure the HTTP client will be properly closed, and the +`asyncio` enabled client is also available. + +```python +import asyncio +from linode_metadata import AsyncMetadataClient + +async def get_metadata(): + with AsyncMetadataClient() as client: + instance_info = await client.get_instance() + print("Instance ID:", instance_info.id) + +asyncio.run(get_metadata()) +``` + +### Watchers + +Watchers are useful for monitor changes in the metadata, e.g. newly added IP address to the Linode. + +```python +import asyncio +from linode_metadata import AsyncMetadataClient + +async def get_metadata(): + async with AsyncMetadataClient() as client: + watcher = client.get_watcher() + async for new_network_info in watcher.watch_network(): + print(new_network_info) + +asyncio.run(get_metadata()) +``` + ## Testing -Before running tests on this project, please ensure you have a +Before running tests on this project, please ensure you have a [Linode Personal Access Token](https://www.linode.com/docs/products/tools/api/guides/manage-api-tokens/) exported under the `LINODE_TOKEN` environment variable. ### End-to-End Testing Using Ansible -This project contains an Ansible playbook to automatically deploy the necessary infrastructure +This project contains an Ansible playbook to automatically deploy the necessary infrastructure and run end-to-end tests on it. To install the dependencies for this playbook, ensure you have Python 3 installed and run the following: @@ -68,20 +102,20 @@ After all dependencies have been installed, you can run the end-to-end test suit make e2e ``` -If your local SSH public key is stored in a location other than `~/.ssh/id_rsa.pub`, +If your local SSH public key is stored in a location other than `~/.ssh/id_rsa.pub`, you may need to override the `TEST_PUBKEY` argument: ```bash make TEST_PUBKEY=/path/to/my/pubkey e2e ``` -**NOTE: To speed up subsequent test runs, the infrastructure provisioned for testing will persist after the test run is complete. +**NOTE: To speed up subsequent test runs, the infrastructure provisioned for testing will persist after the test run is complete. This infrastructure is safe to manually remove.** ### Manual End-to-End Testing End-to-end tests can also be manually run using the `make e2e-local` target. -This test suite is expected to run from within a Linode instance and will likely +This test suite is expected to run from within a Linode instance and will likely fail in other environments. ## License diff --git a/linode_metadata/__init__.py b/linode_metadata/__init__.py index 18b3d0b..7d53ef6 100644 --- a/linode_metadata/__init__.py +++ b/linode_metadata/__init__.py @@ -2,5 +2,6 @@ Initializes objects for Metadata Client """ -from linode_metadata.metadata_client import MetadataAsyncClient, MetadataClient +from linode_metadata.metadata_client import AsyncMetadataClient, MetadataClient from linode_metadata.objects import * +from linode_metadata.watcher import AsyncMetadataWatcher, MetadataWatcher diff --git a/linode_metadata/constants.py b/linode_metadata/constants.py new file mode 100644 index 0000000..7f30b62 --- /dev/null +++ b/linode_metadata/constants.py @@ -0,0 +1,5 @@ +""" +Constants values for the linode_metadata package. +""" + +LOGGER_NAME = "linode_metadata" diff --git a/linode_metadata/metadata_client.py b/linode_metadata/metadata_client.py index 1fd47d8..da224c3 100644 --- a/linode_metadata/metadata_client.py +++ b/linode_metadata/metadata_client.py @@ -8,44 +8,55 @@ import base64 import json import logging +from abc import ABC from collections.abc import Awaitable from datetime import datetime, timedelta from importlib.metadata import version +from pathlib import Path from types import TracebackType from typing import Any, Callable, Optional, Type, Union import httpx from httpx import Response, TimeoutException +from linode_metadata.constants import LOGGER_NAME from linode_metadata.objects.error import ApiError, ApiTimeoutError from linode_metadata.objects.instance import InstanceResponse from linode_metadata.objects.networking import NetworkResponse from linode_metadata.objects.ssh_keys import SSHKeysResponse from linode_metadata.objects.token import MetadataToken +from linode_metadata.watcher import AsyncMetadataWatcher, MetadataWatcher BASE_URL = "http://169.254.169.254/v1" -DEFAULT_API_TIMEOUT = 10 +DEFAULT_API_TIMEOUT = 10.0 -class BaseMetadataClient: +class BaseMetadataClient(ABC): """ The base client of Linode Metadata Service that holds shared components - between MetadataClient and MetadataAsyncClient. + between MetadataClient and AsyncMetadataClient. """ + watcher: Optional[Union[AsyncMetadataWatcher, MetadataWatcher]] + _watcher_cls: Type[Union[AsyncMetadataWatcher, MetadataWatcher]] + _managed_token_expiry: Optional[datetime] + def __init__( self, - base_url=BASE_URL, - user_agent=None, - token=None, - timeout=DEFAULT_API_TIMEOUT, - managed_token=True, - managed_token_expiry_seconds=3600, - debug=False, - debug_file=None, + client: Union[httpx.AsyncClient, httpx.Client], + base_url: str = BASE_URL, + user_agent: Optional[str] = None, + token: Optional[str] = None, + timeout: float = DEFAULT_API_TIMEOUT, + managed_token: bool = True, + managed_token_expiry_seconds: float = 3600.0, + debug: bool = False, + debug_file: Optional[Union[Path, str]] = None, ): """ - The main interface to the Linode Metadata Service. + The constructor of the base client of Linode Metadata Service. This + client should not be used directly, please use MetadataClient or + AsyncMetadataClient instead. :param base_url: The base URL for Metadata API requests. Generally, you shouldn't change this. @@ -59,6 +70,8 @@ def __init__( :param token: An existing token to use with this client. This field cannot be specified when token management is enabled. :type token: Optional[str] + :param timeout: Timeout of an API call in seconds. + :type timeout: float :param managed_token: If true, the token for this client will be automatically generated and refreshed. :type managed_token: bool @@ -82,9 +95,9 @@ def __init__( self.timeout = timeout self._debug = debug if debug: - self._logger = logging.getLogger("linode_metadata") + self._logger = logging.getLogger(LOGGER_NAME) self._logger.setLevel(logging.DEBUG) - handler = ( + handler: logging.Handler = ( logging.FileHandler(debug_file) if debug_file else logging.StreamHandler() @@ -97,7 +110,7 @@ def __init__( self._logger.addHandler(handler) self._token = token - self.client = None + self.client = client self._managed_token = managed_token self._managed_token_expiry_seconds = managed_token_expiry_seconds @@ -217,7 +230,34 @@ def _user_agent(self): f"linode-py-metadata/{version('linode_metadata')}" ).strip() - def _log_request_debug_info(self, request_params): + def get_watcher( + self, + default_poll_interval: Optional[Union[timedelta, float, int]] = None, + ) -> Union[AsyncMetadataWatcher, MetadataWatcher]: + """ + Get a watcher instance with this metadata client. + + :param default_poll_interval: The default time interval for polling Linode + metadata services. + :type default_poll_interval: Optional[Union[timedelta, float, int]] + :return: A watcher instance. + :rtype: Union[AsyncMetadataWatcher, MetadataWatcher] + """ + if default_poll_interval is None: + default_poll_interval = timedelta(minutes=1) + + if not hasattr(self, "watcher") or self.watcher is None: + self.watcher = self._watcher_cls( + client=self, # type: ignore + default_poll_interval=default_poll_interval, + debug=self._debug, + ) + else: + self.watcher.set_default_poll_interval(default_poll_interval) + + return self.watcher + + def _log_request_debug_info(self, request_params: dict): """ Logging debug info for an HTTP request """ @@ -230,7 +270,7 @@ def _log_request_debug_info(self, request_params): self._logger.debug("> ") - def _log_response_debug_info(self, response): + def _log_response_debug_info(self, response: Response): """ Logging debug info for a response from requests """ @@ -251,8 +291,13 @@ class MetadataClient(BaseMetadataClient): The main sync client of the Linode Metadata Service. """ + watcher: Optional[MetadataWatcher] + _watcher_cls: Type[MetadataWatcher] + client: httpx.Client + def __init__( self, + client: Optional[httpx.Client] = None, base_url=BASE_URL, user_agent=None, token=None, @@ -263,7 +308,7 @@ def __init__( debug_file=None, ): """ - The main interface to the Linode Metadata Service. + The constructor of the client of Linode Metadata Service. :param base_url: The base URL for Metadata API requests. Generally, you shouldn't change this. @@ -277,6 +322,8 @@ def __init__( :param token: An existing token to use with this client. This field cannot be specified when token management is enabled. :type token: Optional[str] + :param timeout: Timeout of an API call in seconds. + :type timeout: float :param managed_token: If true, the token for this client will be automatically generated and refreshed. :type managed_token: bool @@ -290,7 +337,13 @@ def __init__( :type debug_file: str """ + if not client: + client = httpx.Client() + + self._watcher_cls = MetadataWatcher + super().__init__( + client=client, base_url=base_url, user_agent=user_agent, token=token, @@ -300,7 +353,6 @@ def __init__( debug=debug, debug_file=debug_file, ) - self.client = httpx.Client() def check_connection(self): """ @@ -322,7 +374,11 @@ def _validate_token(self): # We should implicitly refresh the token if the user is enrolled in # token management and the token has expired. if self._managed_token and ( - self._token is None or datetime.now() >= self._managed_token_expiry + self._token is None + or ( + self._managed_token_expiry + and datetime.now() >= self._managed_token_expiry + ) ): self.refresh_token( expiry_seconds=self._managed_token_expiry_seconds @@ -439,6 +495,12 @@ def get_ssh_keys(self) -> SSHKeysResponse: response = self._api_call("GET", "/ssh-keys") return SSHKeysResponse(json_data=response) + def close(self): + """ + Close the embedded HTTP client in this metadata client. + """ + self.client.close() + def __enter__(self) -> MetadataClient: self.client.__enter__() if self._managed_token: @@ -455,26 +517,31 @@ def __exit__( self.client.__exit__(exc_type, exc_value, traceback) -class MetadataAsyncClient(BaseMetadataClient): +class AsyncMetadataClient(BaseMetadataClient): """ The main async client of the Linode Metadata Service. """ + watcher: Optional[AsyncMetadataWatcher] + _watcher_cls: Type[AsyncMetadataWatcher] + client: httpx.AsyncClient + def __init__( self, - base_url=BASE_URL, - user_agent=None, - token=None, - timeout=DEFAULT_API_TIMEOUT, - managed_token=True, - managed_token_expiry_seconds=3600, - debug=False, - debug_file=None, + client: Optional[httpx.AsyncClient] = None, + base_url: str = BASE_URL, + user_agent: Optional[str] = None, + token: Optional[str] = None, + timeout: float = DEFAULT_API_TIMEOUT, + managed_token: bool = True, + managed_token_expiry_seconds: float = 3600.0, + debug: bool = False, + debug_file: Optional[Union[Path, str]] = None, ): """ - The main interface to the Linode Metadata Service. + The constructor of the async client of Linode Metadata Service. - :param base_url: The base URL for Metadata API requests. Generally, you shouldn't + :param base_url: The base URL for Metadata API requests. Generally, you shouldn't change this. :type base_url: str :param user_agent: What to append to the User Agent of all requests made @@ -486,6 +553,8 @@ def __init__( :param token: An existing token to use with this client. This field cannot be specified when token management is enabled. :type token: Optional[str] + :param timeout: Timeout of an API call in seconds. + :type timeout: float :param managed_token: If true, the token for this client will be automatically generated and refreshed. :type managed_token: bool @@ -499,7 +568,13 @@ def __init__( :type debug_file: str """ + if not client: + client = httpx.AsyncClient() + + self._watcher_cls = AsyncMetadataWatcher + super().__init__( + client=client, base_url=base_url, user_agent=user_agent, token=token, @@ -509,7 +584,6 @@ def __init__( debug=debug, debug_file=debug_file, ) - self.client = httpx.AsyncClient() async def check_connection(self) -> None: """ @@ -531,7 +605,11 @@ async def _validate_token(self) -> None: # We should implicitly refresh the token if the user is enrolled in # token management and the token has expired. if self._managed_token and ( - self._token is None or datetime.now() >= self._managed_token_expiry + self._token is None + or ( + self._managed_token_expiry + and datetime.now() >= self._managed_token_expiry + ) ): await self.refresh_token( expiry_seconds=self._managed_token_expiry_seconds @@ -648,7 +726,13 @@ async def get_ssh_keys(self) -> SSHKeysResponse: response = await self._api_call("GET", "/ssh-keys") return SSHKeysResponse(json_data=response) - async def __aenter__(self) -> MetadataAsyncClient: + async def close(self): + """ + Close the embedded HTTP client in this metadata client. + """ + self.client.aclose() + + async def __aenter__(self) -> AsyncMetadataClient: await self.client.__aenter__() if self._managed_token: await self.refresh_token() diff --git a/linode_metadata/objects/response_base.py b/linode_metadata/objects/response_base.py index e7da0c1..469bf8b 100644 --- a/linode_metadata/objects/response_base.py +++ b/linode_metadata/objects/response_base.py @@ -5,7 +5,7 @@ import dataclasses from dataclasses import dataclass -from typing import Any, Dict +from typing import Any, Dict, Optional @dataclass(init=False) @@ -16,7 +16,7 @@ class ResponseBase: :type json_data: Dict[str, Any] """ - def __init__(self, json_data: Dict[str, Any] = None): + def __init__(self, json_data: Optional[Dict[str, Any]] = None): if json_data is not None: self.populate(json_data) diff --git a/linode_metadata/watcher.py b/linode_metadata/watcher.py new file mode 100644 index 0000000..166fff7 --- /dev/null +++ b/linode_metadata/watcher.py @@ -0,0 +1,451 @@ +from __future__ import annotations + +import asyncio +import logging +import time +import traceback +from abc import ABC, abstractmethod +from collections.abc import AsyncGenerator, Awaitable, Callable, Generator +from dataclasses import asdict +from datetime import timedelta +from logging import Logger +from typing import TYPE_CHECKING, Optional, Union + +from linode_metadata.constants import LOGGER_NAME + +if TYPE_CHECKING: + from linode_metadata.metadata_client import ( + AsyncMetadataClient, + MetadataClient, + ) + +from linode_metadata.objects import InstanceResponse, NetworkResponse +from linode_metadata.objects.ssh_keys import SSHKeysResponse + +DEFAULT_POLL_INTERVAL = timedelta(minutes=1) + + +class BaseMetadataWatcher(ABC): + + _logger: Logger + + def __init__( + self, + default_poll_interval: Union[ + timedelta, float, int + ] = DEFAULT_POLL_INTERVAL, + debug: bool = False, + ): + """ + The constructor of the base metadata watcher. This should not be used + directly by the end users. + + :param default_poll_interval: The default time interval for polling Linode + metadata services. Defaults to 1 minute. + :type default_poll_interval: Optional[Union[timedelta, float, int]] + :param debug: Enables debug mode if set to True. + :type debug: bool + """ + self.set_default_poll_interval(default_poll_interval) + + self._logger = logging.getLogger(LOGGER_NAME) + self.debug = debug + + @abstractmethod + def watch_network( + self, + poll_interval: Optional[Union[timedelta, float, int]], + ignore_error: bool = False, + ): + pass + + @abstractmethod + def watch_instance( + self, + poll_interval: Optional[Union[timedelta, float, int]], + ignore_error: bool = False, + ): # pylint: disable-all + pass + + @abstractmethod + def watch_ssh_keys( + self, + poll_interval: Optional[Union[timedelta, float, int]], + ignore_error: bool = False, + ): + pass + + @abstractmethod + def poll( + self, + poller: Callable, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = True, + ): + pass + + @staticmethod + def normalize_poll_interval( + poll_interval: Union[timedelta, float, int] + ) -> timedelta: + """ + Normalize poll_interval to be an instance of datetime.timedelta. + float and int will be considered as the number of seconds. + + :param poll_interval: The input poll_interval in various types. + :type poll_interval: Union[timedelta, float, int] + :return: normalized poll_interval in timedelta type. + :rtype: timedelta + """ + if not isinstance(poll_interval, timedelta): + poll_interval = timedelta(seconds=poll_interval) + return poll_interval + + @property + def default_poll_interval(self): + """ + Get the default_poll_interval in this watcher. + """ + return self._default_poll_interval + + def set_default_poll_interval(self, interval): + """ + Set the default_poll_interval to this watcher. + + :param interval: The input poll_interval in various types. + :type interval: Union[timedelta, float, int] + """ + self._default_poll_interval = self.normalize_poll_interval(interval) + + def get_poll_interval( + self, poll_interval: Optional[Union[timedelta, float, int]] + ) -> timedelta: + """ + Get the poll_interval in timedelta type. If a None is passed in, + the default_poll_interval of the watcher will be returned. + + :param poll_interval: The input poll_interval in various types. + :type poll_interval: Union[timedelta, float, int] + :return: the normalized poll_interval in timedelta type. + :rtype: timedelta + """ + if poll_interval is None: + poll_interval = self.default_poll_interval + + poll_interval = self.normalize_poll_interval(poll_interval) + return poll_interval + + +class MetadataWatcher(BaseMetadataWatcher): + client: MetadataClient + + def __init__( + self, + client: MetadataClient, + default_poll_interval: Union[ + timedelta, float, int + ] = DEFAULT_POLL_INTERVAL, + debug: bool = False, + ): + """ + The constructor of the metadata watcher. + + :param client: The metadata client object. + :type client: MetadataClient + :param default_poll_interval: The default time interval for polling Linode + metadata services. Defaults to 1 minute. + :type default_poll_interval: Optional[Union[timedelta, float, int]] + :param debug: Enables debug mode if set to True. + :type debug: bool + """ + self.client = client + super().__init__(default_poll_interval, debug) + + def watch_network( + self, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ) -> Generator[NetworkResponse, None, None]: + """ + Watches the network changes. The new networking information will be + yielded at the beginning of the iterating or when there is a change + happened to the networking environment. + + :param poll_interval: The time interval between two polls of the networking + info endpoint of the Linode metadata services. + Defaults to default_poll_interval setting of the + watcher instance. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that yields next available networking info. + :rtype: Generator[NetworkResponse, None, None] + """ + yield from self.poll( + self.client.get_network, poll_interval, ignore_error + ) + + def watch_instance( + self, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ) -> Generator[InstanceResponse, None, None]: + """ + Watches the instance changes. The new instance information will be + yielded at the beginning of the iterating or when there is a change + happened to the instance. + + :param poll_interval: The default time interval for polling the instance + info endpoint of the Linode metadata services. + Defaults to default_poll_interval setting of the + watcher instance. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that yields next available instance info. + :rtype: Generator[InstanceResponse, None, None] + """ + yield from self.poll( + self.client.get_instance, poll_interval, ignore_error + ) + + def watch_ssh_keys( + self, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ) -> Generator[SSHKeysResponse, None, None]: + """ + Watches the ssh keys changes. The new ssh keys information will be + yielded at the beginning of the iterating or when there is a change + happened to the ssh keys. + + :param poll_interval: The default time interval for polling the ssh keys + info endpoint of the Linode metadata services. + Defaults to default_poll_interval setting of the + watcher instance. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that yields next available ssh keys info. + :rtype: Generator[SSHKeysResponse, None, None] + """ + yield from self.poll( + self.client.get_ssh_keys, poll_interval, ignore_error + ) + + def poll( + self, + poller: Callable[ + [], Union[NetworkResponse, InstanceResponse, SSHKeysResponse] + ], + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ): + """ + Continuously polling from Linode metadata services via the callable and + yields a response when it differs from the last response. + + :param poller: The callable that polls from Linode metadata services. + :type poller: Callable + :param poll_interval: The default time interval between polls of the + linode metadata. + Defaults to default_poll_interval setting of the + watcher. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that yields next available response from linode + metadata service. + :rtype: Generator + """ + poll_interval = self.get_poll_interval(poll_interval) + last_result = None + while True: + try: + result = poller() + if last_result is None or asdict(result) != asdict(last_result): + last_result = result + yield result + except Exception as e: + if ignore_error: + traceback.print_exc() + else: + raise RuntimeError( + "Failed to poll from Linode Metadata API in the watcher" + ) from e + time.sleep(poll_interval.seconds) + + +class AsyncMetadataWatcher(BaseMetadataWatcher): + client: AsyncMetadataClient + + def __init__( + self, + client: AsyncMetadataClient, + default_poll_interval: Union[ + timedelta, float, int + ] = DEFAULT_POLL_INTERVAL, + debug: bool = False, + ): + self.client = client + super().__init__(default_poll_interval, debug) + + async def watch_network( + self, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ) -> AsyncGenerator[NetworkResponse, None]: + """ + Watches the network changes. The new networking information will be + yielded asynchronously at the beginning of the iterating or when there + is a change happened to the networking environment. + + :param poll_interval: The time interval between two polls of the networking + info endpoint of the Linode metadata services. + Defaults to default_poll_interval setting of the + watcher instance. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that asynchronously yields next available networking info. + :rtype: AsyncGenerator[NetworkResponse, None] + """ + + async for response in self.poll( + self.client.get_network, poll_interval, ignore_error + ): + yield response + + async def watch_instance( + self, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ) -> AsyncGenerator[InstanceResponse, None]: + """ + Watches the instance changes. The new instance information will be + yielded asynchronously at the beginning of the iterating or when there + is a change happened to the instance. + + :param poll_interval: The default time interval for polling the instance + info endpoint of the Linode metadata services. + Defaults to default_poll_interval setting of the + watcher instance. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that asynchronously yields next available instance info. + :rtype: AsyncGenerator[InstanceResponse, None] + """ + async for response in self.poll( + self.client.get_instance, poll_interval, ignore_error + ): + yield response + + async def watch_ssh_keys( + self, + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ) -> AsyncGenerator[SSHKeysResponse, None]: + """ + Watches the ssh keys changes. The new ssh keys information will be + yielded asynchronously at the beginning of the iterating or when there + is a change happened to the ssh keys. + + :param poll_interval: The default time interval for polling the ssh keys + info endpoint of the Linode metadata services. + Defaults to default_poll_interval setting of the + watcher instance. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that asynchronously yields next available ssh keys info. + :rtype: AsyncGenerator[SSHKeysResponse, None] + """ + async for response in self.poll( + self.client.get_ssh_keys, + poll_interval, + ignore_error, + ): + yield response + + async def poll( + self, + poller: Callable[ + [], + Union[ + Awaitable[NetworkResponse], + Awaitable[InstanceResponse], + Awaitable[SSHKeysResponse], + ], + ], + poll_interval: Optional[Union[timedelta, float, int]] = None, + ignore_error: bool = False, + ): + """ + Continuously and asynchronously polling from Linode metadata services + via the provided callable and yields a response when it differs from + the last response. + + :param poller: The callable that polls from Linode metadata services. + :type poller: Callable + :param poll_interval: The default time interval between polls of the + linode metadata. + Defaults to default_poll_interval setting of the + watcher. + :type poll_interval: Optional[Union[timedelta, float, int]] + :param ignore_error: Whether to ignore the exception happen during the + call to the metadata service. If it is set to + True, it will print the exception with traceback + when it occurs, and if set to False, it will raise + the exception instead. Default to False. + :type ignore_error: bool + :return: A generator that yields next available response from linode + metadata service. + :rtype: AsyncGenerator + """ + poll_interval = self.get_poll_interval(poll_interval) + last_result = None + while True: + try: + result = await poller() + if last_result is None or asdict(result) != asdict(last_result): + last_result = result + yield result + + except Exception as e: + if ignore_error: + traceback.print_exc() + else: + raise RuntimeError( + "Failed to poll from Linode Metadata API in the watcher" + ) from e + + await asyncio.sleep(poll_interval.seconds) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 034133c..053b45b 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -1,7 +1,10 @@ +import time +from datetime import timedelta + import pytest import pytest_asyncio -from linode_metadata import MetadataAsyncClient, MetadataClient +from linode_metadata import AsyncMetadataClient, MetadataClient @pytest.fixture(scope="session") @@ -10,9 +13,9 @@ def client(): yield client -@pytest_asyncio.fixture() +@pytest_asyncio.fixture(scope="session") async def async_client(): - async with MetadataAsyncClient() as async_client: + async with AsyncMetadataClient() as async_client: yield async_client @@ -22,7 +25,23 @@ def client_unmanaged(): yield client -@pytest_asyncio.fixture() +@pytest_asyncio.fixture(scope="session") async def async_client_unmanaged(): - async with MetadataAsyncClient(managed_token=False) as async_client: + async with AsyncMetadataClient(managed_token=False) as async_client: yield async_client + + +@pytest.fixture(scope="session") +def watcher(client: MetadataClient): + return client.get_watcher(timedelta(minutes=1)) + + +@pytest_asyncio.fixture(scope="session") +async def async_watcher(async_client: AsyncMetadataClient): + return async_client.get_watcher(timedelta(minutes=1)) + + +# Slow down tests to prevent 429 Too Many Requests errors +@pytest.fixture(autouse=True) +def slow_down_tests(): + time.sleep(5) diff --git a/test/integration/helpers.py b/test/integration/helpers.py new file mode 100644 index 0000000..72ba6fe --- /dev/null +++ b/test/integration/helpers.py @@ -0,0 +1,64 @@ +import re + +import pytest + +from linode_metadata.objects.instance import InstanceResponse +from linode_metadata.objects.networking import ( + IPv4Networking, + IPv6Networking, + NetworkResponse, +) +from linode_metadata.objects.ssh_keys import SSHKeysResponse + + +def inspect_instance_response(instance: InstanceResponse): + assert isinstance(instance.id, int) + assert re.match(r"^[A-Za-z\-0-9]+$", instance.label) + assert re.match(r"^[a-z\-]+$", instance.region) + assert re.match(r"^[a-z\d\-]+$", instance.type) + assert isinstance(instance.specs.vcpus, int) + assert isinstance(instance.specs.memory, int) + assert isinstance(instance.specs.gpus, int) + assert isinstance(instance.specs.transfer, int) + assert isinstance(instance.specs.disk, int) + assert isinstance(instance.backups.enabled, bool) + assert instance.backups.status is None + assert re.match(r"^[a-f\d]+$", instance.host_uuid) + assert isinstance(instance.tags, list) + + +def inspect_network_response(network: NetworkResponse): + + assert isinstance(network.interfaces, list) + assert isinstance(network.ipv4, IPv4Networking) + assert isinstance(network.ipv6, IPv6Networking) + + ipv4_pattern = re.compile(r"^\d{1,3}(\.\d{1,3}){3}/\d+$") + ipv6_pattern = re.compile(r"^[0-9a-fA-F:]+/\d+$") + + for ip in network.ipv4.public: + assert ipv4_pattern.match(ip) + + for ip in [network.ipv6.slaac, network.ipv6.link_local]: + if ip is not None: + assert ipv6_pattern.match(ip) + + +def inspect_ssh_keys_response(ssh_keys: SSHKeysResponse): + # In some cases we may not have an authorized key + # configured for root + if len(ssh_keys.users) < 1: + pytest.skip( + "The current instance does not have any any SSH keys configured, skipping..." + ) + + ssh_key_pattern = re.compile( + r"^ssh-(?:rsa|ed25519|ecdsa|dss)\s[A-Za-z0-9+/]+[=]*\s\S*$" + ) + + for name, user in ssh_keys.users.items(): + assert isinstance(name, str) + + for key in user: + assert isinstance(key, str) + assert ssh_key_pattern.match(key) diff --git a/test/integration/test_instance.py b/test/integration/test_instance.py index 91d7a30..f5b3cf4 100644 --- a/test/integration/test_instance.py +++ b/test/integration/test_instance.py @@ -1,9 +1,8 @@ -import re +from test.integration.helpers import inspect_instance_response import pytest -from linode_metadata import MetadataAsyncClient, MetadataClient -from linode_metadata.objects.instance import InstanceResponse +from linode_metadata import AsyncMetadataClient, MetadataClient def test_get_instance_info(client: MetadataClient): @@ -11,23 +10,7 @@ def test_get_instance_info(client: MetadataClient): inspect_instance_response(instance) -@pytest.mark.asyncio -async def test_async_get_instance_info(async_client: MetadataAsyncClient): +@pytest.mark.asyncio(scope="session") +async def test_async_get_instance_info(async_client: AsyncMetadataClient): instance = await async_client.get_instance() inspect_instance_response(instance) - - -def inspect_instance_response(instance: InstanceResponse): - assert isinstance(instance.id, int) - assert re.match(r"^[A-Za-z\-0-9]+$", instance.label) - assert re.match(r"^[a-z\-]+$", instance.region) - assert re.match(r"^[a-z\d\-]+$", instance.type) - assert isinstance(instance.specs.vcpus, int) - assert isinstance(instance.specs.memory, int) - assert isinstance(instance.specs.gpus, int) - assert isinstance(instance.specs.transfer, int) - assert isinstance(instance.specs.disk, int) - assert isinstance(instance.backups.enabled, bool) - assert instance.backups.status is None - assert re.match(r"^[a-f\d]+$", instance.host_uuid) - assert isinstance(instance.tags, list) diff --git a/test/integration/test_network.py b/test/integration/test_network.py index e4657bc..b457490 100644 --- a/test/integration/test_network.py +++ b/test/integration/test_network.py @@ -1,13 +1,8 @@ -import re +from test.integration.helpers import inspect_network_response import pytest -from linode_metadata import MetadataAsyncClient, MetadataClient -from linode_metadata.objects.networking import ( - IPv4Networking, - IPv6Networking, - NetworkResponse, -) +from linode_metadata import AsyncMetadataClient, MetadataClient def test_get_network_info(client: MetadataClient): @@ -15,24 +10,7 @@ def test_get_network_info(client: MetadataClient): inspect_network_response(network) -@pytest.mark.asyncio -async def test_async_get_network_info(async_client: MetadataAsyncClient): +@pytest.mark.asyncio(scope="session") +async def test_async_get_network_info(async_client: AsyncMetadataClient): network = await async_client.get_network() inspect_network_response(network) - - -def inspect_network_response(network: NetworkResponse): - - assert isinstance(network.interfaces, list) - assert isinstance(network.ipv4, IPv4Networking) - assert isinstance(network.ipv6, IPv6Networking) - - ipv4_pattern = re.compile(r"^\d{1,3}(\.\d{1,3}){3}/\d+$") - ipv6_pattern = re.compile(r"^[0-9a-fA-F:]+/\d+$") - - for ip in network.ipv4.public: - assert ipv4_pattern.match(ip) - - for ip in [network.ipv6.slaac, network.ipv6.link_local]: - if ip is not None: - assert ipv6_pattern.match(ip) diff --git a/test/integration/test_ssh.py b/test/integration/test_ssh.py index 27013c1..efb3d8e 100644 --- a/test/integration/test_ssh.py +++ b/test/integration/test_ssh.py @@ -1,9 +1,8 @@ -import re +from test.integration.helpers import inspect_ssh_keys_response import pytest -from linode_metadata import MetadataAsyncClient, MetadataClient -from linode_metadata.objects.ssh_keys import SSHKeysResponse +from linode_metadata import AsyncMetadataClient, MetadataClient def test_get_ssh_keys(client: MetadataClient): @@ -11,27 +10,7 @@ def test_get_ssh_keys(client: MetadataClient): inspect_ssh_keys_response(ssh_keys) -@pytest.mark.asyncio -async def test_get_ssh_keys(async_client: MetadataAsyncClient): +@pytest.mark.asyncio(scope="session") +async def test_get_ssh_keys(async_client: AsyncMetadataClient): ssh_keys = await async_client.get_ssh_keys() inspect_ssh_keys_response(ssh_keys) - - -def inspect_ssh_keys_response(ssh_keys: SSHKeysResponse): - # In some cases we may not have an authorized key - # configured for root - if len(ssh_keys.users) < 1: - pytest.skip( - "The current instance does not have any any SSH keys configured, skipping..." - ) - - ssh_key_pattern = re.compile( - r"^ssh-(?:rsa|ed25519|ecdsa|dss)\s[A-Za-z0-9+/]+[=]*\s\S*$" - ) - - for name, user in ssh_keys.users.items(): - assert isinstance(name, str) - - for key in user: - assert isinstance(key, str) - assert ssh_key_pattern.match(key) diff --git a/test/integration/test_token.py b/test/integration/test_token.py index 9837e88..b06cb94 100644 --- a/test/integration/test_token.py +++ b/test/integration/test_token.py @@ -6,7 +6,7 @@ import linode_metadata from linode_metadata import MetadataClient -from linode_metadata.metadata_client import MetadataAsyncClient +from linode_metadata.metadata_client import AsyncMetadataClient from linode_metadata.objects.error import ApiError from linode_metadata.objects.token import MetadataToken @@ -39,9 +39,9 @@ def test_generate_and_use_new_metadata_token(client_unmanaged: MetadataClient): assert network is not None -@pytest.mark.asyncio +@pytest.mark.asyncio(scope="session") async def test_generate_and_use_new_metadata_token_async( - async_client_unmanaged: MetadataAsyncClient, + async_client_unmanaged: AsyncMetadataClient, ): client = async_client_unmanaged @@ -76,9 +76,9 @@ def test_verify_error_thrown_when_using_invalid_api_token( assert network is not None -@pytest.mark.asyncio +@pytest.mark.asyncio(scope="session") async def test_verify_error_thrown_when_using_invalid_api_token_async( - async_client_unmanaged: MetadataAsyncClient, invalid_token: str + async_client_unmanaged: AsyncMetadataClient, invalid_token: str ): client = async_client_unmanaged @@ -111,9 +111,9 @@ def test_unmanaged_token_expire(client_unmanaged: MetadataClient): assert "Unauthorized" in str(excinfo.value.json) -@pytest.mark.asyncio +@pytest.mark.asyncio(scope="session") async def test_unmanaged_token_expire_async( - async_client_unmanaged: MetadataAsyncClient, + async_client_unmanaged: AsyncMetadataClient, ): client = async_client_unmanaged @@ -143,9 +143,9 @@ def test_managed_token_auto_refresh(): assert networking is not None -@pytest.mark.asyncio +@pytest.mark.asyncio(scope="session") async def test_managed_token_auto_refresh_async(): - client = linode_metadata.MetadataAsyncClient( + client = linode_metadata.AsyncMetadataClient( managed_token_expiry_seconds=1, ) @@ -153,7 +153,7 @@ async def test_managed_token_auto_refresh_async(): instance = await client.get_instance() assert instance is not None - asyncio.sleep(2) + await asyncio.sleep(2) # Ensure the token is automatically refreshed networking = await client.get_network() diff --git a/test/integration/test_watcher.py b/test/integration/test_watcher.py new file mode 100644 index 0000000..f654356 --- /dev/null +++ b/test/integration/test_watcher.py @@ -0,0 +1,42 @@ +from test.integration.helpers import ( + inspect_instance_response, + inspect_network_response, + inspect_ssh_keys_response, +) + +import pytest + +from linode_metadata.watcher import AsyncMetadataWatcher, MetadataWatcher + + +@pytest.mark.asyncio(scope="session") +async def test_watch_instance_async(async_watcher: AsyncMetadataWatcher): + instance_watcher = async_watcher.watch_instance() + inspect_instance_response(await anext(instance_watcher)) + + +@pytest.mark.asyncio(scope="session") +async def test_watch_network_async(async_watcher: AsyncMetadataWatcher): + network_watcher = async_watcher.watch_network() + inspect_network_response(await anext(network_watcher)) + + +@pytest.mark.asyncio(scope="session") +async def test_watch_ssh_keys_async(async_watcher: AsyncMetadataWatcher): + ssh_keys_watcher = async_watcher.watch_ssh_keys() + inspect_ssh_keys_response(await anext(ssh_keys_watcher)) + + +def test_watch_instance(watcher: MetadataWatcher): + instance_watcher = watcher.watch_instance() + inspect_instance_response(next(instance_watcher)) + + +def test_watch_network(watcher: MetadataWatcher): + network_watcher = watcher.watch_network() + inspect_network_response(next(network_watcher)) + + +def test_watch_ssh_keys(watcher: MetadataWatcher): + ssh_keys_watcher = watcher.watch_ssh_keys() + inspect_ssh_keys_response(next(ssh_keys_watcher))