From f9e77b724647bcd85b28f57dde77e4e3ffc091fc Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 17 Jun 2021 23:37:52 +0200 Subject: [PATCH] Add broadcast listener to receive real-time updates (#9) * use fail counter to avoid unavailable state * move api to module * split api into multiple files * fix some issues * broadcast stuff * config option * destroy coordinator on unload * add "last" sensor to broadcast measurements --- custom_components/weatherlink/__init__.py | 105 ++- custom_components/weatherlink/air_quality.py | 4 +- custom_components/weatherlink/api.py | 598 ------------------ custom_components/weatherlink/api/__init__.py | 10 + .../weatherlink/api/broadcast.py | 176 ++++++ .../weatherlink/api/conditions/__init__.py | 166 +++++ .../weatherlink/api/conditions/air_quality.py | 73 +++ .../weatherlink/api/conditions/condition.py | 32 + .../weatherlink/api/conditions/iss.py | 212 +++++++ .../weatherlink/api/conditions/lss.py | 42 ++ .../weatherlink/api/conditions/moisture.py | 48 ++ .../weatherlink/api/from_json.py | 116 ++++ custom_components/weatherlink/api/rest.py | 89 +++ custom_components/weatherlink/config_flow.py | 25 +- custom_components/weatherlink/manifest.json | 2 +- custom_components/weatherlink/sensor.py | 2 +- .../weatherlink/sensor_air_quality.py | 2 +- .../weatherlink/sensor_common.py | 9 +- custom_components/weatherlink/sensor_iss.py | 36 +- .../weatherlink/sensor_moisture.py | 2 +- .../weatherlink/translations/en.json | 3 +- custom_components/weatherlink/weather.py | 4 +- tests/weatherlink/api/test_airlink.py | 5 +- tests/weatherlink/api/test_weatherlink.py | 43 +- tests/weatherlink/test_units.py | 16 +- 25 files changed, 1173 insertions(+), 647 deletions(-) delete mode 100644 custom_components/weatherlink/api.py create mode 100644 custom_components/weatherlink/api/__init__.py create mode 100644 custom_components/weatherlink/api/broadcast.py create mode 100644 custom_components/weatherlink/api/conditions/__init__.py create mode 100644 custom_components/weatherlink/api/conditions/air_quality.py create mode 100644 custom_components/weatherlink/api/conditions/condition.py create mode 100644 custom_components/weatherlink/api/conditions/iss.py create mode 100644 custom_components/weatherlink/api/conditions/lss.py create mode 100644 custom_components/weatherlink/api/conditions/moisture.py create mode 100644 custom_components/weatherlink/api/from_json.py create mode 100644 custom_components/weatherlink/api/rest.py diff --git a/custom_components/weatherlink/__init__.py b/custom_components/weatherlink/__init__.py index ad202ee..f2f1a49 100644 --- a/custom_components/weatherlink/__init__.py +++ b/custom_components/weatherlink/__init__.py @@ -1,5 +1,7 @@ +import asyncio import logging from datetime import timedelta +from typing import Optional from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant @@ -9,7 +11,9 @@ DataUpdateCoordinator, ) -from .api import CurrentConditions, WeatherLinkSession +from .api import CurrentConditions, WeatherLinkBroadcast, WeatherLinkRest +from .api.conditions import DeviceType +from .config_flow import get_listen_to_broadcasts from .const import DOMAIN, PLATFORMS from .units import UnitConfig, get_unit_config @@ -35,39 +39,100 @@ def get_update_interval(entry: ConfigEntry) -> timedelta: return timedelta(seconds=seconds) -class WeatherLinkCoordinator(DataUpdateCoordinator): - session: WeatherLinkSession +MAX_FAIL_COUNTER: int = 3 +FAIL_TIMEOUT: float = 3.0 + + +class WeatherLinkCoordinator(DataUpdateCoordinator[CurrentConditions]): + session: WeatherLinkRest units: UnitConfig - current_conditions: CurrentConditions + _device_type: DeviceType device_did: str device_name: str device_model_name: str + __broadcast_task: Optional[asyncio.Task] = None + + def __set_broadcast_task_state(self, on: bool) -> None: + if self.__broadcast_task: + logger.debug("stopping current broadcast task") + self.__broadcast_task.cancel() + + if on: + logger.info("starting live broadcast listener") + self.__broadcast_task = asyncio.create_task( + self.__broadcast_loop(), name="broadcast listener loop" + ) + else: + self.__broadcast_task = None + async def __update_config(self, hass: HomeAssistant, entry: ConfigEntry): self.units = get_unit_config(hass, entry) self.update_interval = get_update_interval(entry) - async def __initalize( - self, session: WeatherLinkSession, entry: ConfigEntry - ) -> None: + self.__set_broadcast_task_state( + self._device_type.supports_real_time_api() + and get_listen_to_broadcasts(entry) + ) + + async def __initalize(self, session: WeatherLinkRest, entry: ConfigEntry) -> None: self.session = session entry.add_update_listener(self.__update_config) - await self.__update_config(self.hass, entry) self.update_method = self.__fetch_data - await self.__fetch_data() - - conditions = self.current_conditions + conditions = self.data = await self.__fetch_data() + if conditions is None: + raise RuntimeError(f"failed to get conditions from {session.base_url!r}") + self._device_type = conditions.determine_device_type() self.device_did = conditions.did - self.device_model_name = conditions.determine_device_type().value + self.device_model_name = self._device_type.value self.device_name = conditions.determine_device_name() - async def __fetch_data(self) -> None: - self.current_conditions = await self.session.current_conditions() + await self.__update_config(self.hass, entry) + + async def __fetch_data(self) -> CurrentConditions: + for i in range(MAX_FAIL_COUNTER): + try: + conditions = await self.session.current_conditions() + except Exception as exc: + logger.warning( + "failed to get current conditions, error %s / %s", + i + 1, + MAX_FAIL_COUNTER, + exc_info=exc, + ) + await asyncio.sleep(FAIL_TIMEOUT) + else: + break + else: + conditions = self.data + + return conditions + + async def __broadcast_loop_once(self, broadcast: WeatherLinkBroadcast) -> None: + logger.debug("received broadcast conditions") + conditions = await broadcast.read() + self.data.update_from(conditions) + self.async_set_updated_data(self.data) + + async def __broadcast_loop(self) -> None: + try: + broadcast = await WeatherLinkBroadcast.start(self.session) + except Exception: + logger.exception("failed to start broadcast") + return + try: + while True: + try: + await self.__broadcast_loop_once(broadcast) + except Exception: + logger.exception("failed to read broadcast") + finally: + await broadcast.stop() @classmethod - async def build(cls, hass, session: WeatherLinkSession, entry: ConfigEntry): + async def build(cls, hass, session: WeatherLinkRest, entry: ConfigEntry): coordinator = cls( hass, logger, @@ -78,13 +143,16 @@ async def build(cls, hass, session: WeatherLinkSession, entry: ConfigEntry): return coordinator + async def destroy(self) -> None: + self.__set_broadcast_task_state(False) + async def setup_coordinator(hass, entry: ConfigEntry): host = entry.data["host"] coordinator = await WeatherLinkCoordinator.build( hass, - WeatherLinkSession(aiohttp_client.async_get_clientsession(hass), host), + WeatherLinkRest(aiohttp_client.async_get_clientsession(hass), host), entry, ) hass.data[DOMAIN][entry.entry_id] = coordinator @@ -105,7 +173,8 @@ async def async_unload_entry(hass, entry): for platform in PLATFORMS: await hass.config_entries.async_forward_entry_unload(entry, platform) - del hass.data[DOMAIN][entry.entry_id] + coordinator: WeatherLinkCoordinator = hass.data[DOMAIN].pop(entry.entry_id) + await coordinator.destroy() return True @@ -118,7 +187,7 @@ def __init__(self, coordinator: WeatherLinkCoordinator) -> None: @property def _conditions(self) -> CurrentConditions: - return self.coordinator.current_conditions + return self.coordinator.data @property def units(self) -> UnitConfig: diff --git a/custom_components/weatherlink/air_quality.py b/custom_components/weatherlink/air_quality.py index 4a4afd8..e45601e 100644 --- a/custom_components/weatherlink/air_quality.py +++ b/custom_components/weatherlink/air_quality.py @@ -3,7 +3,7 @@ from homeassistant.components.air_quality import AirQualityEntity from . import WeatherLinkCoordinator, WeatherLinkEntity -from .api import AirQualityCondition +from .api.conditions import AirQualityCondition from .const import DOMAIN logger = logging.getLogger(__name__) @@ -11,7 +11,7 @@ async def async_setup_entry(hass, entry, async_add_entities): c: WeatherLinkCoordinator = hass.data[DOMAIN][entry.entry_id] - if AirQualityCondition in c.current_conditions: + if AirQualityCondition in c.data: async_add_entities([AirQuality(c)]) return True diff --git a/custom_components/weatherlink/api.py b/custom_components/weatherlink/api.py deleted file mode 100644 index 5484179..0000000 --- a/custom_components/weatherlink/api.py +++ /dev/null @@ -1,598 +0,0 @@ -import abc -import dataclasses -import enum -import logging -from datetime import datetime -from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Type, TypeVar - -import aiohttp - -logger = logging.getLogger(__name__) - -JsonObject = Dict[str, Any] - - -FromJsonT = TypeVar("FromJsonT", bound="FromJson") - - -class FromJson(abc.ABC): - OPT_STRICT = "strict" - - @classmethod - @abc.abstractmethod - def _from_json(cls, data: JsonObject, **kwargs): - ... - - @classmethod - def from_json(cls: Type[FromJsonT], data: JsonObject, **kwargs) -> FromJsonT: - try: - return cls._from_json(data, **kwargs) - except Exception as e: - if not getattr(e, "_handled", False): - logger.error( - f"failed to create `{cls.__qualname__}` from JSON: {data!r}" - ) - e._handled = True - - raise e from None - - -class ConditionType(enum.IntEnum): - Iss = 1 - Moisture = 2 - LssBar = 3 - LssTempHum = 4 - AirQuality = 6 - """Sent by AirLink""" - - def record_class(self) -> Type["ConditionRecord"]: - return _COND2CLS[self] - - -class ReceiverState(enum.IntEnum): - Tracking = 0 - """Transmitter has been acquired and is actively being received.""" - Synched = 1 - """Transmitter has been acquired, but we have missed 1-14 packets in a row.""" - Scanning = 2 - """Transmitter has not been acquired yet, or we’ve lost it (more than 15 missed packets in a row).""" - - -class CollectorSize(enum.IntEnum): - Millimeter01 = 3 - """0.1 mm""" - Millimeter02 = 2 - """0.2 mm""" - - Inches001 = 1 - """(0.01")""" - Inches0001 = 4 - """(0.001")""" - - def __mul__(self, x: int) -> int: - return x * self.to_mm() - - def to_mm(self) -> float: - return _COLLECTOR2MM[self] - - -_IN2MM = 25.4 -_COLLECTOR2MM = { - CollectorSize.Millimeter01: 0.1, - CollectorSize.Millimeter02: 0.2, - CollectorSize.Inches001: 0.01 * _IN2MM, - CollectorSize.Inches0001: 0.001 * _IN2MM, -} - - -@dataclasses.dataclass() -class ConditionRecord(FromJson, abc.ABC): - lsid: Optional[int] - """the numeric logic sensor identifier, or null if the device has not been registered""" - - -@dataclasses.dataclass() -class IssCondition(ConditionRecord): - txid: int - """transmitter ID""" - rx_state: Optional[ReceiverState] - """configured radio receiver state""" - - temp: Optional[float] - """most recent valid temperature""" - hum: Optional[float] - """most recent valid humidity **(%RH)**""" - dew_point: Optional[float] - """""" - wet_bulb: Optional[float] - """""" - heat_index: Optional[float] - """""" - wind_chill: Optional[float] - """""" - thw_index: Optional[float] - """""" - thsw_index: Optional[float] - """""" - - wind_speed_last: Optional[float] - """most recent valid wind speed **(km/h)**""" - wind_dir_last: Optional[int] - """most recent valid wind direction **(°degree)**""" - - wind_speed_avg_last_1_min: Optional[float] - """average wind speed over last 1 min **(km/h)**""" - wind_dir_scalar_avg_last_1_min: Optional[int] - """scalar average wind direction over last 1 min **(°degree)**""" - - wind_speed_avg_last_2_min: Optional[float] - """average wind speed over last 2 min **(km/h)**""" - wind_dir_scalar_avg_last_2_min: Optional[int] - """scalar average wind direction over last 2 min **(°degree)**""" - - wind_speed_hi_last_2_min: Optional[float] - """maximum wind speed over last 2 min **(km/h)**""" - wind_dir_at_hi_speed_last_2_min: Optional[int] - """gust wind direction over last 2 min **(°degree)**""" - - wind_speed_avg_last_10_min: Optional[float] - """average wind speed over last 10 min **(km/h)**""" - wind_dir_scalar_avg_last_10_min: Optional[int] - """scalar average wind direction over last 10 min **(°degree)**""" - - wind_speed_hi_last_10_min: Optional[float] - """maximum wind speed over last 10 min **(km/h)**""" - wind_dir_at_hi_speed_last_10_min: Optional[int] - """gust wind direction over last 10 min **(°degree)**""" - - rain_size: CollectorSize - """rain collector type/size""" - - rain_rate_last: float - rain_rate_last_counts: int - """most recent valid rain rate **(counts/hour)**""" - rain_rate_hi: Optional[float] - rain_rate_hi_counts: Optional[int] - """highest rain rate over last 1 min **(counts/hour)**""" - - rainfall_last_15_min: Optional[float] - rainfall_last_15_min_counts: Optional[int] - """total rain count over last 15 min **(counts)**""" - rain_rate_hi_last_15_min: float - rain_rate_hi_last_15_min_counts: int - """highest rain rate over last 15 min **(counts/hour)**""" - - rainfall_last_60_min: Optional[float] - rainfall_last_60_min_counts: Optional[int] - """total rain count for last 60 min **(counts)**""" - rainfall_last_24_hr: Optional[float] - rainfall_last_24_hr_counts: Optional[int] - """total rain count for last 24 hours **(counts)**""" - - rain_storm: Optional[float] - rain_storm_counts: Optional[int] - """total rain count since last 24 hour long break in rain **(counts)**""" - rain_storm_start_at: Optional[datetime] - """timestamp of current rain storm start""" - - solar_rad: Optional[int] - """most recent solar radiation **(W/m²)**""" - uv_index: Optional[float] - """most recent UV index **(Index)**""" - - trans_battery_flag: int - """transmitter battery status flag""" - - rainfall_daily: float - rainfall_daily_counts: int - """total rain count since local midnight **(counts)**""" - rainfall_monthly: float - rainfall_monthly_counts: int - """total rain count since first of month at local midnight **(counts)**""" - rainfall_year: float - rainfall_year_counts: int - """total rain count since first of user-chosen month at local midnight **(counts)**""" - - rain_storm_last: Optional[float] - rain_storm_last_counts: Optional[int] - """total rain count since last 24 hour long break in rain **(counts)**""" - rain_storm_last_start_at: Optional[datetime] - """timestamp of last rain storm start **(sec)**""" - rain_storm_last_end_at: Optional[datetime] - """timestamp of last rain storm end **(sec)**""" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - collector = CollectorSize(data["rain_size"]) - data["rain_size"] = collector - json_keys_counts_to_mm( - data, - collector, - "rain_rate_last", - "rain_rate_hi", - "rainfall_last_15_min", - "rain_rate_hi_last_15_min", - "rainfall_last_60_min", - "rainfall_last_24_hr", - "rain_storm", - "rainfall_daily", - "rainfall_monthly", - "rainfall_year", - "rain_storm_last", - ) - json_apply_converters(data, rx_state=ReceiverState) - json_keys_to_celsius( - data, - "temp", - "dew_point", - "wet_bulb", - "heat_index", - "wind_chill", - "thw_index", - "thsw_index", - ) - json_keys_to_datetime( - data, - "rain_storm_start_at", - "rain_storm_last_start_at", - "rain_storm_last_end_at", - ) - json_keys_to_kph( - data, - "wind_speed_last", - "wind_speed_avg_last_1_min", - "wind_speed_avg_last_2_min", - "wind_speed_hi_last_2_min", - "wind_speed_avg_last_10_min", - "wind_speed_hi_last_10_min", - ) - return cls(**data) - - -@dataclasses.dataclass() -class MoistureCondition(ConditionRecord): - txid: int - rx_state: Optional[ReceiverState] - """configured radio receiver state""" - - temp_1: Optional[float] - """most recent valid soil temp slot 1""" - temp_2: Optional[float] - """most recent valid soil temp slot 2""" - temp_3: Optional[float] - """most recent valid soil temp slot 3""" - temp_4: Optional[float] - """most recent valid soil temp slot 4""" - - moist_soil_1: Optional[float] - """most recent valid soil moisture slot 1 **(|cb|)**""" - moist_soil_2: Optional[float] - """most recent valid soil moisture slot 2 **(|cb|)**""" - moist_soil_3: Optional[float] - """most recent valid soil moisture slot 3 **(|cb|)**""" - moist_soil_4: Optional[float] - """most recent valid soil moisture slot 4 **(|cb|)**""" - - wet_leaf_1: Optional[float] - """most recent valid leaf wetness slot 1""" - wet_leaf_2: Optional[float] - """most recent valid leaf wetness slot 2""" - - trans_battery_flag: Optional[int] - """transmitter battery status flag""" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - json_apply_converters(data, rx_state=ReceiverState) - json_keys_to_celsius(data, "temp_1", "temp_2", "temp_3", "temp_4") - return cls(**data) - - -@dataclasses.dataclass() -class LssBarCondition(ConditionRecord): - bar_sea_level: float - """most recent bar sensor reading with elevation adjustment **(hpa)**""" - bar_trend: Optional[float] - """current 3 hour bar trend **(hpa)**""" - bar_absolute: float - """raw bar sensor reading **(hpa)**""" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - json_keys_to_hpa(data, "bar_sea_level", "bar_trend", "bar_absolute") - return cls(**data) - - -@dataclasses.dataclass() -class LssTempHumCondition(ConditionRecord): - temp_in: float - """most recent valid inside temp""" - hum_in: float - """most recent valid inside humidity **(%RH)**""" - dew_point_in: float - """""" - heat_index_in: float - """""" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - json_keys_to_celsius(data, "temp_in", "dew_point_in", "heat_index_in") - return cls(**data) - - -@dataclasses.dataclass() -class AirQualityCondition(ConditionRecord): - temp: float - """most recent valid air temperature reading""" - hum: float - """most recent valid humidity reading in %RH""" - dew_point: float - """dew point temperature calculated from the most recent valid temperature / humidity reading""" - wet_bulb: float - """wet bulb temperature calculated from the most recent valid temperature / humidity reading and user elevation""" - heat_index: float - """heat index temperature calculated from the most recent valid temperature / humidity reading""" - - pm_1_last: int - """most recent valid PM 1.0 reading calculated using atmospheric calibration in µg/m^3.""" - pm_2p5_last: int - """most recent valid PM 2.5 reading calculated using atmospheric calibration in µg/m^3.""" - pm_10_last: int - """most recent valid PM 10.0 reading calculated using atmospheric calibration in µg/m^3.""" - - pm_1: float - """average of all PM 1.0 readings in the last minute calculated using atmospheric calibration in µg/m^3.""" - pm_2p5: float - """average of all PM 2.5 readings in the last minute calculated using atmospheric calibration in µg/m^3.""" - pm_10: float - """average of all PM 10.0 readings in the last minute calculated using atmospheric calibration in µg/m^3.""" - - pm_2p5_last_1_hour: float - """average of all PM 2.5 readings in the last hour calculated using atmospheric calibration in µg/m^3.""" - pm_2p5_last_3_hours: float - """average of all PM 2.5 readings in the last 3 hours calculated using atmospheric calibration in µg/m^3.""" - pm_2p5_last_24_hours: float - """weighted average of all PM 2.5 readings in the last 24 hours calculated using atmospheric calibration in µg/m^3.""" - pm_2p5_nowcast: float - """weighted average of all PM 2.5 readings in the last 12 hours calculated using atmospheric calibration in µg/m^3.""" - - pm_10_last_1_hour: float - """average of all PM 10.0 readings in the last hour calculated using atmospheric calibration in µg/m^3.""" - pm_10_last_3_hours: float - """average of all PM 10.0 readings in the last 3 hours calculated using atmospheric calibration in µg/m^3.""" - pm_10_last_24_hours: float - """weighted average of all PM 10.0 readings in the last 24 hours calculated using atmospheric calibration in µg/m^3.""" - pm_10_nowcast: float - """weighted average of all PM 10.0 readings in the last 12 hours calculated using atmospheric calibration in µg/m^3.""" - - last_report_time: datetime - """timestamp of the last time a valid reading was received from the PM sensor (or time since boot if time has not been synced), with resolution of seconds.""" - - pct_pm_data_last_1_hour: int - """amount of PM data available to calculate averages in the last hour (rounded down to the nearest percent)""" - pct_pm_data_last_3_hours: int - """amount of PM data available to calculate averages in the last 3 hours (rounded down to the nearest percent)""" - pct_pm_data_last_24_hours: int - """amount of PM data available to calculate averages in the last 24 hours (rounded down to the nearest percent)""" - pct_pm_data_nowcast: int - """amount of PM data available to calculate averages in the last 12 hours (rounded down to the nearest percent)""" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - json_keys_to_celsius(data, "temp", "dew_point", "wet_bulb", "heat_index") - json_keys_to_datetime(data, "last_report_time") - return cls(**data) - - -_STRUCTURE_TYPE_KEY = "data_structure_type" - -_COND2CLS = { - ConditionType.Iss: IssCondition, - ConditionType.Moisture: MoistureCondition, - ConditionType.LssBar: LssBarCondition, - ConditionType.LssTempHum: LssTempHumCondition, - ConditionType.AirQuality: AirQualityCondition, -} - - -def condition_from_json(data: JsonObject, **kwargs) -> ConditionRecord: - cond_ty = ConditionType(data.pop(_STRUCTURE_TYPE_KEY)) - cls = cond_ty.record_class() - return cls.from_json(data, **kwargs) - - -def flatten_conditions(conditions: Iterable[JsonObject]) -> List[JsonObject]: - cond_by_type = {} - for cond in conditions: - cond_type = cond[_STRUCTURE_TYPE_KEY] - try: - existing = cond_by_type[cond_type] - except KeyError: - cond_by_type[cond_type] = cond - else: - update_dict_where_none(existing, cond) - - return list(cond_by_type.values()) - - -class DeviceType(enum.Enum): - WeatherLink = "WeatherLink" - AirLink = "AirLink" - - -RecordT = TypeVar("RecordT", bound=ConditionRecord) - - -@dataclasses.dataclass() -class CurrentConditions(FromJson, Mapping[Type[RecordT], RecordT]): - did: str - """the device serial number as a string""" - ts: datetime - """the timestamp of the moment the response was generated, with a resolution of seconds. - - If the time has not yet been synchronized from the network, this will instead measure the time in seconds since bootup. - """ - - conditions: List[ConditionRecord] - """a list of current condition data records, one per logical sensor.""" - - name: Optional[str] = None - """Only present for AirLink""" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - conditions = [] - raw_conditions = flatten_conditions(data["conditions"]) - for i, cond_data in enumerate(raw_conditions): - try: - cond = condition_from_json(cond_data, **kwargs) - except Exception: - if kwargs.get(cls.OPT_STRICT): - raise - - logger.exception( - f"failed to build condition record at index {i}: {cond_data!r}" - ) - continue - - conditions.append(cond) - - return cls( - did=data["did"], - ts=datetime.fromtimestamp(data["ts"]), - conditions=conditions, - name=data.get("name"), - ) - - def __getitem__(self, cls: Type[RecordT]) -> RecordT: - try: - return next(cond for cond in self.conditions if isinstance(cond, cls)) - except StopIteration: - raise KeyError(repr(cls.__qualname__)) from None - - def __iter__(self) -> Iterable[ConditionRecord]: - return iter(self.conditions) - - def __len__(self) -> int: - return len(self.conditions) - - def get(self, cls: Type[RecordT]) -> Optional[RecordT]: - try: - return self[cls] - except KeyError: - return None - - def determine_device_type(self) -> DeviceType: - if self.name is None: - return DeviceType.WeatherLink - else: - return DeviceType.AirLink - - def determine_device_name(self) -> str: - name = self.name - if name: - return name - - model_name = self.determine_device_type().name - return f"{model_name} {self.did}" - - -@dataclasses.dataclass() -class ApiError(Exception, FromJson): - code: int - message: str - - def __str__(self) -> str: - return f"{self.code}: {self.message}" - - @classmethod - def _from_json(cls, data: JsonObject, **kwargs): - return cls(code=data["code"], message=data["message"]) - - -def get_data_from_body(body: JsonObject) -> JsonObject: - if err := body.get("error"): - raise ApiError.from_json(err) - - return body["data"] - - -class WeatherLinkSession: - EP_CURRENT_CONDITIONS = "/v1/current_conditions" - - session: aiohttp.ClientSession - base_url: str - - def __init__(self, session: aiohttp.ClientSession, base_url: str) -> None: - self.session = session - self.base_url = base_url - - async def _request(self, path: str) -> JsonObject: - async with self.session.get(self.base_url + path) as resp: - body = await resp.json() - return get_data_from_body(body) - - async def current_conditions(self) -> CurrentConditions: - raw_data = await self._request(self.EP_CURRENT_CONDITIONS) - return CurrentConditions.from_json(raw_data) - - -def fahrenheit_to_celsius(value: float) -> float: - return (value - 32) * 5 / 9 - - -def mph_to_kph(value: float) -> float: - return 1.609344 * value - - -def in_hg_to_hpa(value: float) -> float: - return 33.86389 * value - - -def json_apply_converters(d: JsonObject, **converters: Callable[[Any], Any]) -> None: - for key, converter in converters.items(): - try: - value = d[key] - except KeyError: - continue - - if value is None: - continue - - d[key] = converter(value) - - -def json_keys_to_datetime(d: JsonObject, *keys: str) -> None: - json_apply_converters(d, **{key: datetime.fromtimestamp for key in keys}) - - -def json_keys_to_celsius(d: JsonObject, *keys: str) -> None: - json_apply_converters(d, **{key: fahrenheit_to_celsius for key in keys}) - - -def json_keys_to_kph(d: JsonObject, *keys: str) -> None: - json_apply_converters(d, **{key: mph_to_kph for key in keys}) - - -def json_keys_to_hpa(d: JsonObject, *keys: str) -> None: - json_apply_converters(d, **{key: in_hg_to_hpa for key in keys}) - - -def json_keys_counts_to_mm(d: JsonObject, collector: CollectorSize, *keys: str): - for key in keys: - counts = d.get(key) - d[f"{key}_counts"] = counts - if counts: - d[key] = collector * counts - - -def json_set_default_none(d: JsonObject, *keys: str) -> None: - for key in keys: - if key not in d: - d[key] = None - - -def update_dict_where_none(d: JsonObject, updates: JsonObject) -> None: - for key, value in updates.items(): - if d.get(key) is None: - d[key] = value diff --git a/custom_components/weatherlink/api/__init__.py b/custom_components/weatherlink/api/__init__.py new file mode 100644 index 0000000..56a3ad5 --- /dev/null +++ b/custom_components/weatherlink/api/__init__.py @@ -0,0 +1,10 @@ +from .broadcast import WeatherLinkBroadcast +from .conditions import CurrentConditions +from .rest import ApiError, WeatherLinkRest + +__all__ = [ + "ApiError", + "CurrentConditions", + "WeatherLinkBroadcast", + "WeatherLinkRest", +] diff --git a/custom_components/weatherlink/api/broadcast.py b/custom_components/weatherlink/api/broadcast.py new file mode 100644 index 0000000..ecf7f01 --- /dev/null +++ b/custom_components/weatherlink/api/broadcast.py @@ -0,0 +1,176 @@ +import asyncio +import json +import logging +import time +from datetime import timedelta +from typing import Any, Optional, Tuple, Union + +from .conditions import CurrentConditions +from .rest import WeatherLinkRest + +logger = logging.getLogger(__name__) + + +class Protocol(asyncio.DatagramProtocol): + remote_addr: str + + transport: asyncio.DatagramTransport + queue: asyncio.Queue + connection_lost_fut: asyncio.Future + + def __init__(self, remote_addr: str, *, queue_size: int = 16) -> None: + super().__init__() + self.remote_addr = remote_addr + + # transport made by `connection_made` + self.queue = asyncio.Queue(queue_size) + self.connection_lost_fut = asyncio.Future() + + def __str__(self) -> str: + return f"<{type(self).__qualname__} {self.remote_addr=!r}>" + + @classmethod + async def open(cls, remote_addr: str, *, addr: str, port: int, **kwargs): + loop = asyncio.get_running_loop() + _, protocol = await loop.create_datagram_endpoint( + lambda: cls(remote_addr, **kwargs), + local_addr=(addr, port), + ) + return protocol + + def connection_made(self, transport: asyncio.DatagramTransport) -> None: + logger.debug("%s connection made", self) + self.transport = transport + + def connection_lost(self, exc: Optional[Exception]) -> None: + logger.debug("%s connection lost with error: %s", self, exc) + self.connection_lost_fut.set_result(exc) + + def __queue_put(self, item: Any) -> None: + try: + self.queue.put_nowait(item) + except asyncio.QueueFull: + pass + + def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: + if addr[0] != self.remote_addr: + return + + if self.queue.full(): + logger.warning( + "ignoring message from %s because queue is already full", addr + ) + return + + try: + data = json.loads(data) + except Exception as exc: + logger.exception(f"failed to parse broadcast payload from {addr}: {data}") + return + + msg: Union[CurrentConditions, BaseException] + try: + msg = CurrentConditions.from_json(data) + except Exception as exc: + msg = exc + + self.__queue_put(msg) + + async def close(self) -> None: + self.transport.close() + await self.connection_lost_fut + + def raise_if_connection_lost(self) -> None: + fut = self.connection_lost_fut + if not fut.done(): + return + + if exc := fut.result(): + raise exc + + raise RuntimeError("connection closed") + + async def __queue_get_raw(self) -> Union[CurrentConditions, BaseException]: + try: + return self.queue.get_nowait() + except asyncio.QueueEmpty: + pass + + conn_lost = self.connection_lost_fut + queue_get = asyncio.create_task(self.queue.get()) + await asyncio.wait({conn_lost, queue_get}, return_when=asyncio.FIRST_COMPLETED) + # handle the case where the connection was lost + self.raise_if_connection_lost() + return await queue_get + + async def queue_get(self) -> CurrentConditions: + msg = await self.__queue_get_raw() + if isinstance(msg, BaseException): + raise msg + return msg + + +class BroadcastRenewer: + remote_addr: str + broadcast_port: int + + _rest: WeatherLinkRest + _duration: timedelta + _renew_at: float + + def __init__( + self, + rest: WeatherLinkRest, + duration: timedelta, + ) -> None: + self._rest = rest + self._duration = duration + self._renew_at = 0.0 + + @classmethod + async def init(cls, rest: WeatherLinkRest, *, duration: timedelta): + inst = cls(rest, duration) + await inst.update() + return inst + + def should_renew(self) -> bool: + return time.time() >= self._renew_at + + async def update(self) -> bool: + if not self.should_renew(): + return False + + logger.info("renewing real-time broadcast") + rt = await self._rest.real_time(duration=self._duration) + + self._renew_at = time.time() + rt.duration / 2 + self.remote_addr = rt.addr + self.broadcast_port = rt.broadcast_port + return True + + +class WeatherLinkBroadcast: + _protocol: Protocol + _renewer: BroadcastRenewer + + def __init__(self, protocol: Protocol, renewer: BroadcastRenewer) -> None: + self._protocol = protocol + self._renewer = renewer + + @classmethod + async def start(cls, rest: WeatherLinkRest): + renewer: BroadcastRenewer = await BroadcastRenewer.init( + rest, duration=timedelta(hours=1) + ) + protocol = await Protocol.open( + renewer.remote_addr, addr="0.0.0.0", port=renewer.broadcast_port + ) + return cls(protocol, renewer) + + async def stop(self) -> None: + await self._protocol.close() + + async def read(self) -> CurrentConditions: + if await self._renewer.update(): + self._protocol.remote_addr = self._renewer.remote_addr + return await self._protocol.queue_get() diff --git a/custom_components/weatherlink/api/conditions/__init__.py b/custom_components/weatherlink/api/conditions/__init__.py new file mode 100644 index 0000000..7af894d --- /dev/null +++ b/custom_components/weatherlink/api/conditions/__init__.py @@ -0,0 +1,166 @@ +import dataclasses +import enum +import logging +from datetime import datetime +from typing import Iterable, List, Mapping, Optional, Type, TypeVar + +from .. import from_json +from . import air_quality, condition, iss, lss, moisture +from .air_quality import * +from .condition import * +from .iss import * +from .lss import * +from .moisture import * + +__all__ = [ + "ConditionType", + "CurrentConditions", + "DeviceType", + *air_quality.__all__, + *condition.__all__, + *iss.__all__, + *lss.__all__, + *moisture.__all__, +] + +logger = logging.getLogger(__name__) + + +class ConditionType(enum.IntEnum): + Iss = 1 + Moisture = 2 + LssBar = 3 + LssTempHum = 4 + AirQuality = 6 + """Sent by AirLink""" + + def record_class(self) -> Type["ConditionRecord"]: + return _COND2CLS[self] + + +class DeviceType(enum.Enum): + WeatherLink = "WeatherLink" + AirLink = "AirLink" + + def supports_real_time_api(self) -> bool: + return self != self.AirLink + + +RecordT = TypeVar("RecordT", bound=ConditionRecord) + + +@dataclasses.dataclass() +class CurrentConditions(from_json.FromJson, Mapping[Type[RecordT], RecordT]): + did: str + """the device serial number as a string""" + ts: datetime + """the timestamp of the moment the response was generated, with a resolution of seconds. + + If the time has not yet been synchronized from the network, this will instead measure the time in seconds since bootup. + """ + + conditions: List[ConditionRecord] + """a list of current condition data records, one per logical sensor.""" + + name: Optional[str] = None + """Only present for AirLink""" + + @classmethod + def _from_json(cls, data: from_json.JsonObject, **kwargs): + conditions = [] + raw_conditions = flatten_conditions(data["conditions"]) + for i, cond_data in enumerate(raw_conditions): + try: + cond = condition_from_json(cond_data, **kwargs) + except Exception: + if kwargs.get(cls.OPT_STRICT): + raise + + logger.exception( + f"failed to build condition record at index {i}: {cond_data!r}" + ) + continue + + conditions.append(cond) + + return cls( + did=data["did"], + ts=datetime.fromtimestamp(data["ts"]), + conditions=conditions, + name=data.get("name"), + ) + + def __getitem__(self, cls: Type[RecordT]) -> RecordT: + try: + return next(cond for cond in self.conditions if isinstance(cond, cls)) + except StopIteration: + raise KeyError(repr(cls.__qualname__)) from None + + def __iter__(self) -> Iterable[ConditionRecord]: + return iter(self.conditions) + + def __len__(self) -> int: + return len(self.conditions) + + def get(self, cls: Type[RecordT]) -> Optional[RecordT]: + try: + return self[cls] + except KeyError: + return None + + def determine_device_type(self) -> DeviceType: + if self.name is None: + return DeviceType.WeatherLink + else: + return DeviceType.AirLink + + def determine_device_name(self) -> str: + name = self.name + if name: + return name + + model_name = self.determine_device_type().name + return f"{model_name} {self.did}" + + def update_from(self, other: "CurrentConditions") -> None: + for other_condition in other: + condition_cls = type(other_condition) + try: + condition: ConditionRecord = self[condition_cls] + except KeyError: + self.conditions.append(other_condition) + else: + condition.update_from(other_condition) + + +_STRUCTURE_TYPE_KEY = "data_structure_type" + +_COND2CLS = { + ConditionType.Iss: IssCondition, + ConditionType.Moisture: MoistureCondition, + ConditionType.LssBar: LssBarCondition, + ConditionType.LssTempHum: LssTempHumCondition, + ConditionType.AirQuality: AirQualityCondition, +} + + +def condition_from_json(data: from_json.JsonObject, **kwargs) -> ConditionRecord: + cond_ty = ConditionType(data.pop(_STRUCTURE_TYPE_KEY)) + cls = cond_ty.record_class() + return cls.from_json(data, **kwargs) + + +def flatten_conditions( + conditions: Iterable[from_json.JsonObject], +) -> List[from_json.JsonObject]: + cond_by_type = {} + for cond in conditions: + cond_type = cond[_STRUCTURE_TYPE_KEY] + try: + existing = cond_by_type[cond_type] + except KeyError: + cond_by_type[cond_type] = cond + else: + from_json.update_dict_where_none(existing, cond) + + return list(cond_by_type.values()) diff --git a/custom_components/weatherlink/api/conditions/air_quality.py b/custom_components/weatherlink/api/conditions/air_quality.py new file mode 100644 index 0000000..42622f9 --- /dev/null +++ b/custom_components/weatherlink/api/conditions/air_quality.py @@ -0,0 +1,73 @@ +import dataclasses +from datetime import datetime + +from .. import from_json +from .condition import ConditionRecord + +__all__ = [ + "AirQualityCondition", +] + + +@dataclasses.dataclass() +class AirQualityCondition(ConditionRecord): + temp: float + """most recent valid air temperature reading""" + hum: float + """most recent valid humidity reading in %RH""" + dew_point: float + """dew point temperature calculated from the most recent valid temperature / humidity reading""" + wet_bulb: float + """wet bulb temperature calculated from the most recent valid temperature / humidity reading and user elevation""" + heat_index: float + """heat index temperature calculated from the most recent valid temperature / humidity reading""" + + pm_1_last: int + """most recent valid PM 1.0 reading calculated using atmospheric calibration in µg/m^3.""" + pm_2p5_last: int + """most recent valid PM 2.5 reading calculated using atmospheric calibration in µg/m^3.""" + pm_10_last: int + """most recent valid PM 10.0 reading calculated using atmospheric calibration in µg/m^3.""" + + pm_1: float + """average of all PM 1.0 readings in the last minute calculated using atmospheric calibration in µg/m^3.""" + pm_2p5: float + """average of all PM 2.5 readings in the last minute calculated using atmospheric calibration in µg/m^3.""" + pm_10: float + """average of all PM 10.0 readings in the last minute calculated using atmospheric calibration in µg/m^3.""" + + pm_2p5_last_1_hour: float + """average of all PM 2.5 readings in the last hour calculated using atmospheric calibration in µg/m^3.""" + pm_2p5_last_3_hours: float + """average of all PM 2.5 readings in the last 3 hours calculated using atmospheric calibration in µg/m^3.""" + pm_2p5_last_24_hours: float + """weighted average of all PM 2.5 readings in the last 24 hours calculated using atmospheric calibration in µg/m^3.""" + pm_2p5_nowcast: float + """weighted average of all PM 2.5 readings in the last 12 hours calculated using atmospheric calibration in µg/m^3.""" + + pm_10_last_1_hour: float + """average of all PM 10.0 readings in the last hour calculated using atmospheric calibration in µg/m^3.""" + pm_10_last_3_hours: float + """average of all PM 10.0 readings in the last 3 hours calculated using atmospheric calibration in µg/m^3.""" + pm_10_last_24_hours: float + """weighted average of all PM 10.0 readings in the last 24 hours calculated using atmospheric calibration in µg/m^3.""" + pm_10_nowcast: float + """weighted average of all PM 10.0 readings in the last 12 hours calculated using atmospheric calibration in µg/m^3.""" + + last_report_time: datetime + """timestamp of the last time a valid reading was received from the PM sensor (or time since boot if time has not been synced), with resolution of seconds.""" + + pct_pm_data_last_1_hour: int + """amount of PM data available to calculate averages in the last hour (rounded down to the nearest percent)""" + pct_pm_data_last_3_hours: int + """amount of PM data available to calculate averages in the last 3 hours (rounded down to the nearest percent)""" + pct_pm_data_last_24_hours: int + """amount of PM data available to calculate averages in the last 24 hours (rounded down to the nearest percent)""" + pct_pm_data_nowcast: int + """amount of PM data available to calculate averages in the last 12 hours (rounded down to the nearest percent)""" + + @classmethod + def _from_json(cls, data: from_json.JsonObject, **kwargs): + from_json.keys_to_celsius(data, "temp", "dew_point", "wet_bulb", "heat_index") + from_json.keys_to_datetime(data, "last_report_time") + return cls(**data) diff --git a/custom_components/weatherlink/api/conditions/condition.py b/custom_components/weatherlink/api/conditions/condition.py new file mode 100644 index 0000000..8502fe9 --- /dev/null +++ b/custom_components/weatherlink/api/conditions/condition.py @@ -0,0 +1,32 @@ +import abc +import dataclasses +import enum +from typing import Optional + +from ..from_json import FromJson + +__all__ = [ + "ConditionRecord", + "ReceiverState", +] + + +class ReceiverState(enum.IntEnum): + Tracking = 0 + """Transmitter has been acquired and is actively being received.""" + Synched = 1 + """Transmitter has been acquired, but we have missed 1-14 packets in a row.""" + Scanning = 2 + """Transmitter has not been acquired yet, or we’ve lost it (more than 15 missed packets in a row).""" + + +@dataclasses.dataclass() +class ConditionRecord(FromJson, abc.ABC): + lsid: Optional[int] + """the numeric logic sensor identifier, or null if the device has not been registered""" + + def update_from(self, other: "ConditionRecord") -> None: + for key, value in dataclasses.asdict(other).items(): + if value is None: + continue + setattr(self, key, value) diff --git a/custom_components/weatherlink/api/conditions/iss.py b/custom_components/weatherlink/api/conditions/iss.py new file mode 100644 index 0000000..9442151 --- /dev/null +++ b/custom_components/weatherlink/api/conditions/iss.py @@ -0,0 +1,212 @@ +import dataclasses +import enum +from datetime import datetime +from typing import Optional + +from .. import from_json +from .condition import ConditionRecord, ReceiverState + +__all__ = [ + "CollectorSize", + "IssCondition", +] + + +class CollectorSize(enum.IntEnum): + Millimeter01 = 3 + """0.1 mm""" + Millimeter02 = 2 + """0.2 mm""" + + Inches001 = 1 + """(0.01")""" + Inches0001 = 4 + """(0.001")""" + + def __mul__(self, x: int) -> int: + return x * self.to_mm() + + def to_mm(self) -> float: + return _COLLECTOR2MM[self] + + +@dataclasses.dataclass() +class IssCondition(ConditionRecord): + txid: int + """transmitter ID""" + + rain_size: CollectorSize + """rain collector type/size""" + + rain_rate_last: float + rain_rate_last_counts: int + """most recent valid rain rate **(counts/hour)**""" + + rainfall_daily: float + rainfall_daily_counts: int + """total rain count since local midnight **(counts)**""" + rainfall_monthly: float + rainfall_monthly_counts: int + """total rain count since first of month at local midnight **(counts)**""" + rainfall_year: float + rainfall_year_counts: int + """total rain count since first of user-chosen month at local midnight **(counts)**""" + + rx_state: Optional[ReceiverState] = None + """configured radio receiver state""" + + temp: Optional[float] = None + """most recent valid temperature""" + hum: Optional[float] = None + """most recent valid humidity **(%RH)**""" + dew_point: Optional[float] = None + """""" + wet_bulb: Optional[float] = None + """""" + heat_index: Optional[float] = None + """""" + wind_chill: Optional[float] = None + """""" + thw_index: Optional[float] = None + """""" + thsw_index: Optional[float] = None + """""" + + wind_speed_last: Optional[float] = None + """most recent valid wind speed **(km/h)**""" + wind_dir_last: Optional[int] = None + """most recent valid wind direction **(°degree)**""" + + wind_speed_avg_last_1_min: Optional[float] = None + """average wind speed over last 1 min **(km/h)**""" + wind_dir_scalar_avg_last_1_min: Optional[int] = None + """scalar average wind direction over last 1 min **(°degree)**""" + + wind_speed_avg_last_2_min: Optional[float] = None + """average wind speed over last 2 min **(km/h)**""" + wind_dir_scalar_avg_last_2_min: Optional[int] = None + """scalar average wind direction over last 2 min **(°degree)**""" + + wind_speed_hi_last_2_min: Optional[float] = None + """maximum wind speed over last 2 min **(km/h)**""" + wind_dir_at_hi_speed_last_2_min: Optional[int] = None + """gust wind direction over last 2 min **(°degree)**""" + + wind_speed_avg_last_10_min: Optional[float] = None + """average wind speed over last 10 min **(km/h)**""" + wind_dir_scalar_avg_last_10_min: Optional[int] = None + """scalar average wind direction over last 10 min **(°degree)**""" + + wind_speed_hi_last_10_min: Optional[float] = None + """maximum wind speed over last 10 min **(km/h)**""" + wind_dir_at_hi_speed_last_10_min: Optional[int] = None + """gust wind direction over last 10 min **(°degree)**""" + + rain_rate_hi: Optional[float] = None + rain_rate_hi_counts: Optional[int] = None + """highest rain rate over last 1 min **(counts/hour)**""" + rain_rate_hi_last_15_min: Optional[float] = None + rain_rate_hi_last_15_min_counts: Optional[int] = None + """highest rain rate over last 15 min **(counts/hour)**""" + + rainfall_last_15_min: Optional[float] = None + rainfall_last_15_min_counts: Optional[int] = None + """total rain count over last 15 min **(counts)**""" + rainfall_last_60_min: Optional[float] = None + rainfall_last_60_min_counts: Optional[int] = None + """total rain count for last 60 min **(counts)**""" + rainfall_last_24_hr: Optional[float] = None + rainfall_last_24_hr_counts: Optional[int] = None + """total rain count for last 24 hours **(counts)**""" + + rain_storm: Optional[float] = None + rain_storm_counts: Optional[int] = None + """total rain count since last 24 hour long break in rain **(counts)**""" + rain_storm_start_at: Optional[datetime] = None + """timestamp of current rain storm start""" + + solar_rad: Optional[int] = None + """most recent solar radiation **(W/m²)**""" + uv_index: Optional[float] = None + """most recent UV index **(Index)**""" + + trans_battery_flag: Optional[int] = None + """transmitter battery status flag""" + + rain_storm_last: Optional[float] = None + rain_storm_last_counts: Optional[int] = None + """total rain count since last 24 hour long break in rain **(counts)**""" + rain_storm_last_start_at: Optional[datetime] = None + """timestamp of last rain storm start **(sec)**""" + rain_storm_last_end_at: Optional[datetime] = None + """timestamp of last rain storm end **(sec)**""" + + @classmethod + def _from_json(cls, data: from_json.JsonObject, **kwargs): + collector = CollectorSize(data["rain_size"]) + data["rain_size"] = collector + from_json.keys_from_aliases( + data, + rainfall_last_15_min="rain_15_min", + rainfall_last_60_min="rain_60_min", + rainfall_last_24_hr="rain_24_hr", + ) + keys_counts_to_mm( + data, + collector, + "rain_rate_last", + "rain_rate_hi", + "rainfall_last_15_min", + "rain_rate_hi_last_15_min", + "rainfall_last_60_min", + "rainfall_last_24_hr", + "rain_storm", + "rainfall_daily", + "rainfall_monthly", + "rainfall_year", + "rain_storm_last", + ) + from_json.apply_converters(data, rx_state=ReceiverState) + from_json.keys_to_celsius( + data, + "temp", + "dew_point", + "wet_bulb", + "heat_index", + "wind_chill", + "thw_index", + "thsw_index", + ) + from_json.keys_to_datetime( + data, + "rain_storm_start_at", + "rain_storm_last_start_at", + "rain_storm_last_end_at", + ) + from_json.keys_to_kph( + data, + "wind_speed_last", + "wind_speed_avg_last_1_min", + "wind_speed_avg_last_2_min", + "wind_speed_hi_last_2_min", + "wind_speed_avg_last_10_min", + "wind_speed_hi_last_10_min", + ) + return cls(**data) + + +_IN2MM = 25.4 +_COLLECTOR2MM = { + CollectorSize.Millimeter01: 0.1, + CollectorSize.Millimeter02: 0.2, + CollectorSize.Inches001: 0.01 * _IN2MM, + CollectorSize.Inches0001: 0.001 * _IN2MM, +} + + +def keys_counts_to_mm(d: from_json.JsonObject, collector: CollectorSize, *keys: str): + for key in keys: + counts = d.get(key) + d[f"{key}_counts"] = counts + if counts: + d[key] = collector * counts diff --git a/custom_components/weatherlink/api/conditions/lss.py b/custom_components/weatherlink/api/conditions/lss.py new file mode 100644 index 0000000..bee469e --- /dev/null +++ b/custom_components/weatherlink/api/conditions/lss.py @@ -0,0 +1,42 @@ +import dataclasses +from typing import Optional + +from .. import from_json +from .condition import ConditionRecord + +__all__ = [ + "LssBarCondition", + "LssTempHumCondition", +] + + +@dataclasses.dataclass() +class LssBarCondition(ConditionRecord): + bar_sea_level: float + """most recent bar sensor reading with elevation adjustment **(hpa)**""" + bar_trend: Optional[float] + """current 3 hour bar trend **(hpa)**""" + bar_absolute: float + """raw bar sensor reading **(hpa)**""" + + @classmethod + def _from_json(cls, data: from_json.JsonObject, **kwargs): + from_json.keys_to_hpa(data, "bar_sea_level", "bar_trend", "bar_absolute") + return cls(**data) + + +@dataclasses.dataclass() +class LssTempHumCondition(ConditionRecord): + temp_in: float + """most recent valid inside temp""" + hum_in: float + """most recent valid inside humidity **(%RH)**""" + dew_point_in: float + """""" + heat_index_in: float + """""" + + @classmethod + def _from_json(cls, data: from_json.JsonObject, **kwargs): + from_json.keys_to_celsius(data, "temp_in", "dew_point_in", "heat_index_in") + return cls(**data) diff --git a/custom_components/weatherlink/api/conditions/moisture.py b/custom_components/weatherlink/api/conditions/moisture.py new file mode 100644 index 0000000..061b290 --- /dev/null +++ b/custom_components/weatherlink/api/conditions/moisture.py @@ -0,0 +1,48 @@ +import dataclasses +from typing import Optional + +from .. import from_json +from .condition import ConditionRecord, ReceiverState + +__all__ = [ + "MoistureCondition", +] + + +@dataclasses.dataclass() +class MoistureCondition(ConditionRecord): + txid: int + rx_state: Optional[ReceiverState] + """configured radio receiver state""" + + temp_1: Optional[float] + """most recent valid soil temp slot 1""" + temp_2: Optional[float] + """most recent valid soil temp slot 2""" + temp_3: Optional[float] + """most recent valid soil temp slot 3""" + temp_4: Optional[float] + """most recent valid soil temp slot 4""" + + moist_soil_1: Optional[float] + """most recent valid soil moisture slot 1 **(|cb|)**""" + moist_soil_2: Optional[float] + """most recent valid soil moisture slot 2 **(|cb|)**""" + moist_soil_3: Optional[float] + """most recent valid soil moisture slot 3 **(|cb|)**""" + moist_soil_4: Optional[float] + """most recent valid soil moisture slot 4 **(|cb|)**""" + + wet_leaf_1: Optional[float] + """most recent valid leaf wetness slot 1""" + wet_leaf_2: Optional[float] + """most recent valid leaf wetness slot 2""" + + trans_battery_flag: Optional[int] + """transmitter battery status flag""" + + @classmethod + def _from_json(cls, data: from_json.JsonObject, **kwargs): + from_json.apply_converters(data, rx_state=ReceiverState) + from_json.keys_to_celsius(data, "temp_1", "temp_2", "temp_3", "temp_4") + return cls(**data) diff --git a/custom_components/weatherlink/api/from_json.py b/custom_components/weatherlink/api/from_json.py new file mode 100644 index 0000000..5c806a3 --- /dev/null +++ b/custom_components/weatherlink/api/from_json.py @@ -0,0 +1,116 @@ +import abc +import logging +from datetime import datetime +from typing import Any, Callable, Dict, Iterable, Type, TypeVar, Union + +__all__ = [ + "FromJson", + "JsonObject", +] + +logger = logging.getLogger(__name__) + +JsonObject = Dict[str, Any] + + +FromJsonT = TypeVar("FromJsonT", bound="FromJson") + + +class FromJson(abc.ABC): + OPT_STRICT = "strict" + + @classmethod + @abc.abstractmethod + def _from_json(cls, data: JsonObject, **kwargs): + ... + + @classmethod + def from_json(cls: Type[FromJsonT], data: JsonObject, **kwargs) -> FromJsonT: + try: + return cls._from_json(data, **kwargs) + except Exception as e: + if not getattr(e, "_handled", False): + logger.error( + f"failed to create `{cls.__qualname__}` from JSON: {data!r}" + ) + e._handled = True + + raise e from None + + +def fahrenheit_to_celsius(value: float) -> float: + return (value - 32) * 5 / 9 + + +def mph_to_kph(value: float) -> float: + return 1.609344 * value + + +def in_hg_to_hpa(value: float) -> float: + return 33.86389 * value + + +def apply_converters(d: JsonObject, **converters: Callable[[Any], Any]) -> None: + for key, converter in converters.items(): + try: + value = d[key] + except KeyError: + continue + + if value is None: + continue + + d[key] = converter(value) + + +def keys_to_datetime(d: JsonObject, *keys: str) -> None: + apply_converters(d, **{key: datetime.fromtimestamp for key in keys}) + + +def keys_to_celsius(d: JsonObject, *keys: str) -> None: + apply_converters(d, **{key: fahrenheit_to_celsius for key in keys}) + + +def keys_to_kph(d: JsonObject, *keys: str) -> None: + apply_converters(d, **{key: mph_to_kph for key in keys}) + + +def keys_to_hpa(d: JsonObject, *keys: str) -> None: + apply_converters(d, **{key: in_hg_to_hpa for key in keys}) + + +def remove_optional_keys(d: JsonObject, *keys: str) -> None: + for key in keys: + try: + del d[key] + except KeyError: + continue + + +def keys_from_aliases(d: JsonObject, **key_aliases: Union[str, Iterable[str]]) -> None: + for key, aliases in key_aliases.items(): + if key in d: + continue + + if isinstance(aliases, str): + aliases = (aliases,) + + for alias in aliases: + try: + value = d[alias] + except KeyError: + continue + + d[key] = value + break + + for aliases in key_aliases.values(): + if isinstance(aliases, str): + aliases = (aliases,) + remove_optional_keys(d, *aliases) + + +def update_dict_where_none(d: JsonObject, updates: JsonObject) -> None: + for key, value in updates.items(): + if d.get(key) is None: + d[key] = value diff --git a/custom_components/weatherlink/api/rest.py b/custom_components/weatherlink/api/rest.py new file mode 100644 index 0000000..57f5495 --- /dev/null +++ b/custom_components/weatherlink/api/rest.py @@ -0,0 +1,89 @@ +import asyncio +import dataclasses +from datetime import timedelta +from typing import Mapping, Type + +import aiohttp + +from .conditions import CurrentConditions +from .from_json import FromJson, FromJsonT, JsonObject + +EP_CURRENT_CONDITIONS = "/v1/current_conditions" +EP_REAL_TIME = "/v1/real_time" + + +@dataclasses.dataclass() +class RealTimeBroadcastResponse(FromJson): + addr: str = dataclasses.field(init=False) + broadcast_port: int + duration: float + + @classmethod + def _from_json(cls, data: JsonObject, **kwargs): + return cls(**data) + + +@dataclasses.dataclass() +class ApiError(Exception, FromJson): + code: int + message: str + + def __str__(self) -> str: + return f"{self.code}: {self.message}" + + @classmethod + def _from_json(cls, data: JsonObject, **kwargs): + return cls(code=data["code"], message=data["message"]) + + +def raw_data_from_body(body: JsonObject) -> JsonObject: + if err := body.get("error"): + raise ApiError.from_json(err) + + return body["data"] + + +def parse_from_json(cls: Type[FromJsonT], body: JsonObject, **kwargs) -> FromJsonT: + data = raw_data_from_body(body) + return cls.from_json(data, **kwargs) + + +class WeatherLinkRest: + session: aiohttp.ClientSession + base_url: str + + _lock: asyncio.Lock + + def __init__(self, session: aiohttp.ClientSession, base_url: str) -> None: + self.session = session + self.base_url = base_url + + self._lock = asyncio.Lock() + + async def _request( + self, cls: Type[FromJsonT], path: str, /, *, params: Mapping[str, str] = None + ) -> FromJsonT: + # lock is needed because the WeatherLink hardware can't serve multiple clients at once + async with self._lock: + async with self.session.get(self.base_url + path, params=params) as resp: + body = await resp.json() + return parse_from_json(cls, body) + + async def current_conditions(self) -> CurrentConditions: + return await self._request(CurrentConditions, EP_CURRENT_CONDITIONS) + + async def real_time(self, *, duration: timedelta) -> RealTimeBroadcastResponse: + async with self.session.get( + self.base_url + EP_REAL_TIME, + params={"duration": int(duration.total_seconds())}, + ) as resp: + peername_raw = resp.connection.transport.get_extra_info("peername") + if peername_raw is None: + raise ValueError("failed to get peername from request") + + body = await resp.json() + + broadcast_resp = parse_from_json(RealTimeBroadcastResponse, body) + server_addr, _ = peername_raw + broadcast_resp.addr = server_addr + return broadcast_resp diff --git a/custom_components/weatherlink/config_flow.py b/custom_components/weatherlink/config_flow.py index 11eae86..6cee73c 100644 --- a/custom_components/weatherlink/config_flow.py +++ b/custom_components/weatherlink/config_flow.py @@ -3,11 +3,12 @@ from typing import Any, Dict import voluptuous as vol +from aiohttp.client_exceptions import ServerDisconnectedError from homeassistant import config_entries from homeassistant.helpers import aiohttp_client from homeassistant.helpers import config_validation as cv -from .api import WeatherLinkSession +from .api import WeatherLinkRest from .const import DOMAIN from .units import UnitConfig, get_unit_config @@ -15,6 +16,12 @@ FORM_SCHEMA = vol.Schema({vol.Required("host"): str}) +KEY_LISTEN_TO_BROADCASTS = "listen_to_broadcasts" + + +def get_listen_to_broadcasts(config_entry: config_entries.ConfigEntry) -> bool: + return config_entry.options.get(KEY_LISTEN_TO_BROADCASTS, True) + @dataclasses.dataclass() class FormError(Exception): @@ -33,9 +40,14 @@ async def discover(self, host: str) -> dict: logger.info("discovering: %s", host) session = aiohttp_client.async_get_clientsession(self.hass) - session = WeatherLinkSession(session, host) + session = WeatherLinkRest(session, host) try: conditions = await session.current_conditions() + except ServerDisconnectedError: + logger.warning( + f"server {host!r} disconnected during request, this device is probably already being polled" + ) + raise FormError("host", "connect_failed") except Exception: logger.exception(f"failed to connect to {host!r}") raise FormError("host", "connect_failed") @@ -104,6 +116,9 @@ async def async_step_misc(self, user_input=None): errors = {} if user_input is not None: + self.options[KEY_LISTEN_TO_BROADCASTS] = user_input[ + KEY_LISTEN_TO_BROADCASTS + ] try: self.options["update_interval"] = cv.time_period_str( user_input["update_interval"] @@ -120,7 +135,11 @@ async def async_step_misc(self, user_input=None): vol.Required( "update_interval", default=str(get_update_interval(self.config_entry)), - ): str + ): str, + vol.Required( + KEY_LISTEN_TO_BROADCASTS, + default=get_listen_to_broadcasts(self.config_entry), + ): bool, } ), errors=errors, diff --git a/custom_components/weatherlink/manifest.json b/custom_components/weatherlink/manifest.json index e62c6d8..822d01f 100644 --- a/custom_components/weatherlink/manifest.json +++ b/custom_components/weatherlink/manifest.json @@ -2,7 +2,7 @@ "domain": "weatherlink", "name": "WeatherLink", "version": "0.2.1", - "documentation": "https://github.com/siku2/hass-weatherlink", + "documentation": "https://github.com/siku2/hass-weatherlink/wiki", "issue_tracker": "https://github.com/siku2/hass-weatherlink/issues", "codeowners": [ "@siku2" diff --git a/custom_components/weatherlink/sensor.py b/custom_components/weatherlink/sensor.py index 42150ab..d5410ea 100644 --- a/custom_components/weatherlink/sensor.py +++ b/custom_components/weatherlink/sensor.py @@ -1,5 +1,5 @@ from . import WeatherLinkCoordinator, units -from .api import LssBarCondition, LssTempHumCondition +from .api.conditions import LssBarCondition, LssTempHumCondition from .const import DECIMALS_HUMIDITY, DOMAIN from .sensor_air_quality import * from .sensor_common import WeatherLinkSensor, round_optional diff --git a/custom_components/weatherlink/sensor_air_quality.py b/custom_components/weatherlink/sensor_air_quality.py index d782be8..170b0e1 100644 --- a/custom_components/weatherlink/sensor_air_quality.py +++ b/custom_components/weatherlink/sensor_air_quality.py @@ -1,7 +1,7 @@ from datetime import datetime from . import units -from .api import AirQualityCondition +from .api.conditions import AirQualityCondition from .const import DECIMALS_HUMIDITY from .sensor_common import WeatherLinkSensor diff --git a/custom_components/weatherlink/sensor_common.py b/custom_components/weatherlink/sensor_common.py index 32d77b2..ed38411 100644 --- a/custom_components/weatherlink/sensor_common.py +++ b/custom_components/weatherlink/sensor_common.py @@ -3,7 +3,7 @@ from typing import Iterable, Iterator, List, Optional, Type, Union from . import WeatherLinkCoordinator, WeatherLinkEntity -from .api import ConditionRecord, CurrentConditions +from .api.conditions import ConditionRecord, CurrentConditions from .units import Measurement logger = logging.getLogger(__name__) @@ -74,8 +74,11 @@ def iter_sensors_for_coordinator( cls, coord: WeatherLinkCoordinator ) -> Iterator["WeatherLinkSensor"]: for cls in cls._SENSORS: - if not cls._conditions_ok(coord.current_conditions): - logger.info("ignoring sensor %s because requirements are not met", cls) + if not cls._conditions_ok(coord.data): + logger.debug( + "ignoring sensor %s because requirements are not met", + cls.__qualname__, + ) continue yield cls(coord) diff --git a/custom_components/weatherlink/sensor_iss.py b/custom_components/weatherlink/sensor_iss.py index 5d90c4a..fe0e4fa 100644 --- a/custom_components/weatherlink/sensor_iss.py +++ b/custom_components/weatherlink/sensor_iss.py @@ -1,7 +1,7 @@ from typing import Optional from . import units -from .api import IssCondition +from .api.conditions import IssCondition from .const import DECIMALS_HUMIDITY, DECIMALS_RADIATION, DECIMALS_UV from .sensor_common import WeatherLinkSensor, round_optional @@ -135,6 +135,23 @@ def device_state_attributes(self): } +class WindSpeedNow( + IssSensor, + sensor_name="Wind speed last", + unit_of_measurement=units.WindSpeed, + device_class=None, +): + @property + def icon(self): + return "mdi:weather-windy" + + @property + def state(self): + return self.units.wind_speed.convert_optional( + self._iss_condition.wind_speed_last + ) + + class WindMaxSpeed( IssSensor, sensor_name="Wind max speed", @@ -184,6 +201,21 @@ def device_state_attributes(self): } +class WindBearingNow( + IssSensor, + sensor_name="Wind bearing last", + unit_of_measurement="°", + device_class=None, +): + @property + def icon(self): + return "mdi:compass-rose" + + @property + def state(self): + return self._iss_condition.wind_dir_last + + class WindDirection( IssSensor, sensor_name="Wind direction", @@ -284,7 +316,7 @@ def device_state_attributes(self): u = self.units.rain_rate return { "high": u.convert_optional(c.rain_rate_hi), - "15_min_high": u.convert(c.rain_rate_hi_last_15_min), + "15_min_high": u.convert_optional(c.rain_rate_hi_last_15_min), } diff --git a/custom_components/weatherlink/sensor_moisture.py b/custom_components/weatherlink/sensor_moisture.py index 2216eec..1dee314 100644 --- a/custom_components/weatherlink/sensor_moisture.py +++ b/custom_components/weatherlink/sensor_moisture.py @@ -1,7 +1,7 @@ from typing import Optional from . import units -from .api import CurrentConditions, MoistureCondition +from .api.conditions import CurrentConditions, MoistureCondition from .const import DECIMALS_LEAF_WETNESS, DECIMALS_SOIL_MOISTURE from .sensor_common import WeatherLinkSensor, round_optional diff --git a/custom_components/weatherlink/translations/en.json b/custom_components/weatherlink/translations/en.json index 70b25fc..1c80c61 100644 --- a/custom_components/weatherlink/translations/en.json +++ b/custom_components/weatherlink/translations/en.json @@ -25,7 +25,8 @@ "title": "Misc", "description": "(1 / 3)", "data": { - "update_interval": "Update interval" + "update_interval": "Update interval", + "listen_to_broadcasts": "Listen to broadcasts" } }, "units": { diff --git a/custom_components/weatherlink/weather.py b/custom_components/weatherlink/weather.py index 6f5318a..1630167 100644 --- a/custom_components/weatherlink/weather.py +++ b/custom_components/weatherlink/weather.py @@ -3,7 +3,7 @@ from homeassistant.components.weather import WeatherEntity from . import WeatherLinkCoordinator, WeatherLinkEntity -from .api import IssCondition, LssBarCondition +from .api.conditions import IssCondition, LssBarCondition from .const import DOMAIN logger = logging.getLogger(__name__) @@ -11,7 +11,7 @@ async def async_setup_entry(hass, entry, async_add_entities): c: WeatherLinkCoordinator = hass.data[DOMAIN][entry.entry_id] - if IssCondition in c.current_conditions: + if IssCondition in c.data: async_add_entities([Weather(c)]) return True diff --git a/tests/weatherlink/api/test_airlink.py b/tests/weatherlink/api/test_airlink.py index 8ec7382..d8a4e3b 100644 --- a/tests/weatherlink/api/test_airlink.py +++ b/tests/weatherlink/api/test_airlink.py @@ -1,6 +1,7 @@ import json -from weatherlink.api import AirQualityCondition, CurrentConditions, get_data_from_body +from weatherlink.api.conditions import AirQualityCondition, CurrentConditions +from weatherlink.api.rest import parse_from_json SAMPLE_RESPONSE = json.loads( """ @@ -47,5 +48,5 @@ def test_parse(): - data = CurrentConditions.from_json(get_data_from_body(SAMPLE_RESPONSE), strict=True) + data = parse_from_json(CurrentConditions, SAMPLE_RESPONSE, strict=True) assert data[AirQualityCondition] diff --git a/tests/weatherlink/api/test_weatherlink.py b/tests/weatherlink/api/test_weatherlink.py index fc7e3e2..1e5c51f 100644 --- a/tests/weatherlink/api/test_weatherlink.py +++ b/tests/weatherlink/api/test_weatherlink.py @@ -1,12 +1,12 @@ import json -from weatherlink.api import ( +from weatherlink.api.conditions import ( CurrentConditions, IssCondition, LssBarCondition, LssTempHumCondition, - get_data_from_body, ) +from weatherlink.api.rest import parse_from_json def test_parse(): @@ -83,7 +83,7 @@ def test_parse(): """ ) - data = CurrentConditions.from_json(get_data_from_body(payload), strict=True) + data = parse_from_json(CurrentConditions, payload, strict=True) assert data[IssCondition] assert data[LssTempHumCondition] assert data[LssBarCondition] @@ -208,9 +208,44 @@ def test_parse_02(): """ ) - data = CurrentConditions.from_json(get_data_from_body(payload), strict=True) + data = parse_from_json(CurrentConditions, payload, strict=True) assert data[IssCondition] assert data[LssTempHumCondition] assert data[LssBarCondition] assert data[IssCondition].wind_dir_last == 252 + + +def test_parse_live() -> None: + payload = json.loads( + """ + { + "did": "001D0A7139D6", + "ts": 1622919120, + "conditions": [ + { + "lsid": 380030, + "data_structure_type": 1, + "txid": 1, + "wind_speed_last": 0.0, + "wind_dir_last": 0, + "rain_size": 2, + "rain_rate_last": 0, + "rain_15_min": 0, + "rain_60_min": 0, + "rain_24_hr": 199, + "rain_storm": 202, + "rain_storm_start_at": 1622784421, + "rainfall_daily": 54, + "rainfall_monthly": 204, + "rainfall_year": 2399, + "wind_speed_hi_last_10_min": 0.0, + "wind_dir_at_hi_speed_last_10_min": 0 + } + ] + } + """ + ) + + data = CurrentConditions.from_json(payload, strict=True) + assert data[IssCondition] diff --git a/tests/weatherlink/test_units.py b/tests/weatherlink/test_units.py index 72d06a1..7bf5e21 100644 --- a/tests/weatherlink/test_units.py +++ b/tests/weatherlink/test_units.py @@ -1,12 +1,12 @@ -from weatherlink.units import WindSpeed, UnitConfig +from weatherlink.units import Unit, UnitConfig, WindSpeed METRIC_CONFIG_JSON = { - "temperature": "CELSIUS", - "pressure": "HPA", - "wind_speed": "KMH", - "pm": "UG_PER_M3", - "rain_rate": "MMH", - "rainfall": "MM", + "pm": {"key": "UG_PER_M3", "ndigits": 2}, + "pressure": {"key": "HPA", "ndigits": 0}, + "rain_rate": {"key": "MMH", "ndigits": 1}, + "rainfall": {"key": "MM", "ndigits": 1}, + "temperature": {"key": "CELSIUS", "ndigits": 1}, + "wind_speed": {"key": "KMH", "ndigits": 1}, } @@ -20,5 +20,5 @@ def test_from_json(): partial_config_json = METRIC_CONFIG_JSON.copy() del partial_config_json["wind_speed"] partial_config = UnitConfig.default_metric() - partial_config.wind_speed = WindSpeed.default() + partial_config.wind_speed = Unit.from_unit_info(WindSpeed.default()) assert UnitConfig.from_dict(partial_config_json) == partial_config