diff --git a/Makefile b/Makefile index 5a8666f..6d2cc47 100644 --- a/Makefile +++ b/Makefile @@ -9,3 +9,6 @@ lint: reformat: ruff check --select I --fix $(SOURCE) ruff format $(SOURCE) + +test: + PYTHONPATH=custom_components pytest -vv diff --git a/custom_components/polestar_api/pypolestar/models.py b/custom_components/polestar_api/pypolestar/models.py new file mode 100644 index 0000000..4b55048 --- /dev/null +++ b/custom_components/polestar_api/pypolestar/models.py @@ -0,0 +1,147 @@ +from dataclasses import dataclass +from datetime import date, datetime, timezone +from enum import StrEnum +from typing import Self + +from .utils import ( + GqlDict, + get_field_name_date, + get_field_name_datetime, + get_field_name_float, + get_field_name_int, + get_field_name_str, +) + + +class ChargingConnectionStatus(StrEnum): + CHARGER_CONNECTION_STATUS_CONNECTED = "Connected" + CHARGER_CONNECTION_STATUS_DISCONNECTED = "Disconnected" + CHARGER_CONNECTION_STATUS_FAULT = "Fault" + CHARGER_CONNECTION_STATUS_UNSPECIFIED = "Unspecified" + + +class ChargingStatus(StrEnum): + CHARGING_STATUS_DONE = "Done" + CHARGING_STATUS_IDLE = "Idle" + CHARGING_STATUS_CHARGING = "Charging" + CHARGING_STATUS_FAULT = "Fault" + CHARGING_STATUS_UNSPECIFIED = "Unspecified" + CHARGING_STATUS_SCHEDULED = "Scheduled" + CHARGING_STATUS_DISCHARGING = "Discharging" + CHARGING_STATUS_ERROR = "Error" + CHARGING_STATUS_SMART_CHARGING = "Smart Charging" + + +@dataclass(frozen=True) +class CarBaseInformation: + _received_timestamp: datetime + + +@dataclass(frozen=True) +class CarInformationData(CarBaseInformation): + vin: str | None + internal_vehicle_identifier: str | None + registration_no: str | None + registration_date: date | None + factory_complete_date: date | None + image_url: str | None + battery: str | None + torque: str | None + software_version: str | None + + @classmethod + def from_dict(cls, data: GqlDict) -> Self: + return cls( + vin=get_field_name_str("vin", data), + internal_vehicle_identifier=get_field_name_str( + "internalVehicleIdentifier", data + ), + registration_no=get_field_name_str("registrationNo", data), + registration_date=get_field_name_date("registrationDate", data), + factory_complete_date=get_field_name_date("factoryCompleteDate", data), + image_url=get_field_name_str("content/images/studio/url", data), + battery=get_field_name_str("content/specification/battery", data), + torque=get_field_name_str("content/specification/torque", data), + software_version=get_field_name_str("software/version", data), + _received_timestamp=datetime.now(tz=timezone.utc), + ) + + +@dataclass(frozen=True) +class CarOdometerData(CarBaseInformation): + average_speed_km_per_hour: float | None + odometer_meters: int | None + trip_meter_automatic_km: float | None + trip_meter_manual_km: float | None + event_updated_timestamp: datetime | None + + @classmethod + def from_dict(cls, data: GqlDict) -> Self: + return cls( + average_speed_km_per_hour=get_field_name_float( + "averageSpeedKmPerHour", data + ), + odometer_meters=get_field_name_int("odometerMeters", data), + trip_meter_automatic_km=get_field_name_float("tripMeterAutomaticKm", data), + trip_meter_manual_km=get_field_name_float("tripMeterManualKm", data), + event_updated_timestamp=get_field_name_datetime( + "eventUpdatedTimestamp/iso", data + ), + _received_timestamp=datetime.now(tz=timezone.utc), + ) + + +@dataclass(frozen=True) +class CarBatteryData(CarBaseInformation): + average_energy_consumption_kwh_per_100km: float | None + battery_charge_level_percentage: int | None + charger_connection_status: ChargingConnectionStatus + charging_current_amps: int + charging_power_watts: int + charging_status: ChargingStatus + estimated_charging_time_minutes_to_target_distance: int | None + estimated_charging_time_to_full_minutes: int | None + estimated_distance_to_empty_km: int | None + event_updated_timestamp: datetime | None + + @classmethod + def from_dict(cls, data: GqlDict) -> Self: + try: + charger_connection_status = ChargingConnectionStatus( + get_field_name_str("chargerConnectionStatus", data) + ) + except ValueError: + charger_connection_status = ( + ChargingConnectionStatus.CHARGER_CONNECTION_STATUS_UNSPECIFIED + ) + + try: + charging_status = ChargingStatus(get_field_name_str("chargingStatus", data)) + except ValueError: + charging_status = ChargingStatus.CHARGING_STATUS_UNSPECIFIED + + return cls( + average_energy_consumption_kwh_per_100km=get_field_name_float( + "averageEnergyConsumptionKwhPer100Km", data + ), + battery_charge_level_percentage=get_field_name_int( + "batteryChargeLevelPercentage", data + ), + charger_connection_status=charger_connection_status, + charging_current_amps=get_field_name_int("chargingCurrentAmps", data) or 0, + charging_power_watts=get_field_name_int("chargingPowerWatts", data) or 0, + charging_status=charging_status, + estimated_charging_time_minutes_to_target_distance=get_field_name_int( + "estimatedChargingTimeMinutesToTargetDistance", data + ), + estimated_charging_time_to_full_minutes=get_field_name_int( + "estimatedChargingTimeToFullMinutes", data + ), + estimated_distance_to_empty_km=get_field_name_int( + "estimatedDistanceToEmptyKm", data + ), + event_updated_timestamp=get_field_name_datetime( + "eventUpdatedTimestamp/iso", data + ), + _received_timestamp=datetime.now(tz=timezone.utc), + ) diff --git a/custom_components/polestar_api/pypolestar/polestar.py b/custom_components/polestar_api/pypolestar/polestar.py index cfff3b3..c8601cd 100644 --- a/custom_components/polestar_api/pypolestar/polestar.py +++ b/custom_components/polestar_api/pypolestar/polestar.py @@ -25,6 +25,7 @@ QUERY_GET_ODOMETER_DATA, get_gql_client, ) +from .models import CarBatteryData, CarInformationData, CarOdometerData _LOGGER = logging.getLogger(__name__) @@ -81,6 +82,66 @@ async def async_init(self, verbose: bool = False) -> None: def vins(self) -> list[str]: return list(self.data_by_vin.keys()) + def get_car_information(self, vin: str) -> CarInformationData | None: + """ + Get car information for the specified VIN. + + Args: + vin: The vehicle identification number + Returns: + CarInformationData if data exists, None otherwise + Raises: + KeyError: If the VIN doesn't exist + ValueError: If data conversion fails + """ + if vin not in self.data_by_vin: + raise KeyError(f"No data found for VIN: {vin}") + if data := self.data_by_vin[vin].get(CAR_INFO_DATA, {}).get("data"): + try: + return CarInformationData.from_dict(data) + except Exception as exc: + raise ValueError("Failed to convert car information data") from exc + + def get_car_battery(self, vin: str) -> CarBatteryData | None: + """ + Get car battery information for the specified VIN. + + Args: + vin: The vehicle identification number + Returns: + CarInformatiCarBatteryDataonData if data exists, None otherwise + Raises: + KeyError: If the VIN doesn't exist + ValueError: If data conversion fails + """ + if vin not in self.data_by_vin: + raise KeyError(f"No data found for VIN: {vin}") + if data := self.data_by_vin[vin].get(BATTERY_DATA, {}).get("data"): + try: + return CarBatteryData.from_dict(data) + except Exception as exc: + raise ValueError("Failed to convert car battery data") from exc + + def get_car_odometer(self, vin: str) -> CarOdometerData | None: + """ + Get car odomoter information for the specified VIN. + + Args: + vin: The vehicle identification number + Returns: + CarOdometerData if data exists, None otherwise + Raises: + KeyError: If the VIN doesn't exist + ValueError: If data conversion fails + """ + if vin not in self.data_by_vin: + raise KeyError(f"No data found for VIN: {vin}") + if data := self.data_by_vin[vin].get(ODO_METER_DATA, {}).get("data"): + try: + return CarOdometerData.from_dict(data) + except Exception as exc: + raise ValueError("Failed to convert car odometer data") from exc + def get_latest_data(self, vin: str, query: str, field_name: str) -> dict | None: """Get the latest data from the Polestar API.""" self.logger.debug( diff --git a/custom_components/polestar_api/pypolestar/utils.py b/custom_components/polestar_api/pypolestar/utils.py new file mode 100644 index 0000000..740210f --- /dev/null +++ b/custom_components/polestar_api/pypolestar/utils.py @@ -0,0 +1,128 @@ +from datetime import date, datetime + +GqlScalar = int | float | str | bool | None + +GqlDict = dict[str, type["GqlDict"] | GqlScalar] + + +def get_field_name_value(field_name: str, data: GqlDict) -> GqlScalar | GqlDict: + """Extract a value from nested dictionary using path-like notation. + + Args: + field_name: Path to the field using "/" as separator (e.g., "car/status/battery") + data: Nested dictionary containing the data + + Returns: + The value at the specified path + + Raises: + KeyError: If the path doesn't exist or is invalid + """ + + if field_name is None or not field_name.strip(): + raise ValueError("Field name cannot be empty") + + if data is None: + return None + + result: GqlScalar | GqlDict = data + + for key in field_name.split("/"): + if isinstance(result, dict): + if key not in result: + raise KeyError(f"Key '{key}' not found in path '{field_name}'") + result = result[key] + else: + raise KeyError( + f"Cannot access key '{key}' in non-dict value at path '{field_name}'" + ) + + return result + + +def get_field_name_str(field_name: str, data: GqlDict) -> str | None: + """Extract a str value from the nested dictionary. + Args: + field_name: Path to the str field + data: Nested dictionary containing the data + Returns: + str if successful, None otherwise + """ + if (value := get_field_name_value(field_name, data)) and (isinstance(value, str)): + return value + + +def get_field_name_float(field_name: str, data: GqlDict) -> float | None: + """Extract a float value from the nested dictionary. + Args: + field_name: Path to the float field + data: Nested dictionary containing the data + Returns: + float if successful, None otherwise + """ + if (value := get_field_name_value(field_name, data)) and isinstance(value, float): + return value + elif value is not None: + try: + return float(value) + except (ValueError, TypeError) as exc: + raise ValueError(f"Invalid float value at '{field_name}': {value}") from exc + + +def get_field_name_int(field_name: str, data: GqlDict) -> int | None: + """Extract a int value from the nested dictionary. + Args: + field_name: Path to the int field + data: Nested dictionary containing the data + Returns: + int if successful, None otherwise + """ + if (value := get_field_name_value(field_name, data)) and isinstance(value, int): + return value + elif value is not None: + try: + return int(value) + except (ValueError, TypeError) as exc: + raise ValueError( + f"Invalid integer value at '{field_name}': {value}" + ) from exc + + +def get_field_name_date(field_name: str, data: GqlDict) -> date | None: + """Extract and convert a date value from the nested dictionary. + Args: + field_name: Path to the date field + data: Nested dictionary containing the data + Returns: + date object if conversion successful, None otherwise + """ + if value := get_field_name_value(field_name, data): + if isinstance(value, date): + return value + if isinstance(value, str): + try: + return date.fromisoformat(value) + except ValueError as exc: + raise ValueError( + f"Invalid date format at '{field_name}': {value}" + ) from exc + + +def get_field_name_datetime(field_name: str, data: GqlDict) -> datetime | None: + """Extract and convert a datetime value from the nested dictionary. + Args: + field_name: Path to the datetime field + data: Nested dictionary containing the data + Returns: + datetime object if conversion successful, None otherwise + """ + if value := get_field_name_value(field_name, data): + if isinstance(value, datetime): + return value + if isinstance(value, str): + try: + return datetime.fromisoformat(value) + except ValueError as exc: + raise ValueError( + f"Invalid datetime format at '{field_name}': {value}" + ) from exc diff --git a/requirements.txt b/requirements.txt index 7b0f971..c53722d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ homeassistant==2024.6.0 pip>=21.3.1 ruff==0.7.1 gql[httpx]>=3.5.0 +pytest>=8.3.3 diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..0f8d8df --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,86 @@ +from datetime import date, datetime + +import pytest +from polestar_api.pypolestar.utils import ( + get_field_name_date, + get_field_name_datetime, + get_field_name_float, + get_field_name_int, + get_field_name_str, + get_field_name_value, +) + +TESTDATA = { + "vin": "YSMYKEAE7RB000000", + "internalVehicleIdentifier": "0c5f757e-5eb3-4191-b786-0513ddaa452f", + "registrationNo": "MLB007", + "registrationDate": None, + "factoryCompleteDate": "2024-04-16", + "content": { + "model": {"name": "Polestar 3"}, + "images": { + "studio": {"url": "https://example.com/image.png"}, + "specification": { + "battery": "400V lithium-ion battery, 111 kWh capacity, 17 modules", + "torque": "840 Nm / 620 lbf-ft", + }, + }, + "software": {"version": None, "versionTimestamp": None}, + }, + "averageEnergyConsumptionKwhPer100Km": 42.01, + "batteryChargeLevelPercentage": 100, +} + + +def test_get_field_name_value(): + assert get_field_name_value("vin", TESTDATA) == "YSMYKEAE7RB000000" + assert get_field_name_value("content/model/name", TESTDATA) == "Polestar 3" + assert get_field_name_value("content/software/version", TESTDATA) is None + assert get_field_name_value("content/images/studio", TESTDATA) == { + "url": "https://example.com/image.png" + } + with pytest.raises(KeyError): + assert ( + get_field_name_value("content/model/name/xyzzy", TESTDATA) == "Polestar 3" + ) + with pytest.raises(KeyError): + get_field_name_value("xyzzy", TESTDATA) + with pytest.raises(KeyError): + get_field_name_value("content/xyzzy", TESTDATA) + + +def test_get_field_name_str(): + assert get_field_name_str("vin", TESTDATA) == "YSMYKEAE7RB000000" + assert get_field_name_str("registrationDate", TESTDATA) is None # None handling + + +def test_get_field_name_float(): + assert ( + get_field_name_float("averageEnergyConsumptionKwhPer100Km", TESTDATA) == 42.01 + ) + assert get_field_name_float("registrationDate", TESTDATA) is None # None handling + with pytest.raises(ValueError): # Invalid date + get_field_name_float("vin", TESTDATA) + + +def test_get_field_name_int(): + assert get_field_name_int("batteryChargeLevelPercentage", TESTDATA) == 100 + assert get_field_name_int("registrationDate", TESTDATA) is None # None handling + + +def test_get_field_name_date(): + assert get_field_name_date("factoryCompleteDate", TESTDATA) == date(2024, 4, 16) + assert get_field_name_date("registrationDate", TESTDATA) is None # None handling + with pytest.raises(ValueError): # Invalid date + get_field_name_date("vin", TESTDATA) + + +def test_get_field_name_datetime(): + assert get_field_name_datetime("factoryCompleteDate", TESTDATA) == datetime( + 2024, 4, 16 + ) + assert ( + get_field_name_datetime("registrationDate", TESTDATA) is None + ) # None handling + with pytest.raises(ValueError): # Invalid datetime + get_field_name_datetime("vin", TESTDATA)