diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index f80f12e..a55ff76 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,8 @@ ## Summary - +This release introduces the initial version of the Reporting API client with support for +retrieving single metric historical data for a single component. ## Upgrading @@ -10,7 +11,18 @@ ## New Features - +* Introducing the initial version of the Reporting API client, streamlined for +retrieving single metric historical data for a single component. It incorporates +pagination handling and utilizes a wrapper data class that retains the raw +protobuf response while offering transformation capabilities limited here +to generators of structured data representation via named tuples. + +* Current limitations include a single metric focus with plans for extensibility, +ongoing development for states and bounds integration, as well as support for +service-side features like resampling, streaming, and formula aggregations. + +* Code examples are provided to guide users through the basic usage of the client. + ## Bug Fixes diff --git a/examples/client.py b/examples/client.py new file mode 100644 index 0000000..7759d40 --- /dev/null +++ b/examples/client.py @@ -0,0 +1,139 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Examples usage of reporting API.""" + +import argparse +import asyncio +from datetime import datetime +from pprint import pprint +from typing import AsyncGenerator + +import pandas as pd +from frequenz.client.common.metric import Metric + +from frequenz.client.reporting import ReportingClient + +# Experimental import +from frequenz.client.reporting._client import MetricSample + + +# pylint: disable=too-many-locals +async def main(microgrid_id: int, component_id: int) -> None: + """Test the ReportingClient. + + Args: + microgrid_id: int + component_id: int + """ + service_address = "localhost:50051" + client = ReportingClient(service_address) + + microgrid_components = [(microgrid_id, [component_id])] + metrics = [ + Metric.DC_POWER, + Metric.DC_CURRENT, + ] + + start_dt = datetime.fromisoformat("2023-11-21T12:00:00.00+00:00") + end_dt = datetime.fromisoformat("2023-11-21T12:01:00.00+00:00") + + page_size = 10 + + print("########################################################") + print("Iterate over single metric generator") + + async for sample in client.iterate_single_metric( + microgrid_id=microgrid_id, + component_id=component_id, + metric=metrics[0], + start_dt=start_dt, + end_dt=end_dt, + page_size=page_size, + ): + print("Received:", sample) + + ########################################################################### + # + # The following code is experimental and demonstrates potential future + # usage of the ReportingClient. + # + ########################################################################### + + async def components_data_iter() -> AsyncGenerator[MetricSample, None]: + """Iterate over components data. + + Yields: + Single metric sample + """ + # pylint: disable=protected-access + async for page in client._iterate_components_data_pages( + microgrid_components=microgrid_components, + metrics=metrics, + start_dt=start_dt, + end_dt=end_dt, + page_size=page_size, + ): + for entry in page.iterate_metric_samples(): + yield entry + + async def components_data_dict( + components_data_iter: AsyncGenerator[MetricSample, None] + ) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: + """Convert components data iterator into a single dict. + + The nesting structure is: + { + microgrid_id: { + component_id: { + timestamp: { + metric: value + } + } + } + } + + Args: + components_data_iter: async generator + + Returns: + Single dict with with all components data + """ + ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} + + async for ts, mid, cid, met, value in components_data_iter: + if mid not in ret: + ret[mid] = {} + if cid not in ret[mid]: + ret[mid][cid] = {} + if ts not in ret[mid][cid]: + ret[mid][cid][ts] = {} + + ret[mid][cid][ts][met] = value + + return ret + + print("########################################################") + print("Iterate over generator") + async for msample in components_data_iter(): + print("Received:", msample) + + print("########################################################") + print("Dumping all data as a single dict") + dct = await components_data_dict(components_data_iter()) + pprint(dct) + + print("########################################################") + print("Turn data into a pandas DataFrame") + data = [cd async for cd in components_data_iter()] + df = pd.DataFrame(data).set_index("timestamp") + pprint(df) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("microgrid_id", type=int, help="Microgrid ID") + parser.add_argument("component_id", type=int, help="Component ID") + + args = parser.parse_args() + asyncio.run(main(args.microgrid_id, args.component_id)) diff --git a/pyproject.toml b/pyproject.toml index e7242e5..1aeb68d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,8 @@ requires-python = ">= 3.11, < 4" # TODO(cookiecutter): Remove and add more dependencies if appropriate dependencies = [ "typing-extensions >= 4.5.0, < 5", + "frequenz-api-reporting >= 0.1.1, < 1", + "frequenz-client-common @ git+https://github.com/frequenz-floss/frequenz-client-common-python.git@v0.x.x", ] dynamic = ["version"] @@ -62,6 +64,7 @@ dev-mypy = [ "types-Markdown == 3.4.2.10", # For checking the noxfile, docs/ script, and tests "frequenz-client-reporting[dev-mkdocs,dev-noxfile,dev-pytest]", + "pandas-stubs >= 2, < 3", # Only required for example ] dev-noxfile = [ "nox == 2023.4.22", @@ -71,6 +74,7 @@ dev-pylint = [ "pylint == 3.0.2", # For checking the noxfile, docs/ script, and tests "frequenz-client-reporting[dev-mkdocs,dev-noxfile,dev-pytest]", + "pandas >= 2, < 3", # Only required for example ] dev-pytest = [ "pytest == 8.0.0", @@ -82,6 +86,10 @@ dev-pytest = [ dev = [ "frequenz-client-reporting[dev-mkdocs,dev-flake8,dev-formatting,dev-mkdocs,dev-mypy,dev-noxfile,dev-pylint,dev-pytest]", ] +examples = [ + "grpcio >= 1.51.1, < 2", + "pandas >= 2, < 3", +] [project.urls] Documentation = "https://frequenz-floss.github.io/frequenz-client-reporting-python/" diff --git a/src/frequenz/client/reporting/__init__.py b/src/frequenz/client/reporting/__init__.py index 9db7257..f423cae 100644 --- a/src/frequenz/client/reporting/__init__.py +++ b/src/frequenz/client/reporting/__init__.py @@ -1,25 +1,12 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""Reporting API client for Python. +"""Client to connect to the Reporting API. -TODO(cookiecutter): Add a more descriptive module description. +This package provides a low-level interface for interacting with the reporting API. """ -# TODO(cookiecutter): Remove this function -def delete_me(*, blow_up: bool = False) -> bool: - """Do stuff for demonstration purposes. +from ._client import ReportingClient - Args: - blow_up: If True, raise an exception. - - Returns: - True if no exception was raised. - - Raises: - RuntimeError: if blow_up is True. - """ - if blow_up: - raise RuntimeError("This function should be removed!") - return True +__all__ = ["ReportingClient"] diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py new file mode 100644 index 0000000..772c10b --- /dev/null +++ b/src/frequenz/client/reporting/_client.py @@ -0,0 +1,289 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Client for requests to the Reporting API.""" + +from collections import namedtuple +from dataclasses import dataclass +from datetime import datetime +from typing import Any, AsyncIterator, Awaitable, Generator, Type, cast + +import grpc.aio as grpcaio + +# pylint: disable=no-name-in-module +from frequenz.api.common.v1.metrics.metric_sample_pb2 import Metric as PBMetric +from frequenz.api.common.v1.microgrid.microgrid_pb2 import ( + MicrogridComponentIDs as PBMicrogridComponentIDs, +) +from frequenz.api.common.v1.pagination.pagination_params_pb2 import ( + PaginationParams as PBPaginationParams, +) +from frequenz.api.reporting.v1.reporting_pb2 import ( + ListMicrogridComponentsDataRequest as PBListMicrogridComponentsDataRequest, +) +from frequenz.api.reporting.v1.reporting_pb2 import ( + ListMicrogridComponentsDataResponse as PBListMicrogridComponentsDataResponse, +) +from frequenz.api.reporting.v1.reporting_pb2 import TimeFilter as PBTimeFilter +from frequenz.api.reporting.v1.reporting_pb2_grpc import ReportingStub +from frequenz.client.common.metric import Metric +from google.protobuf.timestamp_pb2 import Timestamp as PBTimestamp + +# pylint: enable=no-name-in-module + +Sample = namedtuple("Sample", ["timestamp", "value"]) +"""Type for a sample of a time series.""" + +MetricSample = namedtuple( + "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"] +) +"""Type for a sample of a time series incl. metric type, microgrid and component ID""" + + +@dataclass(frozen=True) +class ComponentsDataPage: + """A page of microgrid components data returned by the Reporting service.""" + + _data_pb: PBListMicrogridComponentsDataResponse + """The underlying protobuf message.""" + + def is_empty(self) -> bool: + """Check if the page contains valid data. + + Returns: + True if the page contains no valid data. + """ + if not self._data_pb.microgrids: + return True + if not self._data_pb.microgrids[0].components: + return True + if not self._data_pb.microgrids[0].components[0].metric_samples: + return True + return False + + def iterate_metric_samples(self) -> Generator[MetricSample, None, None]: + """Get generator that iterates over all values in the page. + + Note: So far only `SimpleMetricSample` in the `MetricSampleVariant` + message is supported. + + + Yields: + A named tuple with the following fields: + * timestamp: The timestamp of the metric sample. + * microgrid_id: The microgrid ID. + * component_id: The component ID. + * metric: The metric name. + * value: The metric value. + """ + data = self._data_pb + for mdata in data.microgrids: + mid = mdata.microgrid_id + for cdata in mdata.components: + cid = cdata.component_id + for msample in cdata.metric_samples: + ts = msample.sampled_at.ToDatetime() + met = Metric.from_proto(msample.metric).name + value = ( + msample.sample.simple_metric.value + if msample.sample.simple_metric + else None + ) + yield MetricSample( + timestamp=ts, + microgrid_id=mid, + component_id=cid, + metric=met, + value=value, + ) + + @property + def next_page_token(self) -> Any: + """Get the token for the next page of data. + + Returns: + The token for the next page of data. + """ + return self._data_pb.pagination_info.next_page_token + + +class ReportingClient: + """A client for the Reporting service.""" + + def __init__(self, service_address: str): + """Create a new Reporting client. + + Args: + service_address: The address of the Reporting service. + """ + self._grpc_channel = grpcaio.insecure_channel(service_address) + self._stub = ReportingStub(self._grpc_channel) + + # pylint: disable=too-many-arguments + async def iterate_single_metric( + self, + *, + microgrid_id: int, + component_id: int, + metric: Metric, + start_dt: datetime, + end_dt: datetime, + page_size: int = 1000, + ) -> AsyncIterator[Sample]: + """Iterate over the data for a single metric. + + Args: + microgrid_id: The microgrid ID. + component_id: The component ID. + metric: The metric name. + start_dt: The start date and time. + end_dt: The end date and time. + page_size: The page size. + + Yields: + A named tuple with the following fields: + * timestamp: The timestamp of the metric sample. + * value: The metric value. + """ + async for page in self._iterate_components_data_pages( + microgrid_components=[(microgrid_id, [component_id])], + metrics=[metric], + start_dt=start_dt, + end_dt=end_dt, + page_size=page_size, + ): + for entry in page.iterate_metric_samples(): + yield Sample(timestamp=entry.timestamp, value=entry.value) + + # pylint: disable=too-many-arguments + async def _iterate_components_data_pages( + self, + *, + microgrid_components: list[tuple[int, list[int]]], + metrics: list[Metric], + start_dt: datetime, + end_dt: datetime, + page_size: int = 1000, + ) -> AsyncIterator[ComponentsDataPage]: + """Iterate over the pages of microgrid components data. + + Note: This does not yet support resampling or aggregating the data. It + also does not yet support fetching bound and state data. + + Args: + microgrid_components: A list of tuples of microgrid IDs and component IDs. + metrics: A list of metrics. + start_dt: The start date and time. + end_dt: The end date and time. + page_size: The page size. + + Yields: + A ComponentsDataPage object of microgrid components data. + """ + microgrid_components_pb = [ + PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids) + for mid, cids in microgrid_components + ] + + def dt2ts(dt: datetime) -> PBTimestamp: + ts = PBTimestamp() + ts.FromDatetime(dt) + return ts + + time_filter = PBTimeFilter( + start=dt2ts(start_dt), + end=dt2ts(end_dt), + ) + + list_filter = PBListMicrogridComponentsDataRequest.ListFilter( + time_filter=time_filter, + ) + + metrics_pb = [metric.to_proto() for metric in metrics] + + page_token = None + + while True: + pagination_params = PBPaginationParams( + page_size=page_size, page_token=page_token + ) + + response = await self._fetch_page( + microgrid_components=microgrid_components_pb, + metrics=metrics_pb, + list_filter=list_filter, + pagination_params=pagination_params, + ) + if not response or response.is_empty(): + break + + yield response + + page_token = response.next_page_token + if not page_token: + break + + async def _fetch_page( + self, + *, + microgrid_components: list[PBMicrogridComponentIDs], + metrics: list[PBMetric.ValueType], + list_filter: PBListMicrogridComponentsDataRequest.ListFilter, + pagination_params: PBPaginationParams, + ) -> ComponentsDataPage | None: + """Fetch a single page of microgrid components data. + + Args: + microgrid_components: A list of microgrid components. + metrics: A list of metrics. + list_filter: A list filter. + pagination_params: A pagination params. + + Returns: + A ComponentsDataPage object of microgrid components data. + """ + try: + request = PBListMicrogridComponentsDataRequest( + microgrid_components=microgrid_components, + metrics=metrics, + filter=list_filter, + pagination_params=pagination_params, + ) + response = await cast( + Awaitable[PBListMicrogridComponentsDataResponse], + self._stub.ListMicrogridComponentsData(request), + ) + except grpcaio.AioRpcError as e: + print(f"RPC failed: {e}") + return None + return ComponentsDataPage(response) + + async def close(self) -> None: + """Close the client and cancel any pending requests immediately.""" + await self._grpc_channel.close(grace=None) + + async def __aenter__(self) -> "ReportingClient": + """Enter the async context.""" + return self + + async def __aexit__( + self, + _exc_type: Type[BaseException] | None, + _exc_val: BaseException | None, + _exc_tb: Any | None, + ) -> bool | None: + """ + Exit the asynchronous context manager. + + Note that exceptions are not handled here, but are allowed to propagate. + + Args: + _exc_type: Type of exception raised in the async context. + _exc_val: Exception instance raised. + _exc_tb: Traceback object at the point where the exception occurred. + + Returns: + None, allowing any exceptions to propagate. + """ + await self.close() + return None diff --git a/tests/test_client_reporting.py b/tests/test_client_reporting.py index 40e9387..040953c 100644 --- a/tests/test_client_reporting.py +++ b/tests/test_client_reporting.py @@ -2,17 +2,42 @@ # Copyright © 2024 Frequenz Energy-as-a-Service GmbH """Tests for the frequenz.client.reporting package.""" +from typing import Generator +from unittest.mock import MagicMock, patch + import pytest -from frequenz.client.reporting import delete_me +from frequenz.client.reporting import ReportingClient +from frequenz.client.reporting._client import ComponentsDataPage + + +@pytest.fixture +def mock_channel() -> Generator[MagicMock, None, None]: + """Fixture for grpc.aio.insecure_channel.""" + with patch("grpc.aio.insecure_channel") as mock: + yield mock + + +@pytest.mark.asyncio +async def test_client_initialization(mock_channel: MagicMock) -> None: + """Test that the client initializes the channel.""" + client = ReportingClient("localhost:50051") # noqa: F841 + mock_channel.assert_called_once_with("localhost:50051") -def test_client_reporting_succeeds() -> None: # TODO(cookiecutter): Remove - """Test that the delete_me function succeeds.""" - assert delete_me() is True +def test_components_data_page_is_empty_true() -> None: + """Test that the is_empty method returns True when the page is empty.""" + data_pb = MagicMock() + data_pb.microgrids = [] + page = ComponentsDataPage(_data_pb=data_pb) + assert page.is_empty() is True -def test_client_reporting_fails() -> None: # TODO(cookiecutter): Remove - """Test that the delete_me function fails.""" - with pytest.raises(RuntimeError, match="This function should be removed!"): - delete_me(blow_up=True) +def test_components_data_page_is_empty_false() -> None: + """Test that the is_empty method returns False when the page is not empty.""" + data_pb = MagicMock() + data_pb.microgrids = [MagicMock()] + data_pb.microgrids[0].components = [MagicMock()] + data_pb.microgrids[0].components[0].metric_samples = [MagicMock()] + page = ComponentsDataPage(_data_pb=data_pb) + assert page.is_empty() is False